From 3f51519d7c6462af5af0676a70b2086c6e71d6b8 Mon Sep 17 00:00:00 2001 From: "BOSLET, CORY" Date: Tue, 17 Mar 2026 15:54:52 -0400 Subject: [PATCH] out_opentelemetry: Add resource attributes support Signed-off-by: BOSLET, CORY --- plugins/out_opentelemetry/opentelemetry.c | 6 + plugins/out_opentelemetry/opentelemetry.h | 2 + .../out_opentelemetry/opentelemetry_conf.c | 46 ++++- .../out_opentelemetry/opentelemetry_logs.c | 173 ++++++++++++++++++ 4 files changed, 225 insertions(+), 2 deletions(-) diff --git a/plugins/out_opentelemetry/opentelemetry.c b/plugins/out_opentelemetry/opentelemetry.c index 47cd6d12623..a4dfe2a859c 100644 --- a/plugins/out_opentelemetry/opentelemetry.c +++ b/plugins/out_opentelemetry/opentelemetry.c @@ -1030,6 +1030,12 @@ static struct flb_config_map config_map[] = { add_labels), "Adds a custom label to the metrics use format: 'add_label name value'" }, + { + FLB_CONFIG_MAP_SLIST_1, "logs_resource_attributes_message_key", NULL, + FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct opentelemetry_context, + ra_resource_attributes_message), + "Specify a resource attribute key" + }, { FLB_CONFIG_MAP_STR, "http2", "off", 0, FLB_TRUE, offsetof(struct opentelemetry_context, enable_http2), diff --git a/plugins/out_opentelemetry/opentelemetry.h b/plugins/out_opentelemetry/opentelemetry.h index 806d9d700db..0055758b5be 100644 --- a/plugins/out_opentelemetry/opentelemetry.h +++ b/plugins/out_opentelemetry/opentelemetry.h @@ -138,6 +138,8 @@ struct opentelemetry_context { flb_sds_t logs_severity_number_message_key; struct flb_record_accessor *ra_severity_number_message; + struct mk_list *ra_resource_attributes_message; + /* Number of logs to flush at a time */ int batch_size; diff --git a/plugins/out_opentelemetry/opentelemetry_conf.c b/plugins/out_opentelemetry/opentelemetry_conf.c index 6e3d04d5b5a..8edeffa0c45 100644 --- a/plugins/out_opentelemetry/opentelemetry_conf.c +++ b/plugins/out_opentelemetry/opentelemetry_conf.c @@ -148,7 +148,43 @@ static int metadata_mp_accessor_create(struct opentelemetry_context *ctx) return 0; } -static int config_add_labels(struct flb_output_instance *ins, +/* + * config_ra_resource_attributes_message: validate the configured keys at init + * time. The actual key lookup at flush time reads ctx->ra_resource_attributes_message + * directly (config-map-managed pointer) to avoid per-worker context list issues. + */ +static int config_ra_resource_attributes_message(struct flb_output_instance *ins, + struct opentelemetry_context *ctx) +{ + struct mk_list *head; + struct flb_config_map_val *mv; + struct flb_slist_entry *k; + + if (!ctx->ra_resource_attributes_message || + mk_list_size(ctx->ra_resource_attributes_message) == 0) { + return 0; + } + + flb_plg_debug(ins, "resource attributes: %d key(s) configured", + mk_list_size(ctx->ra_resource_attributes_message)); + + /* validate each entry has exactly one value */ + flb_config_map_foreach(head, mv, ctx->ra_resource_attributes_message) { + if (mk_list_size(mv->val.list) != 1) { + flb_plg_error(ins, "'logs_resource_attributes_message_key' expects a single key name, " + "e.g: 'logs_resource_attributes_message_key service.name' " + "(or '$service.name')"); + return -1; + } + + k = mk_list_entry_first(mv->val.list, struct flb_slist_entry, _head); + flb_plg_debug(ins, "resource attributes: registered key '%s'", k->str); + } + + return 0; +} + +static int config_log_body(struct flb_output_instance *ins, struct opentelemetry_context *ctx) { struct mk_list *head; @@ -294,7 +330,13 @@ struct opentelemetry_context *flb_opentelemetry_context_create(struct flb_output } /* Parse 'add_label' */ - ret = config_add_labels(ins, ctx); + ret = config_log_body(ins, ctx); + if (ret == -1) { + flb_opentelemetry_context_destroy(ctx); + return NULL; + } + + ret = config_ra_resource_attributes_message(ins, ctx); if (ret == -1) { flb_opentelemetry_context_destroy(ctx); return NULL; diff --git a/plugins/out_opentelemetry/opentelemetry_logs.c b/plugins/out_opentelemetry/opentelemetry_logs.c index 4c174ea49d4..c1aeef46797 100644 --- a/plugins/out_opentelemetry/opentelemetry_logs.c +++ b/plugins/out_opentelemetry/opentelemetry_logs.c @@ -26,6 +26,7 @@ #include #include #include +#include #include @@ -780,6 +781,163 @@ static int logs_flush_to_otel(struct opentelemetry_context *ctx, struct flb_even return ret; } +/* + * For each key name in ctx->ra_resource_attributes_message_list, look it up in + * the msgpack message body and promote the value to an OTLP resource attribute. + */ +static void set_resource_attributes_from_message_body( + struct opentelemetry_context *ctx, + msgpack_object *body, + Opentelemetry__Proto__Resource__V1__Resource *resource) +{ + int i; + size_t key_len; + size_t map_key_len; + char *map_key_ptr; + struct mk_list *head; + struct flb_config_map_val *mv; + struct flb_slist_entry *entry; + const char *normalized_key; + msgpack_object_kv *kv; + Opentelemetry__Proto__Common__V1__KeyValue *attr; + Opentelemetry__Proto__Common__V1__KeyValue **tmp_attrs; + + /* + * Use ctx->ra_resource_attributes_message directly — this is the pointer + * managed by the Fluent Bit config-map framework and is reliably available + * in every worker thread context, unlike an embedded mk_list copy. + */ + if (!ctx->ra_resource_attributes_message || + mk_list_size(ctx->ra_resource_attributes_message) == 0) { + return; + } + + if (body == NULL || body->type != MSGPACK_OBJECT_MAP) { + return; + } + + /* Iterate directly over the config-map-managed list */ + flb_config_map_foreach(head, mv, ctx->ra_resource_attributes_message) { + if (mk_list_size(mv->val.list) != 1) { + continue; + } + + entry = mk_list_entry_first(mv->val.list, struct flb_slist_entry, _head); + normalized_key = entry->str; + key_len = flb_sds_len(entry->str); + + if (key_len == 0) { + continue; + } + + /* + * Allow optional record accessor prefix so both "service.name" and + * "$service.name" are treated as the same map key. + */ + if (key_len > 0 && normalized_key[0] == '$') { + normalized_key++; + key_len--; + } + + /* + * Also tolerate bracket forms like $['service.name'] and + * $["service.name"] for literal keys. + */ + if (key_len >= 4 && normalized_key[0] == '[') { + if ((normalized_key[1] == '\'' && normalized_key[key_len - 2] == '\'' && + normalized_key[key_len - 1] == ']') || + (normalized_key[1] == '"' && normalized_key[key_len - 2] == '"' && + normalized_key[key_len - 1] == ']')) { + normalized_key += 2; + key_len -= 4; + } + } + + if (key_len == 0) { + continue; + } + + /* tolerate quoted key names like "service.name" or 'service.name' */ + if (key_len >= 2) { + if ((normalized_key[0] == '"' && normalized_key[key_len - 1] == '"') || + (normalized_key[0] == '\'' && normalized_key[key_len - 1] == '\'')) { + normalized_key++; + key_len -= 2; + } + } + + if (key_len == 0) { + continue; + } + + for (i = 0; i < body->via.map.size; i++) { + kv = &body->via.map.ptr[i]; + + if (kv->key.type == MSGPACK_OBJECT_STR) { + map_key_ptr = kv->key.via.str.ptr; + map_key_len = kv->key.via.str.size; + } + else if (kv->key.type == MSGPACK_OBJECT_BIN) { + map_key_ptr = (char *) kv->key.via.bin.ptr; + map_key_len = kv->key.via.bin.size; + } + else { + continue; + } + + if (map_key_len != key_len) { + continue; + } + + if (strncmp(map_key_ptr, normalized_key, key_len) != 0) { + continue; + } + + /* Found the key — convert to OTLP KeyValue */ + if (kv->key.type == MSGPACK_OBJECT_STR) { + attr = msgpack_kv_to_otlp_any_value(kv); + } + else { + attr = otlp_kvpair_value_initialize(); + if (attr != NULL) { + attr->key = flb_strndup(map_key_ptr, map_key_len); + + if (attr->key != NULL) { + attr->value = msgpack_object_to_otlp_any_value(&kv->val); + } + + if (attr->key == NULL || attr->value == NULL) { + otlp_kvpair_destroy(attr); + attr = NULL; + } + } + } + + if (!attr) { + flb_plg_warn(ctx->ins, "resource attributes: failed to convert key '%s' to OTLP KeyValue", + entry->str); + break; + } + + /* Grow the resource attributes array by one slot */ + tmp_attrs = flb_realloc(resource->attributes, + (resource->n_attributes + 1) * + sizeof(Opentelemetry__Proto__Common__V1__KeyValue *)); + if (!tmp_attrs) { + flb_plg_error(ctx->ins, "resource attributes: memory allocation failed for key '%s'", + entry->str); + otlp_kvpair_destroy(attr); + break; + } + + resource->attributes = tmp_attrs; + resource->attributes[resource->n_attributes] = attr; + resource->n_attributes++; + break; + } + } +} + static int set_resource_attributes(struct flb_record_accessor *ra, msgpack_object *map, Opentelemetry__Proto__Resource__V1__Resource *resource) @@ -1021,6 +1179,18 @@ int otel_process_logs(struct flb_event_chunk *event_chunk, ret = FLB_OK; while (flb_log_event_decoder_next(decoder, &event) == FLB_EVENT_DECODER_SUCCESS) { + /* + * For standalone records (non-native OTLP groups), resource attributes + * promoted from message keys are record-specific. Force a fresh + * resource/scope context per record when this feature is enabled to + * avoid carrying stale values across subsequent log lines. + */ + if (native_otel == FLB_FALSE && + ctx->ra_resource_attributes_message && + mk_list_size(ctx->ra_resource_attributes_message) > 0) { + resource_id = -1; + scope_id = -1; + } /* Check if the record is special (group) or a normal one */ ret = flb_log_event_decoder_get_record_type(&event, &record_type); if (ret != 0) { @@ -1130,6 +1300,9 @@ int otel_process_logs(struct flb_event_chunk *event_chunk, /* group body: $schema_url */ set_resource_schema_url(ctx->ra_resource_schema_url, event.body, resource_log); + + /* message body: promote configured keys to resource attributes */ + set_resource_attributes_from_message_body(ctx, event.body, resource_log->resource); /* prepare the scopes */ if (!resource_log->scope_logs) {