Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions plugins/out_opentelemetry/opentelemetry.c
Original file line number Diff line number Diff line change
Expand Up @@ -1030,6 +1030,12 @@ static struct flb_config_map config_map[] = {
add_labels),
"Adds a custom label to the metrics use format: 'add_label name value'"
},
{
FLB_CONFIG_MAP_SLIST_1, "logs_resource_attributes_message_key", NULL,
FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct opentelemetry_context,
ra_resource_attributes_message),
"Specify a resource attribute key"
},
{
FLB_CONFIG_MAP_STR, "http2", "off",
0, FLB_TRUE, offsetof(struct opentelemetry_context, enable_http2),
Expand Down
2 changes: 2 additions & 0 deletions plugins/out_opentelemetry/opentelemetry.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ struct opentelemetry_context {
flb_sds_t logs_severity_number_message_key;
struct flb_record_accessor *ra_severity_number_message;

struct mk_list *ra_resource_attributes_message;

/* Number of logs to flush at a time */
int batch_size;

Expand Down
46 changes: 44 additions & 2 deletions plugins/out_opentelemetry/opentelemetry_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,43 @@ static int metadata_mp_accessor_create(struct opentelemetry_context *ctx)
return 0;
}

static int config_add_labels(struct flb_output_instance *ins,
/*
* config_ra_resource_attributes_message: validate the configured keys at init
* time. The actual key lookup at flush time reads ctx->ra_resource_attributes_message
* directly (config-map-managed pointer) to avoid per-worker context list issues.
*/
static int config_ra_resource_attributes_message(struct flb_output_instance *ins,
struct opentelemetry_context *ctx)
{
struct mk_list *head;
struct flb_config_map_val *mv;
struct flb_slist_entry *k;

if (!ctx->ra_resource_attributes_message ||
mk_list_size(ctx->ra_resource_attributes_message) == 0) {
return 0;
}

flb_plg_debug(ins, "resource attributes: %d key(s) configured",
mk_list_size(ctx->ra_resource_attributes_message));

/* validate each entry has exactly one value */
flb_config_map_foreach(head, mv, ctx->ra_resource_attributes_message) {
if (mk_list_size(mv->val.list) != 1) {
flb_plg_error(ins, "'logs_resource_attributes_message_key' expects a single key name, "
"e.g: 'logs_resource_attributes_message_key service.name' "
"(or '$service.name')");
return -1;
}

k = mk_list_entry_first(mv->val.list, struct flb_slist_entry, _head);
flb_plg_debug(ins, "resource attributes: registered key '%s'", k->str);
}

return 0;
}

static int config_log_body(struct flb_output_instance *ins,
struct opentelemetry_context *ctx)
{
struct mk_list *head;
Expand Down Expand Up @@ -294,7 +330,13 @@ struct opentelemetry_context *flb_opentelemetry_context_create(struct flb_output
}

/* Parse 'add_label' */
ret = config_add_labels(ins, ctx);
ret = config_log_body(ins, ctx);
if (ret == -1) {
flb_opentelemetry_context_destroy(ctx);
return NULL;
}

ret = config_ra_resource_attributes_message(ins, ctx);
if (ret == -1) {
flb_opentelemetry_context_destroy(ctx);
return NULL;
Expand Down
173 changes: 173 additions & 0 deletions plugins/out_opentelemetry/opentelemetry_logs.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <fluent-bit/flb_log_event_decoder.h>
#include <fluent-bit/flb_ra_key.h>
#include <fluent-bit/flb_gzip.h>
#include <fluent-bit/flb_slist.h>

#include <fluent-otel-proto/fluent-otel.h>

Expand Down Expand Up @@ -780,6 +781,163 @@ static int logs_flush_to_otel(struct opentelemetry_context *ctx, struct flb_even
return ret;
}

/*
* For each key name in ctx->ra_resource_attributes_message_list, look it up in
* the msgpack message body and promote the value to an OTLP resource attribute.
*/
static void set_resource_attributes_from_message_body(
struct opentelemetry_context *ctx,
msgpack_object *body,
Opentelemetry__Proto__Resource__V1__Resource *resource)
{
int i;
size_t key_len;
size_t map_key_len;
char *map_key_ptr;
struct mk_list *head;
struct flb_config_map_val *mv;
struct flb_slist_entry *entry;
const char *normalized_key;
msgpack_object_kv *kv;
Opentelemetry__Proto__Common__V1__KeyValue *attr;
Opentelemetry__Proto__Common__V1__KeyValue **tmp_attrs;

/*
* Use ctx->ra_resource_attributes_message directly — this is the pointer
* managed by the Fluent Bit config-map framework and is reliably available
* in every worker thread context, unlike an embedded mk_list copy.
*/
if (!ctx->ra_resource_attributes_message ||
mk_list_size(ctx->ra_resource_attributes_message) == 0) {
return;
}

if (body == NULL || body->type != MSGPACK_OBJECT_MAP) {
return;
}

/* Iterate directly over the config-map-managed list */
flb_config_map_foreach(head, mv, ctx->ra_resource_attributes_message) {
if (mk_list_size(mv->val.list) != 1) {
continue;
}

entry = mk_list_entry_first(mv->val.list, struct flb_slist_entry, _head);
normalized_key = entry->str;
key_len = flb_sds_len(entry->str);

if (key_len == 0) {
continue;
}

/*
* Allow optional record accessor prefix so both "service.name" and
* "$service.name" are treated as the same map key.
*/
if (key_len > 0 && normalized_key[0] == '$') {
normalized_key++;
key_len--;
}

/*
* Also tolerate bracket forms like $['service.name'] and
* $["service.name"] for literal keys.
*/
if (key_len >= 4 && normalized_key[0] == '[') {
if ((normalized_key[1] == '\'' && normalized_key[key_len - 2] == '\'' &&
normalized_key[key_len - 1] == ']') ||
(normalized_key[1] == '"' && normalized_key[key_len - 2] == '"' &&
normalized_key[key_len - 1] == ']')) {
normalized_key += 2;
key_len -= 4;
}
}

if (key_len == 0) {
continue;
}

/* tolerate quoted key names like "service.name" or 'service.name' */
if (key_len >= 2) {
if ((normalized_key[0] == '"' && normalized_key[key_len - 1] == '"') ||
(normalized_key[0] == '\'' && normalized_key[key_len - 1] == '\'')) {
normalized_key++;
key_len -= 2;
}
}

if (key_len == 0) {
continue;
}

for (i = 0; i < body->via.map.size; i++) {
kv = &body->via.map.ptr[i];

if (kv->key.type == MSGPACK_OBJECT_STR) {
map_key_ptr = kv->key.via.str.ptr;
map_key_len = kv->key.via.str.size;
}
else if (kv->key.type == MSGPACK_OBJECT_BIN) {
map_key_ptr = (char *) kv->key.via.bin.ptr;
map_key_len = kv->key.via.bin.size;
}
else {
continue;
}

if (map_key_len != key_len) {
continue;
}

if (strncmp(map_key_ptr, normalized_key, key_len) != 0) {
continue;
}

/* Found the key — convert to OTLP KeyValue */
if (kv->key.type == MSGPACK_OBJECT_STR) {
attr = msgpack_kv_to_otlp_any_value(kv);
}
else {
attr = otlp_kvpair_value_initialize();
if (attr != NULL) {
attr->key = flb_strndup(map_key_ptr, map_key_len);

if (attr->key != NULL) {
attr->value = msgpack_object_to_otlp_any_value(&kv->val);
}

if (attr->key == NULL || attr->value == NULL) {
otlp_kvpair_destroy(attr);
attr = NULL;
}
}
}

if (!attr) {
flb_plg_warn(ctx->ins, "resource attributes: failed to convert key '%s' to OTLP KeyValue",
entry->str);
break;
}

/* Grow the resource attributes array by one slot */
tmp_attrs = flb_realloc(resource->attributes,
(resource->n_attributes + 1) *
sizeof(Opentelemetry__Proto__Common__V1__KeyValue *));
if (!tmp_attrs) {
flb_plg_error(ctx->ins, "resource attributes: memory allocation failed for key '%s'",
entry->str);
otlp_kvpair_destroy(attr);
break;
}

resource->attributes = tmp_attrs;
resource->attributes[resource->n_attributes] = attr;
resource->n_attributes++;
break;
}
}
}
Comment on lines +784 to +939
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Grouped OTLP records never read the actual log body here.

This helper is wired into the resource-creation path, so for native OTLP input it runs while handling FLB_LOG_EVENT_GROUP_START. At that point event.body is the group descriptor ($resource / $scope), not the FLB_LOG_EVENT_NORMAL body, so logs_resource_attributes_message_key only works for standalone logs. If grouped OTLP records are meant to be supported too, the lookup needs to happen in the per-record path before resource selection.

Also applies to: 1304-1305

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@plugins/out_opentelemetry/opentelemetry_logs.c` around lines 784 - 939, The
function set_resource_attributes_from_message_body is being called during
resource creation when handling FLB_LOG_EVENT_GROUP_START, but at that time
event.body is the group descriptor (not per-record bodies), so resource
attributes derived from message keys miss grouped records; move or duplicate the
message-body key lookup into the per-record handling path (the code that
processes FLB_LOG_EVENT_NORMAL records) so you call
set_resource_attributes_from_message_body (or its core lookup logic) using the
actual per-record msgpack body before resource selection/assignment; update
callers (remove or guard the call during GROUP_START) and ensure the lookup uses
ctx->ra_resource_attributes_message and the per-record msgpack_object body so
grouped OTLP records get their resource attributes populated.


static int set_resource_attributes(struct flb_record_accessor *ra,
msgpack_object *map,
Opentelemetry__Proto__Resource__V1__Resource *resource)
Expand Down Expand Up @@ -1021,6 +1179,18 @@ int otel_process_logs(struct flb_event_chunk *event_chunk,

ret = FLB_OK;
while (flb_log_event_decoder_next(decoder, &event) == FLB_EVENT_DECODER_SUCCESS) {
/*
* For standalone records (non-native OTLP groups), resource attributes
* promoted from message keys are record-specific. Force a fresh
* resource/scope context per record when this feature is enabled to
* avoid carrying stale values across subsequent log lines.
*/
if (native_otel == FLB_FALSE &&
ctx->ra_resource_attributes_message &&
mk_list_size(ctx->ra_resource_attributes_message) > 0) {
resource_id = -1;
scope_id = -1;
Comment on lines +1188 to +1192

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Keep resource array growth on per-record reset path

When logs_resource_attributes_message_key is enabled for non-native OTLP input, this new block resets resource_id/scope_id on every event, which forces every standalone record through the goto start_resource path in otel_process_logs. That label sits after the resource-capacity/max-resources checks, so repeated records append to resource_logs[export_logs.n_resource_logs] without reallocation; once a chunk has more than the initial 256 resources, this can write past the allocated array and corrupt memory.

Useful? React with 👍 / 👎.

}
Comment on lines +1182 to +1193
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

This turns batch_size into a no-op for standalone logs.

With logs_resource_attributes_message_key configured, every non-OTLP record forces resource_id and scope_id back to -1, so a new resource/scope is created and log_record_count is reset on every record. That makes the flush threshold at Line 1507 unreachable for the common batch_size > 1 case, while each forced scope still allocates a ctx->batch_size-slot log_records buffer at Line 1398. Please add an export-wide flush boundary for this mode, or otherwise cap the per-record resource split.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@plugins/out_opentelemetry/opentelemetry_logs.c` around lines 1182 - 1193, The
current code forces resource_id and scope_id to -1 for every non-OTLP record
when ctx->ra_resource_attributes_message is set, which breaks batching by
resetting log_record_count per record; change the logic in the block that
manipulates resource_id/scope_id so it does NOT unconditionally reset per
record—either (a) only reset resource_id/scope_id when the promoted resource
attributes actually change compared to the last record (store and compare the
last promoted resource signature), or (b) respect an export-wide flush boundary
by tracking an export counter and only forcing a new resource/scope when that
counter reaches ctx->batch_size (or when attributes change); update uses of
resource_id, scope_id, and log_record_count to follow this rule so
ctx->batch_size and the flush threshold can be honored while still isolating
records with different promoted attributes.

/* Check if the record is special (group) or a normal one */
ret = flb_log_event_decoder_get_record_type(&event, &record_type);
if (ret != 0) {
Expand Down Expand Up @@ -1130,6 +1300,9 @@ int otel_process_logs(struct flb_event_chunk *event_chunk,

/* group body: $schema_url */
set_resource_schema_url(ctx->ra_resource_schema_url, event.body, resource_log);

/* message body: promote configured keys to resource attributes */
set_resource_attributes_from_message_body(ctx, event.body, resource_log->resource);

/* prepare the scopes */
if (!resource_log->scope_logs) {
Expand Down
Loading