-
Notifications
You must be signed in to change notification settings - Fork 1.9k
out_opentelemetry: Add resource attributes support #11574
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -26,6 +26,7 @@ | |
| #include <fluent-bit/flb_log_event_decoder.h> | ||
| #include <fluent-bit/flb_ra_key.h> | ||
| #include <fluent-bit/flb_gzip.h> | ||
| #include <fluent-bit/flb_slist.h> | ||
|
|
||
| #include <fluent-otel-proto/fluent-otel.h> | ||
|
|
||
|
|
@@ -780,6 +781,163 @@ static int logs_flush_to_otel(struct opentelemetry_context *ctx, struct flb_even | |
| return ret; | ||
| } | ||
|
|
||
| /* | ||
| * For each key name in ctx->ra_resource_attributes_message_list, look it up in | ||
| * the msgpack message body and promote the value to an OTLP resource attribute. | ||
| */ | ||
| static void set_resource_attributes_from_message_body( | ||
| struct opentelemetry_context *ctx, | ||
| msgpack_object *body, | ||
| Opentelemetry__Proto__Resource__V1__Resource *resource) | ||
| { | ||
| int i; | ||
| size_t key_len; | ||
| size_t map_key_len; | ||
| char *map_key_ptr; | ||
| struct mk_list *head; | ||
| struct flb_config_map_val *mv; | ||
| struct flb_slist_entry *entry; | ||
| const char *normalized_key; | ||
| msgpack_object_kv *kv; | ||
| Opentelemetry__Proto__Common__V1__KeyValue *attr; | ||
| Opentelemetry__Proto__Common__V1__KeyValue **tmp_attrs; | ||
|
|
||
| /* | ||
| * Use ctx->ra_resource_attributes_message directly — this is the pointer | ||
| * managed by the Fluent Bit config-map framework and is reliably available | ||
| * in every worker thread context, unlike an embedded mk_list copy. | ||
| */ | ||
| if (!ctx->ra_resource_attributes_message || | ||
| mk_list_size(ctx->ra_resource_attributes_message) == 0) { | ||
| return; | ||
| } | ||
|
|
||
| if (body == NULL || body->type != MSGPACK_OBJECT_MAP) { | ||
| return; | ||
| } | ||
|
|
||
| /* Iterate directly over the config-map-managed list */ | ||
| flb_config_map_foreach(head, mv, ctx->ra_resource_attributes_message) { | ||
| if (mk_list_size(mv->val.list) != 1) { | ||
| continue; | ||
| } | ||
|
|
||
| entry = mk_list_entry_first(mv->val.list, struct flb_slist_entry, _head); | ||
| normalized_key = entry->str; | ||
| key_len = flb_sds_len(entry->str); | ||
|
|
||
| if (key_len == 0) { | ||
| continue; | ||
| } | ||
|
|
||
| /* | ||
| * Allow optional record accessor prefix so both "service.name" and | ||
| * "$service.name" are treated as the same map key. | ||
| */ | ||
| if (key_len > 0 && normalized_key[0] == '$') { | ||
| normalized_key++; | ||
| key_len--; | ||
| } | ||
|
|
||
| /* | ||
| * Also tolerate bracket forms like $['service.name'] and | ||
| * $["service.name"] for literal keys. | ||
| */ | ||
| if (key_len >= 4 && normalized_key[0] == '[') { | ||
| if ((normalized_key[1] == '\'' && normalized_key[key_len - 2] == '\'' && | ||
| normalized_key[key_len - 1] == ']') || | ||
| (normalized_key[1] == '"' && normalized_key[key_len - 2] == '"' && | ||
| normalized_key[key_len - 1] == ']')) { | ||
| normalized_key += 2; | ||
| key_len -= 4; | ||
| } | ||
| } | ||
|
|
||
| if (key_len == 0) { | ||
| continue; | ||
| } | ||
|
|
||
| /* tolerate quoted key names like "service.name" or 'service.name' */ | ||
| if (key_len >= 2) { | ||
| if ((normalized_key[0] == '"' && normalized_key[key_len - 1] == '"') || | ||
| (normalized_key[0] == '\'' && normalized_key[key_len - 1] == '\'')) { | ||
| normalized_key++; | ||
| key_len -= 2; | ||
| } | ||
| } | ||
|
|
||
| if (key_len == 0) { | ||
| continue; | ||
| } | ||
|
|
||
| for (i = 0; i < body->via.map.size; i++) { | ||
| kv = &body->via.map.ptr[i]; | ||
|
|
||
| if (kv->key.type == MSGPACK_OBJECT_STR) { | ||
| map_key_ptr = kv->key.via.str.ptr; | ||
| map_key_len = kv->key.via.str.size; | ||
| } | ||
| else if (kv->key.type == MSGPACK_OBJECT_BIN) { | ||
| map_key_ptr = (char *) kv->key.via.bin.ptr; | ||
| map_key_len = kv->key.via.bin.size; | ||
| } | ||
| else { | ||
| continue; | ||
| } | ||
|
|
||
| if (map_key_len != key_len) { | ||
| continue; | ||
| } | ||
|
|
||
| if (strncmp(map_key_ptr, normalized_key, key_len) != 0) { | ||
| continue; | ||
| } | ||
|
|
||
| /* Found the key — convert to OTLP KeyValue */ | ||
| if (kv->key.type == MSGPACK_OBJECT_STR) { | ||
| attr = msgpack_kv_to_otlp_any_value(kv); | ||
| } | ||
| else { | ||
| attr = otlp_kvpair_value_initialize(); | ||
| if (attr != NULL) { | ||
| attr->key = flb_strndup(map_key_ptr, map_key_len); | ||
|
|
||
| if (attr->key != NULL) { | ||
| attr->value = msgpack_object_to_otlp_any_value(&kv->val); | ||
| } | ||
|
|
||
| if (attr->key == NULL || attr->value == NULL) { | ||
| otlp_kvpair_destroy(attr); | ||
| attr = NULL; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| if (!attr) { | ||
| flb_plg_warn(ctx->ins, "resource attributes: failed to convert key '%s' to OTLP KeyValue", | ||
| entry->str); | ||
| break; | ||
| } | ||
|
|
||
| /* Grow the resource attributes array by one slot */ | ||
| tmp_attrs = flb_realloc(resource->attributes, | ||
| (resource->n_attributes + 1) * | ||
| sizeof(Opentelemetry__Proto__Common__V1__KeyValue *)); | ||
| if (!tmp_attrs) { | ||
| flb_plg_error(ctx->ins, "resource attributes: memory allocation failed for key '%s'", | ||
| entry->str); | ||
| otlp_kvpair_destroy(attr); | ||
| break; | ||
| } | ||
|
|
||
| resource->attributes = tmp_attrs; | ||
| resource->attributes[resource->n_attributes] = attr; | ||
| resource->n_attributes++; | ||
| break; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| static int set_resource_attributes(struct flb_record_accessor *ra, | ||
| msgpack_object *map, | ||
| Opentelemetry__Proto__Resource__V1__Resource *resource) | ||
|
|
@@ -1021,6 +1179,18 @@ int otel_process_logs(struct flb_event_chunk *event_chunk, | |
|
|
||
| ret = FLB_OK; | ||
| while (flb_log_event_decoder_next(decoder, &event) == FLB_EVENT_DECODER_SUCCESS) { | ||
| /* | ||
| * For standalone records (non-native OTLP groups), resource attributes | ||
| * promoted from message keys are record-specific. Force a fresh | ||
| * resource/scope context per record when this feature is enabled to | ||
| * avoid carrying stale values across subsequent log lines. | ||
| */ | ||
| if (native_otel == FLB_FALSE && | ||
| ctx->ra_resource_attributes_message && | ||
| mk_list_size(ctx->ra_resource_attributes_message) > 0) { | ||
| resource_id = -1; | ||
| scope_id = -1; | ||
|
Comment on lines
+1188
to
+1192
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
When Useful? React with 👍 / 👎. |
||
| } | ||
|
Comment on lines
+1182
to
+1193
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This turns With 🤖 Prompt for AI Agents |
||
| /* Check if the record is special (group) or a normal one */ | ||
| ret = flb_log_event_decoder_get_record_type(&event, &record_type); | ||
| if (ret != 0) { | ||
|
|
@@ -1130,6 +1300,9 @@ int otel_process_logs(struct flb_event_chunk *event_chunk, | |
|
|
||
| /* group body: $schema_url */ | ||
| set_resource_schema_url(ctx->ra_resource_schema_url, event.body, resource_log); | ||
|
|
||
| /* message body: promote configured keys to resource attributes */ | ||
| set_resource_attributes_from_message_body(ctx, event.body, resource_log->resource); | ||
|
|
||
| /* prepare the scopes */ | ||
| if (!resource_log->scope_logs) { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Grouped OTLP records never read the actual log body here.
This helper is wired into the resource-creation path, so for native OTLP input it runs while handling
FLB_LOG_EVENT_GROUP_START. At that pointevent.bodyis the group descriptor ($resource/$scope), not theFLB_LOG_EVENT_NORMALbody, sologs_resource_attributes_message_keyonly works for standalone logs. If grouped OTLP records are meant to be supported too, the lookup needs to happen in the per-record path before resource selection.Also applies to: 1304-1305
🤖 Prompt for AI Agents