From ca47effe82781544aabd0f419121faf32502ecd7 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Tue, 10 Mar 2026 16:53:41 +0900 Subject: [PATCH 1/3] in_forward: Process limit of handling incoming payloads Signed-off-by: Hiroshi Hatake --- plugins/in_forward/fw_conn.c | 18 ++++---- plugins/in_forward/fw_conn.h | 4 +- plugins/in_forward/fw_prot.c | 89 +++++++++++++++++++++++++++++++++++- 3 files changed, 99 insertions(+), 12 deletions(-) diff --git a/plugins/in_forward/fw_conn.c b/plugins/in_forward/fw_conn.c index 14258b09ccb..e0af3f0ad38 100644 --- a/plugins/in_forward/fw_conn.c +++ b/plugins/in_forward/fw_conn.c @@ -31,9 +31,10 @@ static int fw_conn_event_internal(struct flb_connection *connection) { int ret; - int bytes; - int available; - int size; + ssize_t bytes; + size_t read_bytes; + size_t available; + size_t size; char *tmp; struct fw_conn *conn; struct mk_event *event; @@ -66,7 +67,7 @@ static int fw_conn_event_internal(struct flb_connection *connection) available = (conn->buf_size - conn->buf_len); if (available < 1) { if (conn->buf_size >= ctx->buffer_max_size) { - flb_plg_warn(ctx->ins, "fd=%i incoming data exceed limit (%lu bytes)", + flb_plg_warn(ctx->ins, "fd=%i incoming data exceed limit (%zu bytes)", event->fd, (ctx->buffer_max_size)); fw_conn_del(conn); return -1; @@ -84,7 +85,7 @@ static int fw_conn_event_internal(struct flb_connection *connection) flb_errno(); return -1; } - flb_plg_trace(ctx->ins, "fd=%i buffer realloc %i -> %i", + flb_plg_trace(ctx->ins, "fd=%i buffer realloc %zu -> %zu", event->fd, conn->buf_size, size); conn->buf = tmp; @@ -97,9 +98,10 @@ static int fw_conn_event_internal(struct flb_connection *connection) available); if (bytes > 0) { - flb_plg_trace(ctx->ins, "read()=%i pre_len=%i now_len=%i", - bytes, conn->buf_len, conn->buf_len + bytes); - conn->buf_len += bytes; + read_bytes = (size_t) bytes; + flb_plg_trace(ctx->ins, "read()=%zd pre_len=%zu now_len=%zu", + bytes, conn->buf_len, conn->buf_len + read_bytes); + conn->buf_len += read_bytes; ret = fw_prot_process(ctx->ins, conn); if (ret == -1) { diff --git a/plugins/in_forward/fw_conn.h b/plugins/in_forward/fw_conn.h index af622de13b7..e98ebf43b0e 100644 --- a/plugins/in_forward/fw_conn.h +++ b/plugins/in_forward/fw_conn.h @@ -46,8 +46,8 @@ struct fw_conn { /* Buffer */ char *buf; /* Buffer data */ - int buf_len; /* Data length */ - int buf_size; /* Buffer size */ + size_t buf_len; /* Data length */ + size_t buf_size; /* Buffer size */ size_t rest; /* Unpacking offset */ /* Decompression context */ diff --git a/plugins/in_forward/fw_prot.c b/plugins/in_forward/fw_prot.c index 9ad1fe4ba7a..844c8494e66 100644 --- a/plugins/in_forward/fw_prot.c +++ b/plugins/in_forward/fw_prot.c @@ -1068,8 +1068,19 @@ static int fw_process_message_mode_entry( static size_t receiver_recv(struct fw_conn *conn, char *buf, size_t try_size) { size_t off; size_t actual_size; + size_t buf_len; + + if (conn->rest > conn->buf_len) { + return 0; + } + + buf_len = conn->buf_len; + off = buf_len - conn->rest; + + if (off > buf_len) { + return 0; + } - off = conn->buf_len - conn->rest; actual_size = try_size; if (actual_size > conn->rest) { @@ -1341,6 +1352,11 @@ int fw_prot_process(struct flb_input_instance *ins, struct fw_conn *conn) * * https://github.com/msgpack/msgpack-c/issues/514 */ + if (bytes > 0 && all_used > SIZE_MAX - bytes) { + flb_plg_error(ctx->ins, "incoming frame size accounting overflow"); + goto cleanup_msgpack; + } + all_used += bytes; @@ -1520,6 +1536,13 @@ int fw_prot_process(struct flb_input_instance *ins, struct fw_conn *conn) } if (data) { + if (len > ctx->buffer_max_size) { + flb_plg_error(ctx->ins, + "packedforward payload too large (%zu bytes), limit=%zu", + len, ctx->buffer_max_size); + goto cleanup_msgpack; + } + /* Get event type early for use in both compressed/uncompressed paths */ event_type = FLB_EVENT_TYPE_LOGS; if (contain_options) { @@ -1565,16 +1588,35 @@ int fw_prot_process(struct flb_input_instance *ins, struct fw_conn *conn) } if (conn->compression_type != FLB_COMPRESSION_ALGORITHM_NONE) { + char *decoded_payload = NULL; char *decomp_buf = NULL; uint8_t *append_ptr; size_t available_space; size_t decomp_len; + size_t total_decompressed; int decomp_ret; size_t required_size; + total_decompressed = 0; + available_space = flb_decompression_context_get_available_space(conn->d_ctx); if (len > available_space) { + if (conn->d_ctx->input_buffer_length > SIZE_MAX - len) { + flb_plg_error(ctx->ins, + "decompression input size overflow"); + + goto cleanup_decompress; + } + required_size = conn->d_ctx->input_buffer_length + len; + if (required_size > ctx->buffer_max_size) { + flb_plg_error(ctx->ins, + "compressed payload exceeds limit (%zu bytes)", + ctx->buffer_max_size); + + goto cleanup_decompress; + } + if (flb_decompression_context_resize_buffer(conn->d_ctx, required_size) != 0) { flb_plg_error(ctx->ins, "cannot resize decompression buffer"); @@ -1592,6 +1634,14 @@ int fw_prot_process(struct flb_input_instance *ins, struct fw_conn *conn) goto cleanup_decompress; } + decoded_payload = flb_malloc(ctx->buffer_max_size); + if (!decoded_payload) { + flb_errno(); + flb_free(decomp_buf); + + goto cleanup_decompress; + } + do { decomp_len = ctx->buffer_chunk_size; decomp_ret = flb_decompress(conn->d_ctx, decomp_buf, &decomp_len); @@ -1599,6 +1649,7 @@ int fw_prot_process(struct flb_input_instance *ins, struct fw_conn *conn) if (decomp_ret == FLB_DECOMPRESSOR_FAILURE) { if (decomp_len > 0) { flb_plg_error(ctx->ins, "decompression failed, data may be corrupt"); + flb_free(decoded_payload); flb_free(decomp_buf); goto cleanup_decompress; @@ -1607,16 +1658,50 @@ int fw_prot_process(struct flb_input_instance *ins, struct fw_conn *conn) } if (decomp_len > 0) { - if (append_log(ins, conn, event_type, out_tag, decomp_buf, decomp_len) == -1) { + if (total_decompressed > SIZE_MAX - decomp_len) { + flb_plg_error(ctx->ins, + "decompressed output size overflow"); + flb_free(decoded_payload); flb_free(decomp_buf); goto cleanup_decompress; } + + total_decompressed += decomp_len; + + if (total_decompressed > ctx->buffer_max_size) { + flb_plg_error(ctx->ins, + "decompressed payload exceeds limit (%zu bytes)", + ctx->buffer_max_size); + flb_free(decoded_payload); + flb_free(decomp_buf); + + goto cleanup_decompress; + } + + memcpy(decoded_payload + (total_decompressed - decomp_len), + decomp_buf, + decomp_len); } } while (decomp_len > 0); flb_free(decomp_buf); + if (total_decompressed > 0) { + if (append_log(ins, + conn, + event_type, + out_tag, + decoded_payload, + total_decompressed) == -1) { + flb_free(decoded_payload); + + goto cleanup_decompress; + } + } + + flb_free(decoded_payload); + flb_decompression_context_destroy(conn->d_ctx); conn->d_ctx = NULL; conn->compression_type = FLB_COMPRESSION_ALGORITHM_NONE; From c3a122f3ec04ca534eb0c0d688437f3951bfbd0e Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Tue, 10 Mar 2026 17:10:28 +0900 Subject: [PATCH 2/3] tests: in_forward: Add test cases for exceeding limits Signed-off-by: Hiroshi Hatake --- tests/runtime/in_forward.c | 268 ++++++++++++++++++++++++++++++++++++- 1 file changed, 261 insertions(+), 7 deletions(-) diff --git a/tests/runtime/in_forward.c b/tests/runtime/in_forward.c index e947e45d849..4edaf3f0411 100644 --- a/tests/runtime/in_forward.c +++ b/tests/runtime/in_forward.c @@ -67,6 +67,14 @@ static void clear_output_num() set_output_num(0); } +static int cb_count_only(void *record, size_t size, void *data) +{ + int n = get_output_num(); + set_output_num(n + 1); + flb_free(record); + return 0; +} + static int create_simple_json(char **out_buf, size_t *size) { int root_type; @@ -422,7 +430,7 @@ void flb_test_unix_path() TEST_CHECK(ret == 0); /* waiting to create socket */ - flb_time_msleep(200); + flb_time_msleep(200); memset(&sun, 0, sizeof(sun)); fd = socket(AF_LOCAL, SOCK_STREAM, 0); @@ -507,7 +515,7 @@ void flb_test_unix_perm() TEST_CHECK(ret == 0); /* waiting to create socket */ - flb_time_msleep(200); + flb_time_msleep(200); memset(&sun, 0, sizeof(sun)); fd = socket(AF_LOCAL, SOCK_STREAM, 0); @@ -724,6 +732,122 @@ static int create_simple_json_zstd(msgpack_sbuffer *sbuf) return 0; } +/* + * Creates an uncompressed PackedForward frame with a payload large enough to + * exercise input size boundary checks. + * + * Final structure: [tag, packed_entries, {}] + */ +static int create_large_packedforward(msgpack_sbuffer *sbuf, + size_t entry_count, + size_t message_len) +{ + size_t index; + msgpack_packer pck; + msgpack_sbuffer entries; + char *message; + + message = flb_malloc(message_len); + if (!TEST_CHECK(message != NULL)) { + flb_errno(); + return -1; + } + + for (index = 0; index < message_len; index++) { + message[index] = (char) ('a' + (index % 26)); + } + + msgpack_sbuffer_init(&entries); + msgpack_packer_init(&pck, &entries, msgpack_sbuffer_write); + + for (index = 0; index < entry_count; index++) { + msgpack_pack_array(&pck, 2); + msgpack_pack_uint64(&pck, 1234567890 + index); + msgpack_pack_map(&pck, 1); + msgpack_pack_str_with_body(&pck, "test", 4); + msgpack_pack_str_with_body(&pck, message, message_len); + } + + msgpack_packer_init(&pck, sbuf, msgpack_sbuffer_write); + msgpack_pack_array(&pck, 3); + msgpack_pack_str_with_body(&pck, "test", 4); + msgpack_pack_bin_with_body(&pck, entries.data, entries.size); + msgpack_pack_map(&pck, 0); + + msgpack_sbuffer_destroy(&entries); + flb_free(message); + + return 0; +} + +/* + * Creates a compressed PackedForward frame where compressed input can be much + * smaller than decompressed output. + * + * Final structure: [tag, gzip(entries), {"compressed":"gzip","size":N}] + */ +static int create_large_compressed_packedforward(msgpack_sbuffer *sbuf, + size_t entry_count, + size_t message_len) +{ + int ret; + msgpack_sbuffer entries; + msgpack_packer pck; + char *compressed_buf; + size_t compressed_size; + + msgpack_sbuffer_init(&entries); + ret = create_large_packedforward(&entries, entry_count, message_len); + if (!TEST_CHECK(ret == 0)) { + msgpack_sbuffer_destroy(&entries); + return -1; + } + + /* extract only packed entries from [tag, entries, {}] */ + { + msgpack_unpacked result; + size_t off; + msgpack_object root; + msgpack_object payload; + + msgpack_unpacked_init(&result); + off = 0; + ret = msgpack_unpack_next(&result, entries.data, entries.size, &off); + if (!TEST_CHECK(ret == MSGPACK_UNPACK_SUCCESS)) { + msgpack_unpacked_destroy(&result); + msgpack_sbuffer_destroy(&entries); + return -1; + } + + root = result.data; + payload = root.via.array.ptr[1]; + ret = flb_gzip_compress(payload.via.bin.ptr, payload.via.bin.size, + (void **) &compressed_buf, &compressed_size); + + msgpack_unpacked_destroy(&result); + } + + if (!TEST_CHECK(ret == 0)) { + msgpack_sbuffer_destroy(&entries); + return -1; + } + + msgpack_packer_init(&pck, sbuf, msgpack_sbuffer_write); + msgpack_pack_array(&pck, 3); + msgpack_pack_str_with_body(&pck, "test", 4); + msgpack_pack_bin_with_body(&pck, compressed_buf, compressed_size); + msgpack_pack_map(&pck, 2); + msgpack_pack_str_with_body(&pck, "compressed", 10); + msgpack_pack_str_with_body(&pck, "gzip", 4); + msgpack_pack_str_with_body(&pck, "size", 4); + msgpack_pack_uint64(&pck, entries.size); + + flb_free(compressed_buf); + msgpack_sbuffer_destroy(&entries); + + return 0; +} + void flb_test_forward_zstd() { struct flb_lib_out_cb cb_data; @@ -784,12 +908,140 @@ void flb_test_forward_zstd() test_ctx_destroy(ctx); } -static int cb_count_only(void *record, size_t size, void *data) +void flb_test_forward_packedforward_payload_exceeds_limit() { - int n = get_output_num(); - set_output_num(n + 1); - flb_free(record); - return 0; + struct flb_lib_out_cb cb_data; + struct test_ctx *ctx; + flb_sockfd_t fd; + int ret; + int num; + ssize_t w_size; + msgpack_sbuffer sbuf; + + clear_output_num(); + + cb_data.cb = cb_count_only; + cb_data.data = NULL; + + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = flb_input_set(ctx->flb, ctx->i_ffd, + "buffer_chunk_size", "1K", + "buffer_max_size", "1K", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "match", "test", + "format", "json", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + fd = connect_tcp(NULL, -1); + if (!TEST_CHECK(fd >= 0)) { + test_ctx_destroy(ctx); + exit(EXIT_FAILURE); + } + + msgpack_sbuffer_init(&sbuf); + ret = create_large_packedforward(&sbuf, 16, 128); + TEST_CHECK(ret == 0); + + w_size = send(fd, sbuf.data, sbuf.size, 0); + if (!TEST_CHECK(w_size == (ssize_t) sbuf.size)) { + TEST_MSG("failed to send, errno=%d", errno); + flb_socket_close(fd); + msgpack_sbuffer_destroy(&sbuf); + test_ctx_destroy(ctx); + exit(EXIT_FAILURE); + } + + msgpack_sbuffer_destroy(&sbuf); + + flb_time_msleep(1500); + + num = get_output_num(); + if (!TEST_CHECK(num == 0)) { + TEST_MSG("expected oversized PackedForward payload to be rejected"); + } + + flb_socket_close(fd); + test_ctx_destroy(ctx); +} + +void flb_test_forward_decompressed_payload_exceeds_limit() +{ + struct flb_lib_out_cb cb_data; + struct test_ctx *ctx; + flb_sockfd_t fd; + int ret; + int num; + ssize_t w_size; + msgpack_sbuffer sbuf; + + clear_output_num(); + + cb_data.cb = cb_count_only; + cb_data.data = NULL; + + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = flb_input_set(ctx->flb, ctx->i_ffd, + "buffer_chunk_size", "512", + "buffer_max_size", "1K", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "match", "test", + "format", "json", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + fd = connect_tcp(NULL, -1); + if (!TEST_CHECK(fd >= 0)) { + test_ctx_destroy(ctx); + exit(EXIT_FAILURE); + } + + msgpack_sbuffer_init(&sbuf); + ret = create_large_compressed_packedforward(&sbuf, 32, 128); + TEST_CHECK(ret == 0); + + w_size = send(fd, sbuf.data, sbuf.size, 0); + if (!TEST_CHECK(w_size == (ssize_t) sbuf.size)) { + TEST_MSG("failed to send, errno=%d", errno); + flb_socket_close(fd); + msgpack_sbuffer_destroy(&sbuf); + test_ctx_destroy(ctx); + exit(EXIT_FAILURE); + } + + msgpack_sbuffer_destroy(&sbuf); + + flb_time_msleep(1500); + + num = get_output_num(); + if (!TEST_CHECK(num == 0)) { + TEST_MSG("expected oversized decompressed payload to be rejected"); + } + + flb_socket_close(fd); + test_ctx_destroy(ctx); } void flb_test_threaded_forward_issue_10946() @@ -1049,6 +1301,8 @@ TEST_LIST = { #endif {"forward_gzip", flb_test_forward_gzip}, {"forward_zstd", flb_test_forward_zstd}, + {"forward_packedforward_payload_exceeds_limit", flb_test_forward_packedforward_payload_exceeds_limit}, + {"forward_decompressed_payload_exceeds_limit", flb_test_forward_decompressed_payload_exceeds_limit}, {"issue_10946", flb_test_threaded_forward_issue_10946}, {"fw_auth_users_only_fail_start", flb_test_fw_auth_users_only_fail_start}, {"fw_auth_empty_shared_key_plus_users_start_ok", flb_test_fw_auth_empty_shared_key_plus_users_start_ok}, From 509dbef4354feadf3eeee79d325979f3dc3fccb4 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Mon, 23 Mar 2026 04:30:54 -0700 Subject: [PATCH 3/3] gzip: Destroy with miniz stream cleanly Signed-off-by: Hiroshi Hatake --- src/flb_gzip.c | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/src/flb_gzip.c b/src/flb_gzip.c index b274884269c..9943599ba83 100644 --- a/src/flb_gzip.c +++ b/src/flb_gzip.c @@ -1007,9 +1007,20 @@ void *flb_gzip_decompression_context_create() void flb_gzip_decompression_context_destroy(void *context) { - if (context != NULL) { - flb_free(context); + struct flb_gzip_decompression_context *inner_context; + + if (context == NULL) { + return; + } + + inner_context = (struct flb_gzip_decompression_context *) context; + + if (inner_context->miniz_stream.state != NULL) { + mz_inflateEnd(&inner_context->miniz_stream); + memset(&inner_context->miniz_stream, 0, sizeof(mz_stream)); } + + flb_free(inner_context); } int flb_is_http_session_gzip_compressed(struct mk_http_session *session)