Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions plugins/in_forward/fw_conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,10 @@
static int fw_conn_event_internal(struct flb_connection *connection)
{
int ret;
int bytes;
int available;
int size;
ssize_t bytes;
size_t read_bytes;
size_t available;
size_t size;
char *tmp;
struct fw_conn *conn;
struct mk_event *event;
Expand Down Expand Up @@ -66,7 +67,7 @@ static int fw_conn_event_internal(struct flb_connection *connection)
available = (conn->buf_size - conn->buf_len);
if (available < 1) {
if (conn->buf_size >= ctx->buffer_max_size) {
flb_plg_warn(ctx->ins, "fd=%i incoming data exceed limit (%lu bytes)",
flb_plg_warn(ctx->ins, "fd=%i incoming data exceed limit (%zu bytes)",
event->fd, (ctx->buffer_max_size));
fw_conn_del(conn);
return -1;
Expand All @@ -84,7 +85,7 @@ static int fw_conn_event_internal(struct flb_connection *connection)
flb_errno();
return -1;
}
flb_plg_trace(ctx->ins, "fd=%i buffer realloc %i -> %i",
flb_plg_trace(ctx->ins, "fd=%i buffer realloc %zu -> %zu",
event->fd, conn->buf_size, size);

conn->buf = tmp;
Expand All @@ -97,9 +98,10 @@ static int fw_conn_event_internal(struct flb_connection *connection)
available);

if (bytes > 0) {
flb_plg_trace(ctx->ins, "read()=%i pre_len=%i now_len=%i",
bytes, conn->buf_len, conn->buf_len + bytes);
conn->buf_len += bytes;
read_bytes = (size_t) bytes;
flb_plg_trace(ctx->ins, "read()=%zd pre_len=%zu now_len=%zu",
bytes, conn->buf_len, conn->buf_len + read_bytes);
conn->buf_len += read_bytes;

ret = fw_prot_process(ctx->ins, conn);
if (ret == -1) {
Expand Down
4 changes: 2 additions & 2 deletions plugins/in_forward/fw_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ struct fw_conn {

/* Buffer */
char *buf; /* Buffer data */
int buf_len; /* Data length */
int buf_size; /* Buffer size */
size_t buf_len; /* Data length */
size_t buf_size; /* Buffer size */
size_t rest; /* Unpacking offset */

/* Decompression context */
Expand Down
89 changes: 87 additions & 2 deletions plugins/in_forward/fw_prot.c
Original file line number Diff line number Diff line change
Expand Up @@ -1068,8 +1068,19 @@ static int fw_process_message_mode_entry(
static size_t receiver_recv(struct fw_conn *conn, char *buf, size_t try_size) {
size_t off;
size_t actual_size;
size_t buf_len;

if (conn->rest > conn->buf_len) {
return 0;
}

buf_len = conn->buf_len;
off = buf_len - conn->rest;

if (off > buf_len) {
return 0;
}

off = conn->buf_len - conn->rest;
actual_size = try_size;

if (actual_size > conn->rest) {
Expand Down Expand Up @@ -1341,6 +1352,11 @@ int fw_prot_process(struct flb_input_instance *ins, struct fw_conn *conn)
*
* https://github.com/msgpack/msgpack-c/issues/514
*/
if (bytes > 0 && all_used > SIZE_MAX - bytes) {
flb_plg_error(ctx->ins, "incoming frame size accounting overflow");
goto cleanup_msgpack;
}

all_used += bytes;


Expand Down Expand Up @@ -1520,6 +1536,13 @@ int fw_prot_process(struct flb_input_instance *ins, struct fw_conn *conn)
}

if (data) {
if (len > ctx->buffer_max_size) {
flb_plg_error(ctx->ins,
"packedforward payload too large (%zu bytes), limit=%zu",
len, ctx->buffer_max_size);
goto cleanup_msgpack;
}

/* Get event type early for use in both compressed/uncompressed paths */
event_type = FLB_EVENT_TYPE_LOGS;
if (contain_options) {
Expand Down Expand Up @@ -1565,16 +1588,35 @@ int fw_prot_process(struct flb_input_instance *ins, struct fw_conn *conn)
}

if (conn->compression_type != FLB_COMPRESSION_ALGORITHM_NONE) {
char *decoded_payload = NULL;
char *decomp_buf = NULL;
uint8_t *append_ptr;
size_t available_space;
size_t decomp_len;
size_t total_decompressed;
int decomp_ret;
size_t required_size;

total_decompressed = 0;

available_space = flb_decompression_context_get_available_space(conn->d_ctx);
if (len > available_space) {
if (conn->d_ctx->input_buffer_length > SIZE_MAX - len) {
flb_plg_error(ctx->ins,
"decompression input size overflow");

goto cleanup_decompress;
}

required_size = conn->d_ctx->input_buffer_length + len;
if (required_size > ctx->buffer_max_size) {
flb_plg_error(ctx->ins,
"compressed payload exceeds limit (%zu bytes)",
ctx->buffer_max_size);

goto cleanup_decompress;
}

if (flb_decompression_context_resize_buffer(conn->d_ctx, required_size) != 0) {
flb_plg_error(ctx->ins, "cannot resize decompression buffer");

Expand All @@ -1592,13 +1634,22 @@ int fw_prot_process(struct flb_input_instance *ins, struct fw_conn *conn)
goto cleanup_decompress;
}

decoded_payload = flb_malloc(ctx->buffer_max_size);
if (!decoded_payload) {
flb_errno();
flb_free(decomp_buf);

goto cleanup_decompress;
}

do {
decomp_len = ctx->buffer_chunk_size;
decomp_ret = flb_decompress(conn->d_ctx, decomp_buf, &decomp_len);

if (decomp_ret == FLB_DECOMPRESSOR_FAILURE) {
if (decomp_len > 0) {
flb_plg_error(ctx->ins, "decompression failed, data may be corrupt");
flb_free(decoded_payload);
flb_free(decomp_buf);

goto cleanup_decompress;
Expand All @@ -1607,16 +1658,50 @@ int fw_prot_process(struct flb_input_instance *ins, struct fw_conn *conn)
}

if (decomp_len > 0) {
if (append_log(ins, conn, event_type, out_tag, decomp_buf, decomp_len) == -1) {
if (total_decompressed > SIZE_MAX - decomp_len) {
flb_plg_error(ctx->ins,
"decompressed output size overflow");
flb_free(decoded_payload);
flb_free(decomp_buf);

goto cleanup_decompress;
}

total_decompressed += decomp_len;

if (total_decompressed > ctx->buffer_max_size) {
flb_plg_error(ctx->ins,
"decompressed payload exceeds limit (%zu bytes)",
ctx->buffer_max_size);
flb_free(decoded_payload);
flb_free(decomp_buf);

goto cleanup_decompress;
}

memcpy(decoded_payload + (total_decompressed - decomp_len),
decomp_buf,
decomp_len);
}
} while (decomp_len > 0);

flb_free(decomp_buf);

if (total_decompressed > 0) {
if (append_log(ins,
conn,
event_type,
out_tag,
decoded_payload,
total_decompressed) == -1) {
flb_free(decoded_payload);

goto cleanup_decompress;
}
}

flb_free(decoded_payload);

flb_decompression_context_destroy(conn->d_ctx);
conn->d_ctx = NULL;
conn->compression_type = FLB_COMPRESSION_ALGORITHM_NONE;
Expand Down
15 changes: 13 additions & 2 deletions src/flb_gzip.c
Original file line number Diff line number Diff line change
Expand Up @@ -1007,9 +1007,20 @@ void *flb_gzip_decompression_context_create()

void flb_gzip_decompression_context_destroy(void *context)
{
if (context != NULL) {
flb_free(context);
struct flb_gzip_decompression_context *inner_context;

if (context == NULL) {
return;
}

inner_context = (struct flb_gzip_decompression_context *) context;

if (inner_context->miniz_stream.state != NULL) {
mz_inflateEnd(&inner_context->miniz_stream);
memset(&inner_context->miniz_stream, 0, sizeof(mz_stream));
}

flb_free(inner_context);
}

int flb_is_http_session_gzip_compressed(struct mk_http_session *session)
Expand Down
Loading
Loading