diff --git a/plugins/out_azure_blob/azure_blob.c b/plugins/out_azure_blob/azure_blob.c index a650809c4d4..b0b88d2d3a1 100644 --- a/plugins/out_azure_blob/azure_blob.c +++ b/plugins/out_azure_blob/azure_blob.c @@ -31,8 +31,11 @@ #include #include #include +#include +#include #include +#include #include "azure_blob.h" #include "azure_blob_db.h" @@ -44,6 +47,7 @@ #include "azure_blob_store.h" #define CREATE_BLOB 1337 +#define AZB_UUID_PLACEHOLDER "$UUID" /* thread_local_storage for workers */ @@ -53,6 +57,8 @@ struct worker_info { FLB_TLS_DEFINE(struct worker_info, worker_info); +static int create_blob(struct flb_azure_blob *ctx, const char *path_prefix, char *name); + static int azure_blob_format(struct flb_config *config, struct flb_input_instance *ins, void *plugin_context, @@ -149,34 +155,525 @@ static int construct_request_buffer(struct flb_azure_blob *ctx, flb_sds_t new_da return 0; } +/** + * Populate the provided buffer with pseudo-random alphanumeric characters. + * The buffer must have room for `length + 1` bytes to include the terminator. + */ void generate_random_string_blob(char *str, size_t length) { const char charset[] = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"; const size_t charset_size = sizeof(charset) - 1; + unsigned char *rand_buf; size_t i; - size_t index; - /* Seed the random number generator with multiple sources of entropy */ - unsigned int seed = (unsigned int)(time(NULL) ^ clock() ^ getpid()); - srand(seed); + if (length == 0) { + str[0] = '\0'; + return; + } - for (i = 0; i < length; ++i) { - index = (size_t)rand() % charset_size; - str[i] = charset[index]; + rand_buf = flb_malloc(length); + if (!rand_buf) { + flb_errno(); + memset(str, 'a', length); + str[length] = '\0'; + return; + } + + if (flb_random_bytes(rand_buf, length) != 0) { + struct flb_time now; + uint32_t state; + + flb_time_get(&now); + state = (uint32_t) now.tm.tv_nsec ^ (uint32_t) now.tm.tv_sec; + for (i = 0; i < length; i++) { + state = state * 1103515245u + 12345u; + rand_buf[i] = (unsigned char) (state >> 16); + } } + for (i = 0; i < length; i++) { + str[i] = charset[rand_buf[i] % charset_size]; + } str[length] = '\0'; + + flb_free(rand_buf); +} + +static inline int azb_path_uses_templating_tokens(const char *path) +{ + size_t i; + + if (!path) { + return FLB_FALSE; + } + + for (i = 0; path[i] != '\0'; i++) { + if (path[i] == '$' || path[i] == '%') { + return FLB_TRUE; + } + } + + return FLB_FALSE; +} + +const char *azb_commit_prefix_with_fallback(struct flb_azure_blob *ctx, + const char *db_prefix) +{ + if (db_prefix) { + return db_prefix; + } + + if (!ctx || !ctx->path) { + return NULL; + } + + if (ctx->path_templating_enabled == FLB_FALSE || + azb_path_uses_templating_tokens(ctx->path) == FLB_FALSE) { + return ctx->path; + } + + return NULL; +} + +/** + * Replace all "$UUID" placeholders in the path with the same random suffix. + * Returns a newly allocated SDS string and frees the original `path` value. + */ +static flb_sds_t azb_replace_uuid(flb_sds_t path) +{ + char random_buf[9] = {0}; + const size_t token_len = strlen(AZB_UUID_PLACEHOLDER); + size_t occurrences = 0; + size_t path_len; + size_t result_len; + char *cursor; + char *match; + char *dst; + flb_sds_t result; + + if (!path) { + return NULL; + } + + cursor = path; + while ((match = strstr(cursor, AZB_UUID_PLACEHOLDER)) != NULL) { + occurrences++; + cursor = match + token_len; + } + + if (occurrences == 0) { + return path; + } + + generate_random_string_blob(random_buf, 8); + + path_len = flb_sds_len(path); + result_len = path_len + occurrences * (8 - token_len); + + result = flb_sds_create_size(result_len + 1); + if (!result) { + flb_errno(); + flb_sds_destroy(path); + return NULL; + } + + dst = result; + cursor = path; + while ((match = strstr(cursor, AZB_UUID_PLACEHOLDER)) != NULL) { + size_t segment_len; + + segment_len = (size_t)(match - cursor); + if (segment_len > 0) { + memcpy(dst, cursor, segment_len); + dst += segment_len; + } + + memcpy(dst, random_buf, 8); + dst += 8; + + cursor = match + token_len; + } + + if (cursor < path + path_len) { + size_t tail_len; + + tail_len = (size_t)((path + path_len) - cursor); + if (tail_len > 0) { + memcpy(dst, cursor, tail_len); + dst += tail_len; + } + } + + *dst = '\0'; + flb_sds_len_set(result, result_len); + + flb_sds_destroy(path); + return result; } -static int create_blob(struct flb_azure_blob *ctx, char *name) +/** + * Replace the first occurrence of `token` with `replacement` in the SDS input. + * The original string is destroyed and a new SDS instance is returned. + */ +static flb_sds_t azb_simple_replace(flb_sds_t input, + const char *token, + const char *replacement) +{ + char *pos; + size_t token_len; + size_t replace_len; + size_t prefix_len; + size_t suffix_len; + flb_sds_t result; + + if (!input || !token) { + return input; + } + + pos = strstr(input, token); + if (!pos) { + return input; + } + + token_len = strlen(token); + replace_len = strlen(replacement); + prefix_len = (size_t)(pos - input); + suffix_len = flb_sds_len(input) - prefix_len - token_len; + + result = flb_sds_create_size(prefix_len + replace_len + suffix_len + 1); + if (!result) { + flb_errno(); + flb_sds_destroy(input); + return NULL; + } + + if (prefix_len > 0) { + memcpy(result, input, prefix_len); + } + if (replace_len > 0) { + memcpy(result + prefix_len, replacement, replace_len); + } + if (suffix_len > 0) { + memcpy(result + prefix_len + replace_len, pos + token_len, suffix_len); + } + result[prefix_len + replace_len + suffix_len] = '\0'; + flb_sds_len_set(result, prefix_len + replace_len + suffix_len); + + flb_sds_destroy(input); + return result; +} + +/** + * Expand millisecond and nanosecond custom tokens within the blob path. + */ +static flb_sds_t azb_apply_time_tokens(flb_sds_t path, const struct flb_time *timestamp) +{ + char ms_buf[4]; + char ns_buf[10]; + flb_sds_t tmp; + + if (!path || !timestamp) { + return path; + } + + snprintf(ms_buf, sizeof(ms_buf), "%03lu", + (unsigned long)(timestamp->tm.tv_nsec / 1000000)); + snprintf(ns_buf, sizeof(ns_buf), "%09lu", + (unsigned long)timestamp->tm.tv_nsec); + + /* Replace %3N with milliseconds */ + tmp = azb_simple_replace(path, "%3N", ms_buf); + if (!tmp) { + return NULL; + } + path = tmp; + + /* Replace %9N with nanoseconds */ + tmp = azb_simple_replace(path, "%9N", ns_buf); + if (!tmp) { + return NULL; + } + path = tmp; + + /* Replace %L with nanoseconds */ + tmp = azb_simple_replace(path, "%L", ns_buf); + if (!tmp) { + return NULL; + } + + return tmp; +} + +/** + * Apply `strftime` formatting using the provided event timestamp. + */ +static flb_sds_t azb_apply_strftime(struct flb_azure_blob *ctx, + flb_sds_t path, + const struct flb_time *timestamp) +{ + struct flb_time now; + const struct flb_time *ref; + struct tm tm_utc; + time_t seconds; + size_t path_len; + size_t empty_threshold; + size_t buf_size; + size_t out_len; + char *buf; + char *tmp_buf; + flb_sds_t result; + + if (!path) { + return NULL; + } + + if (timestamp) { + ref = timestamp; + } + else { + flb_time_get(&now); + ref = &now; + } + + seconds = ref->tm.tv_sec; + if (!gmtime_r(&seconds, &tm_utc)) { + flb_sds_destroy(path); + return NULL; + } + + path_len = flb_sds_len(path); + empty_threshold = path_len > 0 ? path_len * 2 : 2; + buf_size = path_len + 64; + buf = flb_malloc(buf_size + 1); + if (!buf) { + flb_errno(); + flb_sds_destroy(path); + return NULL; + } + + buf[0] = '\0'; + + while (1) { + out_len = strftime(buf, buf_size + 1, path, &tm_utc); + if (out_len > 0) { + break; + } + + if (buf_size > empty_threshold) { + break; + } + + if (buf_size > 4096) { + break; + } + + buf_size *= 2; + tmp_buf = flb_realloc(buf, buf_size + 1); + if (!tmp_buf) { + flb_errno(); + flb_free(buf); + flb_sds_destroy(path); + return NULL; + } + buf = tmp_buf; + } + + if (out_len == 0) { + if (ctx && ctx->ins) { + flb_plg_error(ctx->ins, + "[azure_blob] invalid or too-long strftime path template"); + } + else { + flb_error("[azure_blob] invalid or too-long strftime path template"); + } + flb_free(buf); + flb_sds_destroy(path); + return NULL; + } + + result = flb_sds_create_len(buf, out_len); + if (!result) { + flb_errno(); + flb_free(buf); + flb_sds_destroy(path); + return NULL; + } + + flb_free(buf); + flb_sds_destroy(path); + + return result; +} + +/** + * Remove leading and trailing slashes to avoid double separators in URIs. + */ +static void azb_trim_slashes(flb_sds_t path) +{ + size_t len; + size_t start = 0; + char *buf; + + if (!path) { + return; + } + + buf = path; + len = flb_sds_len(path); + + while (start < len && buf[start] == '/') { + start++; + } + + if (start > 0) { + memmove(buf, buf + start, len - start + 1); + len -= start; + flb_sds_len_set(path, len); + } + + while (len > 0 && buf[len - 1] == '/') { + len--; + } + buf[len] = '\0'; + flb_sds_len_set(path, len); +} + +/** + * Build the final blob path by applying record accessors and time templating. + */ +int azb_resolve_path(struct flb_azure_blob *ctx, + const char *tag, + int tag_len, + const struct flb_time *timestamp, + flb_sds_t *out_path) +{ + flb_sds_t path; + struct flb_time now; + msgpack_sbuffer sbuf; + msgpack_packer pk; + msgpack_unpacked result; + msgpack_object root; + struct flb_record_accessor *temp_ra; + flb_sds_t expanded; + + if (!out_path) { + return -1; + } + + *out_path = NULL; + + if (!ctx->path_templating_enabled) { + return 0; + } + + if (!ctx->path) { + /* No template to apply; behave as if templating was disabled */ + return 0; + } + + if (!timestamp) { + flb_time_get(&now); + timestamp = &now; + } + + /* Start with the original path template */ + path = flb_sds_create_len(ctx->path, flb_sds_len(ctx->path)); + if (!path) { + flb_errno(); + return -1; + } + + /* Apply UUID replacement before record accessor step. + * Unknown $ tokens get stripped otherwise. + */ + path = azb_replace_uuid(path); + if (!path) { + return -1; + } + + /* Apply time tokens (%3N, %9N, %L) */ + path = azb_apply_time_tokens(path, timestamp); + if (!path) { + return -1; + } + + /* Apply strftime */ + path = azb_apply_strftime(ctx, path, timestamp); + if (!path) { + return -1; + } + + /* Now use record accessor to expand $TAG and $TAG[n] */ + /* Create empty msgpack map for record accessor */ + msgpack_sbuffer_init(&sbuf); + msgpack_packer_init(&pk, &sbuf, msgpack_sbuffer_write); + msgpack_pack_map(&pk, 0); + + /* Unpack to get msgpack_object */ + msgpack_unpacked_init(&result); + if (msgpack_unpack_next(&result, + sbuf.data, + sbuf.size, + NULL) != MSGPACK_UNPACK_SUCCESS) { + msgpack_sbuffer_destroy(&sbuf); + msgpack_unpacked_destroy(&result); + flb_sds_destroy(path); + return -1; + } + root = result.data; + + /* Create a temporary record accessor for the partially-processed path */ + temp_ra = flb_ra_create(path, FLB_TRUE); + if (!temp_ra) { + msgpack_unpacked_destroy(&result); + msgpack_sbuffer_destroy(&sbuf); + flb_sds_destroy(path); + return -1; + } + + /* Use record accessor to expand $TAG and $TAG[n] */ + { + const char *ra_tag = tag; + int ra_tag_len = tag_len; + + if (!ra_tag) { + ra_tag = ""; + ra_tag_len = 0; + } + + expanded = flb_ra_translate(temp_ra, (char *) ra_tag, ra_tag_len, root, NULL); + } + + flb_ra_destroy(temp_ra); + msgpack_unpacked_destroy(&result); + msgpack_sbuffer_destroy(&sbuf); + flb_sds_destroy(path); + + if (!expanded) { + return -1; + } + + azb_trim_slashes(expanded); + + if (flb_sds_len(expanded) == 0) { + *out_path = expanded; + return 0; + } + + *out_path = expanded; + return 0; +} + +static int create_blob(struct flb_azure_blob *ctx, const char *path_prefix, char *name) { int ret; + int status = FLB_OK; size_t b_sent; flb_sds_t uri = NULL; struct flb_http_client *c; struct flb_connection *u_conn; - uri = azb_uri_create_blob(ctx, name); + uri = azb_uri_create_blob(ctx, path_prefix, name); if (!uri) { return FLB_RETRY; } @@ -212,13 +709,11 @@ static int create_blob(struct flb_azure_blob *ctx, char *name) /* Send HTTP request */ ret = flb_http_do(c, &b_sent); - flb_sds_destroy(uri); if (ret == -1) { flb_plg_error(ctx->ins, "error sending append_blob"); - flb_http_client_destroy(c); - flb_upstream_conn_release(u_conn); - return FLB_RETRY; + status = FLB_RETRY; + goto cleanup_create; } if (c->resp.status == 201) { @@ -238,25 +733,31 @@ static int create_blob(struct flb_azure_blob *ctx, char *name) flb_plg_error(ctx->ins, "http_status=%i cannot create append blob", c->resp.status); } - flb_http_client_destroy(c); - flb_upstream_conn_release(u_conn); - return FLB_RETRY; + status = FLB_RETRY; + goto cleanup_create; } +cleanup_create: flb_http_client_destroy(c); flb_upstream_conn_release(u_conn); - return FLB_OK; + if (uri) { + flb_sds_destroy(uri); + } + return status; } -static int delete_blob(struct flb_azure_blob *ctx, char *name) +static int delete_blob(struct flb_azure_blob *ctx, + const char *path_prefix, + char *name) { int ret; + int status = FLB_OK; size_t b_sent; flb_sds_t uri = NULL; struct flb_http_client *c; struct flb_connection *u_conn; - uri = azb_uri_create_blob(ctx, name); + uri = azb_uri_create_blob(ctx, path_prefix, name); if (!uri) { return FLB_RETRY; } @@ -287,13 +788,11 @@ static int delete_blob(struct flb_azure_blob *ctx, char *name) /* Send HTTP request */ ret = flb_http_do(c, &b_sent); - flb_sds_destroy(uri); if (ret == -1) { flb_plg_error(ctx->ins, "error sending append_blob"); - flb_http_client_destroy(c); - flb_upstream_conn_release(u_conn); - return FLB_RETRY; + status = FLB_RETRY; + goto cleanup_delete; } if (c->resp.status == 201) { @@ -313,14 +812,17 @@ static int delete_blob(struct flb_azure_blob *ctx, char *name) flb_plg_error(ctx->ins, "http_status=%i cannot delete append blob", c->resp.status); } - flb_http_client_destroy(c); - flb_upstream_conn_release(u_conn); - return FLB_RETRY; + status = FLB_RETRY; + goto cleanup_delete; } +cleanup_delete: flb_http_client_destroy(c); flb_upstream_conn_release(u_conn); - return FLB_OK; + if (uri) { + flb_sds_destroy(uri); + } + return status; } static int http_send_blob(struct flb_config *config, struct flb_azure_blob *ctx, @@ -453,53 +955,85 @@ static int send_blob(struct flb_config *config, struct flb_azure_blob *ctx, int event_type, int blob_type, char *name, uint64_t part_id, - char *tag, int tag_len, void *data, size_t bytes) + char *tag, int tag_len, const char *resolved_path_prefix, + void *data, size_t bytes) { int ret; uint64_t ms = 0; flb_sds_t uri = NULL; flb_sds_t block_id = NULL; flb_sds_t ref_name = NULL; - void *payload_buf = data; - size_t payload_size = bytes; - char *generated_random_string; + flb_sds_t tmp_path_prefix = NULL; + const char *path_prefix = resolved_path_prefix; + char *generated_random_string = NULL; + struct flb_time now; - ref_name = flb_sds_create_size(256); - if (!ref_name) { - return FLB_RETRY; + flb_time_get(&now); + + if (!path_prefix) { + if (azb_resolve_path(ctx, tag, tag_len, &now, &tmp_path_prefix) != 0) { + if (tmp_path_prefix) { + flb_sds_destroy(tmp_path_prefix); + } + return FLB_RETRY; + } + path_prefix = tmp_path_prefix; } - /* Allocate memory for the random string dynamically */ - generated_random_string = flb_malloc(ctx->blob_uri_length + 1); - if (!generated_random_string) { - flb_errno(); - flb_plg_error(ctx->ins, "cannot allocate memory for random string"); - flb_sds_destroy(ref_name); + ref_name = flb_sds_create_size(256); + if (!ref_name) { + if (tmp_path_prefix) { + flb_sds_destroy(tmp_path_prefix); + } return FLB_RETRY; } if (blob_type == AZURE_BLOB_APPENDBLOB) { - uri = azb_append_blob_uri(ctx, tag); + uri = azb_append_blob_uri(ctx, path_prefix, tag); } else if (blob_type == AZURE_BLOB_BLOCKBLOB) { + generated_random_string = flb_malloc(ctx->blob_uri_length + 1); + if (!generated_random_string) { + flb_errno(); + flb_plg_error(ctx->ins, "cannot allocate memory for random string"); + flb_sds_destroy(ref_name); + if (tmp_path_prefix) { + flb_sds_destroy(tmp_path_prefix); + } + return FLB_RETRY; + } generate_random_string_blob(generated_random_string, ctx->blob_uri_length); /* Generate the random string */ if (event_type == FLB_EVENT_TYPE_LOGS) { block_id = azb_block_blob_id_logs(&ms); if (!block_id) { flb_plg_error(ctx->ins, "could not generate block id"); flb_free(generated_random_string); - cfl_sds_destroy(ref_name); + generated_random_string = NULL; + flb_sds_destroy(ref_name); + if (tmp_path_prefix) { + flb_sds_destroy(tmp_path_prefix); + } return FLB_RETRY; } - uri = azb_block_blob_uri(ctx, tag, block_id, ms, generated_random_string); + uri = azb_block_blob_uri(ctx, path_prefix, tag, + block_id, ms, generated_random_string); ref_name = flb_sds_printf(&ref_name, "file=%s.%" PRIu64, name, ms); } else if (event_type == FLB_EVENT_TYPE_BLOBS) { block_id = azb_block_blob_id_blob(ctx, name, part_id); - uri = azb_block_blob_uri(ctx, name, block_id, 0, generated_random_string); + uri = azb_block_blob_uri(ctx, path_prefix, name, + block_id, 0, generated_random_string); ref_name = flb_sds_printf(&ref_name, "file=%s:%" PRIu64, name, part_id); } } + else { + flb_plg_error(ctx->ins, "unsupported blob type %d", blob_type); + flb_sds_destroy(ref_name); + if (tmp_path_prefix) { + flb_sds_destroy(tmp_path_prefix); + } + return FLB_RETRY; + } if (!uri) { flb_free(generated_random_string); @@ -507,41 +1041,50 @@ static int send_blob(struct flb_config *config, flb_free(block_id); } flb_sds_destroy(ref_name); + if (tmp_path_prefix) { + flb_sds_destroy(tmp_path_prefix); + } return FLB_RETRY; } - /* Map buffer */ - payload_buf = data; - payload_size = bytes; - - ret = http_send_blob(config, ctx, ref_name, uri, block_id, event_type, payload_buf, payload_size); + ret = http_send_blob(config, ctx, ref_name, uri, block_id, event_type, data, bytes); flb_plg_debug(ctx->ins, "http_send_blob()=%i", ret); if (ret == FLB_OK) { /* For Logs type, we need to commit the block right away */ - if (event_type == FLB_EVENT_TYPE_LOGS) { - ret = azb_block_blob_commit_block(ctx, block_id, tag, ms, generated_random_string); + if (blob_type == AZURE_BLOB_BLOCKBLOB && + event_type == FLB_EVENT_TYPE_LOGS) { + ret = azb_block_blob_commit_block(ctx, path_prefix, block_id, + tag, ms, generated_random_string); } } else if (ret == CREATE_BLOB) { - ret = create_blob(ctx, name); + ret = create_blob(ctx, path_prefix, name); if (ret == FLB_OK) { - ret = http_send_blob(config, ctx, ref_name, uri, block_id, event_type, payload_buf, payload_size); + ret = http_send_blob(config, ctx, ref_name, uri, block_id, event_type, data, bytes); + if (ret == FLB_OK && + blob_type == AZURE_BLOB_BLOCKBLOB && + event_type == FLB_EVENT_TYPE_LOGS) { + ret = azb_block_blob_commit_block(ctx, path_prefix, block_id, + tag, ms, generated_random_string); + } } } flb_sds_destroy(ref_name); - if (payload_buf != data) { - flb_sds_destroy(payload_buf); - } - flb_sds_destroy(uri); - flb_free(generated_random_string); + if (generated_random_string) { + flb_free(generated_random_string); + } if (block_id != NULL) { flb_free(block_id); } + if (tmp_path_prefix) { + flb_sds_destroy(tmp_path_prefix); + } + return ret; } @@ -839,15 +1382,46 @@ static int process_blob_chunk(struct flb_azure_blob *ctx, struct flb_event_chunk continue; } - ret = azb_db_file_insert(ctx, source, ctx->real_endpoint, file_path, file_size); + flb_sds_t path_prefix = NULL; + + int tag_len = 0; + + if (event_chunk->tag) { + tag_len = flb_sds_len(event_chunk->tag); + } + + ret = azb_resolve_path(ctx, + event_chunk->tag, + tag_len, + &log_event.timestamp, + &path_prefix); + if (ret != 0) { + flb_plg_error(ctx->ins, "cannot resolve blob path prefix, skipping file %s", + file_path); + if (path_prefix) { + flb_sds_destroy(path_prefix); + } + cfl_sds_destroy(file_path); + cfl_sds_destroy(source); + continue; + } + + ret = azb_db_file_insert(ctx, source, ctx->real_endpoint, file_path, + path_prefix, file_size); if (ret == -1) { flb_plg_error(ctx->ins, "cannot insert blob file into database: %s (size=%lu)", file_path, file_size); cfl_sds_destroy(file_path); cfl_sds_destroy(source); + if (path_prefix) { + flb_sds_destroy(path_prefix); + } continue; } + if (path_prefix) { + flb_sds_destroy(path_prefix); + } cfl_sds_destroy(file_path); cfl_sds_destroy(source); @@ -884,6 +1458,10 @@ static void cb_azb_blob_file_upload(struct flb_config *config, void *out_context cfl_sds_t file_path = NULL; cfl_sds_t part_ids = NULL; cfl_sds_t source = NULL; + cfl_sds_t path_prefix = NULL; + cfl_sds_t part_path_prefix = NULL; + cfl_sds_t stale_path_prefix = NULL; + cfl_sds_t aborted_path_prefix = NULL; struct flb_azure_blob *ctx = out_context; struct worker_info *info; struct flb_blob_delivery_notification *notification; @@ -911,15 +1489,20 @@ static void cb_azb_blob_file_upload(struct flb_config *config, void *out_context while (1) { ret = azb_db_file_get_next_stale(ctx, &file_id, - &file_path); + &file_path, + &stale_path_prefix); if (ret == 1) { - delete_blob(ctx, file_path); + delete_blob(ctx, stale_path_prefix, file_path); azb_db_file_reset_upload_states(ctx, file_id, file_path); azb_db_file_set_aborted_state(ctx, file_id, file_path, 0); cfl_sds_destroy(file_path); + if (stale_path_prefix) { + cfl_sds_destroy(stale_path_prefix); + stale_path_prefix = NULL; + } file_path = NULL; } @@ -933,10 +1516,11 @@ static void cb_azb_blob_file_upload(struct flb_config *config, void *out_context &file_id, &file_delivery_attempts, &file_path, - &source); + &source, + &aborted_path_prefix); if (ret == 1) { - ret = delete_blob(ctx, file_path); + ret = delete_blob(ctx, aborted_path_prefix, file_path); if (ctx->file_delivery_attempt_limit != FLB_OUT_RETRY_UNLIMITED && file_delivery_attempts < ctx->file_delivery_attempt_limit) { @@ -974,6 +1558,10 @@ static void cb_azb_blob_file_upload(struct flb_config *config, void *out_context cfl_sds_destroy(file_path); cfl_sds_destroy(source); + if (aborted_path_prefix) { + cfl_sds_destroy(aborted_path_prefix); + aborted_path_prefix = NULL; + } file_path = NULL; source = NULL; @@ -983,7 +1571,8 @@ static void cb_azb_blob_file_upload(struct flb_config *config, void *out_context } } - ret = azb_db_file_oldest_ready(ctx, &file_id, &file_path, &part_ids, &source); + ret = azb_db_file_oldest_ready(ctx, &file_id, &file_path, &part_ids, &source, + &path_prefix); if (ret == 0) { flb_plg_trace(ctx->ins, "no blob files ready to commit"); } @@ -993,7 +1582,9 @@ static void cb_azb_blob_file_upload(struct flb_config *config, void *out_context else if (ret == 1) { /* one file is ready to be committed */ flb_plg_debug(ctx->ins, "blob file '%s' (id=%" PRIu64 ") ready to upload", file_path, file_id); - ret = azb_block_blob_commit_file_parts(ctx, file_id, file_path, part_ids); + const char *commit_prefix = azb_commit_prefix_with_fallback(ctx, path_prefix); + + ret = azb_block_blob_commit_file_parts(ctx, file_id, file_path, part_ids, commit_prefix); if (ret == -1) { flb_plg_error(ctx->ins, "cannot commit blob file parts for file id=%" PRIu64 " path=%s", file_id, file_path); @@ -1047,6 +1638,10 @@ static void cb_azb_blob_file_upload(struct flb_config *config, void *out_context if (source) { cfl_sds_destroy(source); } + if (path_prefix) { + cfl_sds_destroy(path_prefix); + path_prefix = NULL; + } /* check for a next part file and lock it */ ret = azb_db_file_part_get_next(ctx, &id, &file_id, &part_id, @@ -1054,7 +1649,8 @@ static void cb_azb_blob_file_upload(struct flb_config *config, void *out_context &part_delivery_attempts, &file_delivery_attempts, &file_path, - &file_destination); + &file_destination, + &part_path_prefix); if (ret == -1) { flb_plg_error(ctx->ins, "cannot get next blob file part"); info->active_upload = FLB_FALSE; @@ -1087,6 +1683,10 @@ static void cb_azb_blob_file_upload(struct flb_config *config, void *out_context cfl_sds_destroy(file_path); cfl_sds_destroy(file_destination); + if (part_path_prefix) { + cfl_sds_destroy(part_path_prefix); + part_path_prefix = NULL; + } flb_sched_timer_cb_coro_return(); } @@ -1107,6 +1707,10 @@ static void cb_azb_blob_file_upload(struct flb_config *config, void *out_context cfl_sds_destroy(file_path); cfl_sds_destroy(file_destination); + if (part_path_prefix) { + cfl_sds_destroy(part_path_prefix); + part_path_prefix = NULL; + } flb_sched_timer_cb_coro_return(); } @@ -1116,7 +1720,8 @@ static void cb_azb_blob_file_upload(struct flb_config *config, void *out_context flb_plg_debug(ctx->ins, "sending part file %s (id=%" PRIu64 " part_id=%" PRIu64 ")", file_path, id, part_id); ret = send_blob(config, NULL, ctx, FLB_EVENT_TYPE_BLOBS, - AZURE_BLOB_BLOCKBLOB, file_path, part_id, NULL, 0, out_buf, out_size); + AZURE_BLOB_BLOCKBLOB, file_path, part_id, + NULL, 0, part_path_prefix, out_buf, out_size); if (ret == FLB_OK) { ret = azb_db_file_part_uploaded(ctx, id); @@ -1126,6 +1731,10 @@ static void cb_azb_blob_file_upload(struct flb_config *config, void *out_context cfl_sds_destroy(file_path); cfl_sds_destroy(file_destination); + if (part_path_prefix) { + cfl_sds_destroy(part_path_prefix); + part_path_prefix = NULL; + } flb_sched_timer_cb_coro_return(); } @@ -1147,6 +1756,10 @@ static void cb_azb_blob_file_upload(struct flb_config *config, void *out_context cfl_sds_destroy(file_path); cfl_sds_destroy(file_destination); + if (part_path_prefix) { + cfl_sds_destroy(part_path_prefix); + part_path_prefix = NULL; + } flb_sched_timer_cb_coro_return(); } @@ -1256,16 +1869,7 @@ static void cb_azure_blob_ingest(struct flb_config *config, void *data) { /* Attempt to send blob */ ret = send_blob(config, NULL, ctx, FLB_EVENT_TYPE_LOGS,ctx->btype , (char *) tag_sds,0, (char *) tag_sds, - flb_sds_len(tag_sds), payload, flb_sds_len(payload)); - - /* Handle blob creation if necessary */ - if (ret == CREATE_BLOB) { - ret = create_blob(ctx, tag_sds); - if (ret == FLB_OK) { - ret = send_blob(config, NULL, ctx, FLB_EVENT_TYPE_LOGS,ctx->btype, (char *) tag_sds, 0, (char *) tag_sds, - flb_sds_len(tag_sds), payload, flb_sds_len(payload)); - } - } + flb_sds_len(tag_sds), NULL, payload, flb_sds_len(payload)); /* Handle blob send failure */ if (ret != FLB_OK) { @@ -1386,14 +1990,8 @@ static int ingest_all_chunks(struct flb_azure_blob *ctx, struct flb_config *conf tag_sds = flb_sds_create(fsf->meta_buf); flb_free(buffer); - ret = send_blob(config, NULL, ctx, FLB_EVENT_TYPE_LOGS, ctx->btype, (char *)tag_sds, 0, (char *)tag_sds, flb_sds_len(tag_sds), payload, flb_sds_len(payload)); - - if (ret == CREATE_BLOB) { - ret = create_blob(ctx, tag_sds); - if (ret == FLB_OK) { - ret = send_blob(config, NULL, ctx, FLB_EVENT_TYPE_LOGS, ctx->btype, (char *)tag_sds, 0, (char *)tag_sds, flb_sds_len(tag_sds), payload, flb_sds_len(payload)); - } - } + ret = send_blob(config, NULL, ctx, FLB_EVENT_TYPE_LOGS, ctx->btype, (char *)tag_sds, 0, (char *)tag_sds, + flb_sds_len(tag_sds), NULL, payload, flb_sds_len(payload)); if (ret != FLB_OK) { flb_plg_error(ctx->ins, "ingest_all_chunks :: Failed to ingest data to Azure Blob Storage"); @@ -1556,14 +2154,8 @@ static void cb_azure_blob_flush(struct flb_event_chunk *event_chunk, } /* Upload the file */ - ret = send_blob(config, i_ins, ctx, FLB_EVENT_TYPE_LOGS, ctx->btype,(char *)tag_name, 0, (char *)tag_name, tag_len, final_payload, final_payload_size); - - if (ret == CREATE_BLOB) { - ret = create_blob(ctx, upload_file->fsf->name); - if (ret == FLB_OK) { - ret = send_blob(config, i_ins, ctx, FLB_EVENT_TYPE_LOGS, ctx->btype,(char *)tag_name, 0, (char *)tag_name, tag_len, final_payload, final_payload_size); - } - } + ret = send_blob(config, i_ins, ctx, FLB_EVENT_TYPE_LOGS, ctx->btype,(char *)tag_name, 0, (char *)tag_name, + tag_len, NULL, final_payload, final_payload_size); if (ret == FLB_OK) { flb_plg_debug(ctx->ins, "uploaded file %s successfully", upload_file->fsf->name); @@ -1642,21 +2234,10 @@ static void cb_azure_blob_flush(struct flb_event_chunk *event_chunk, (char *) event_chunk->tag, /* use tag as 'name' */ 0, /* part id */ (char *) event_chunk->tag, flb_sds_len(event_chunk->tag), + NULL, json, json_size); - if (ret == CREATE_BLOB) { - ret = create_blob(ctx, event_chunk->tag); - if (ret == FLB_OK) { - ret = send_blob(config, i_ins, ctx, - FLB_EVENT_TYPE_LOGS, - ctx->btype, /* blob type per user configuration */ - (char *) event_chunk->tag, /* use tag as 'name' */ - 0, /* part id */ - (char *) event_chunk->tag, /* use tag as 'name' */ - flb_sds_len(event_chunk->tag), - json, json_size); - } - } + /* send_blob handles CREATE_BLOB internally */ } } else if (event_chunk->type == FLB_EVENT_TYPE_BLOBS) { diff --git a/plugins/out_azure_blob/azure_blob.h b/plugins/out_azure_blob/azure_blob.h index 8699dda54f8..c71ab993a56 100644 --- a/plugins/out_azure_blob/azure_blob.h +++ b/plugins/out_azure_blob/azure_blob.h @@ -24,6 +24,7 @@ #include #include #include +#include /* Content-Type */ #define AZURE_BLOB_CT "Content-Type" @@ -62,6 +63,7 @@ struct flb_azure_blob { flb_sds_t shared_key; flb_sds_t endpoint; flb_sds_t path; + int path_templating_enabled; flb_sds_t date_key; flb_sds_t auth_type; flb_sds_t sas_token; @@ -166,4 +168,13 @@ struct flb_azure_blob { struct flb_config *config; }; +int azb_resolve_path(struct flb_azure_blob *ctx, + const char *tag, + int tag_len, + const struct flb_time *timestamp, + flb_sds_t *out_path); + +const char *azb_commit_prefix_with_fallback(struct flb_azure_blob *ctx, + const char *db_prefix); + #endif diff --git a/plugins/out_azure_blob/azure_blob_appendblob.c b/plugins/out_azure_blob/azure_blob_appendblob.c index 110e9eb5d4d..216edf09575 100644 --- a/plugins/out_azure_blob/azure_blob_appendblob.c +++ b/plugins/out_azure_blob/azure_blob_appendblob.c @@ -24,24 +24,38 @@ #include "azure_blob_conf.h" #include "azure_blob_uri.h" -flb_sds_t azb_append_blob_uri(struct flb_azure_blob *ctx, char *tag) +flb_sds_t azb_append_blob_uri(struct flb_azure_blob *ctx, + const char *path_prefix, + const char *tag) { flb_sds_t uri; + const char *effective_path; uri = azb_uri_container(ctx); if (!uri) { return NULL; } - if (ctx->path) { - flb_sds_printf(&uri, "/%s/%s?comp=appendblock", ctx->path, tag); + effective_path = azb_effective_path(ctx, path_prefix); + + if (effective_path && effective_path[0] != '\0') { + if (flb_sds_printf(&uri, "/%s/%s?comp=appendblock", effective_path, tag) == NULL) { + flb_sds_destroy(uri); + return NULL; + } } else { - flb_sds_printf(&uri, "/%s?comp=appendblock", tag); + if (flb_sds_printf(&uri, "/%s?comp=appendblock", tag) == NULL) { + flb_sds_destroy(uri); + return NULL; + } } if (ctx->atype == AZURE_BLOB_AUTH_SAS && ctx->sas_token) { - flb_sds_printf(&uri, "&%s", ctx->sas_token); + if (flb_sds_printf(&uri, "&%s", ctx->sas_token) == NULL) { + flb_sds_destroy(uri); + return NULL; + } } return uri; diff --git a/plugins/out_azure_blob/azure_blob_appendblob.h b/plugins/out_azure_blob/azure_blob_appendblob.h index 5c3f1a7bd4b..12573a038a4 100644 --- a/plugins/out_azure_blob/azure_blob_appendblob.h +++ b/plugins/out_azure_blob/azure_blob_appendblob.h @@ -23,6 +23,8 @@ #include #include "azure_blob.h" -flb_sds_t azb_append_blob_uri(struct flb_azure_blob *ctx, char *tag); +flb_sds_t azb_append_blob_uri(struct flb_azure_blob *ctx, + const char *path_prefix, + const char *tag); #endif diff --git a/plugins/out_azure_blob/azure_blob_blockblob.c b/plugins/out_azure_blob/azure_blob_blockblob.c index 860401b2623..905767a64df 100644 --- a/plugins/out_azure_blob/azure_blob_blockblob.c +++ b/plugins/out_azure_blob/azure_blob_blockblob.c @@ -31,18 +31,23 @@ #include "azure_blob_uri.h" #include "azure_blob_http.h" -flb_sds_t azb_block_blob_blocklist_uri(struct flb_azure_blob *ctx, char *name) +flb_sds_t azb_block_blob_blocklist_uri(struct flb_azure_blob *ctx, + const char *path_prefix, + const char *name) { flb_sds_t uri; + const char *effective_path; uri = azb_uri_container(ctx); if (!uri) { return NULL; } - if (ctx->path) { + effective_path = azb_effective_path(ctx, path_prefix); + + if (effective_path && effective_path[0] != '\0') { flb_sds_printf(&uri, "/%s/%s?comp=blocklist", - ctx->path, name); + effective_path, name); } else { flb_sds_printf(&uri, "/%s?comp=blocklist", name); @@ -55,13 +60,18 @@ flb_sds_t azb_block_blob_blocklist_uri(struct flb_azure_blob *ctx, char *name) return uri; } -flb_sds_t azb_block_blob_uri(struct flb_azure_blob *ctx, char *name, - char *blockid, uint64_t ms, char *random_str) +flb_sds_t azb_block_blob_uri(struct flb_azure_blob *ctx, + const char *path_prefix, + const char *name, + const char *blockid, + uint64_t ms, + const char *random_str) { int len; flb_sds_t uri; char *ext; char *encoded_blockid; + const char *effective_path; len = strlen(blockid); encoded_blockid = azb_uri_encode(blockid, len); @@ -82,14 +92,16 @@ flb_sds_t azb_block_blob_uri(struct flb_azure_blob *ctx, char *name, ext = ""; } - if (ctx->path) { + effective_path = azb_effective_path(ctx, path_prefix); + + if (effective_path && effective_path[0] != '\0') { if (ms > 0) { flb_sds_printf(&uri, "/%s/%s.%s.%" PRIu64 "%s?blockid=%s&comp=block", - ctx->path, name, random_str, ms, ext, encoded_blockid); + effective_path, name, random_str, ms, ext, encoded_blockid); } else { flb_sds_printf(&uri, "/%s/%s.%s%s?blockid=%s&comp=block", - ctx->path, name, random_str, ext, encoded_blockid); + effective_path, name, random_str, ext, encoded_blockid); } } else { @@ -112,10 +124,18 @@ flb_sds_t azb_block_blob_uri(struct flb_azure_blob *ctx, char *name, } flb_sds_t azb_block_blob_uri_commit(struct flb_azure_blob *ctx, - char *tag, uint64_t ms, char *str) + const char *path_prefix, + const char *tag, + uint64_t ms, + const char *str) { char *ext; flb_sds_t uri; + const char *effective_path; + + if (!ctx || !tag || !str) { + return NULL; + } uri = azb_uri_container(ctx); if (!uri) { @@ -129,9 +149,13 @@ flb_sds_t azb_block_blob_uri_commit(struct flb_azure_blob *ctx, ext = ""; } - if (ctx->path) { - flb_sds_printf(&uri, "/%s/%s.%s.%" PRIu64 "%s?comp=blocklist", ctx->path, tag, str, - ms, ext); + effective_path = azb_effective_path(ctx, path_prefix); + + if (effective_path && effective_path[0] != '\0') { + flb_sds_printf(&uri, + "/%s/%s.%s.%" PRIu64 "%s?comp=blocklist", + effective_path, tag, str, + ms, ext); } else { flb_sds_printf(&uri, "/%s.%s.%" PRIu64 "%s?comp=blocklist", tag, str, ms, ext); @@ -331,14 +355,19 @@ int azb_block_blob_put_block_list(struct flb_azure_blob *ctx, flb_sds_t uri, flb } /* Commit a single block */ -int azb_block_blob_commit_block(struct flb_azure_blob *ctx, char *blockid, char *tag, uint64_t ms, char *str) +int azb_block_blob_commit_block(struct flb_azure_blob *ctx, + const char *path_prefix, + const char *blockid, + const char *tag, + uint64_t ms, + const char *str) { int ret; flb_sds_t uri = NULL; flb_sds_t payload; /* Compose commit URI */ - uri = azb_block_blob_uri_commit(ctx, tag, ms, str); + uri = azb_block_blob_uri_commit(ctx, path_prefix, tag, ms, str); if (!uri) { return FLB_ERROR; } @@ -367,7 +396,9 @@ int azb_block_blob_commit_block(struct flb_azure_blob *ctx, char *blockid, char return ret; } -int azb_block_blob_commit_file_parts(struct flb_azure_blob *ctx, uint64_t file_id, cfl_sds_t path, cfl_sds_t part_ids) +int azb_block_blob_commit_file_parts(struct flb_azure_blob *ctx, uint64_t file_id, + cfl_sds_t path, cfl_sds_t part_ids, + const char *path_prefix) { int ret; uint64_t id; @@ -419,7 +450,7 @@ int azb_block_blob_commit_file_parts(struct flb_azure_blob *ctx, uint64_t file_i cfl_sds_cat_safe(&payload, "", 12); flb_utils_split_free(list); - uri = azb_block_blob_blocklist_uri(ctx, path); + uri = azb_block_blob_blocklist_uri(ctx, path_prefix, path); if (!uri) { flb_sds_destroy(payload); return -1; diff --git a/plugins/out_azure_blob/azure_blob_blockblob.h b/plugins/out_azure_blob/azure_blob_blockblob.h index 6949aefde51..aa98c3e29e6 100644 --- a/plugins/out_azure_blob/azure_blob_blockblob.h +++ b/plugins/out_azure_blob/azure_blob_blockblob.h @@ -23,14 +23,31 @@ #include #include "azure_blob.h" -flb_sds_t azb_block_blob_blocklist_uri(struct flb_azure_blob *ctx, char *name); -flb_sds_t azb_block_blob_uri(struct flb_azure_blob *ctx, char *tag, char *blockid, - uint64_t ms, char *random_str); +flb_sds_t azb_block_blob_blocklist_uri(struct flb_azure_blob *ctx, + const char *path_prefix, + const char *name); +flb_sds_t azb_block_blob_uri(struct flb_azure_blob *ctx, + const char *path_prefix, + const char *name, + const char *blockid, + uint64_t ms, + const char *random_str); +flb_sds_t azb_block_blob_uri_commit(struct flb_azure_blob *ctx, + const char *path_prefix, + const char *tag, + uint64_t ms, + const char *str); char *azb_block_blob_id_logs(uint64_t *ms); char *azb_block_blob_id_blob(struct flb_azure_blob *ctx, char *path, uint64_t part_id); -int azb_block_blob_commit_block(struct flb_azure_blob *ctx, char *blockid, char *tag, uint64_t ms, char *str); +int azb_block_blob_commit_block(struct flb_azure_blob *ctx, + const char *path_prefix, + const char *blockid, + const char *tag, + uint64_t ms, + const char *str); int azb_block_blob_commit_file_parts(struct flb_azure_blob *ctx, uint64_t file_id, - cfl_sds_t path, cfl_sds_t part_ids); + cfl_sds_t path, cfl_sds_t part_ids, + const char *path_prefix); #endif diff --git a/plugins/out_azure_blob/azure_blob_conf.c b/plugins/out_azure_blob/azure_blob_conf.c index ea883a01852..f64147c1cfa 100644 --- a/plugins/out_azure_blob/azure_blob_conf.c +++ b/plugins/out_azure_blob/azure_blob_conf.c @@ -582,6 +582,22 @@ struct flb_azure_blob *flb_azure_blob_conf_create(struct flb_output_instance *in } } + /* Sanitize path and mark templating enabled after remote overrides */ + if (ctx->path) { + size_t path_len; + + path_len = flb_sds_len(ctx->path); + if (path_len > 0 && ctx->path[path_len - 1] == '/') { + ctx->path[path_len - 1] = '\0'; + flb_sds_len_set(ctx->path, path_len - 1); + path_len--; + } + + if (path_len > 0) { + ctx->path_templating_enabled = FLB_TRUE; + } + } + if (!ctx->container_name) { flb_plg_error(ctx->ins, "'container_name' has not been set"); return NULL; @@ -755,13 +771,6 @@ struct flb_azure_blob *flb_azure_blob_conf_create(struct flb_output_instance *in flb_sds_printf(&ctx->shared_key_prefix, "SharedKey %s:", ctx->account_name); } - /* Sanitize path: remove any ending slash */ - if (ctx->path) { - if (ctx->path[flb_sds_len(ctx->path) - 1] == '/') { - ctx->path[flb_sds_len(ctx->path) - 1] = '\0'; - } - } - /* database file for blob signal handling */ if (ctx->database_file) { ctx->db = azb_db_open(ctx, ctx->database_file); @@ -805,6 +814,7 @@ void flb_azure_blob_conf_destroy(struct flb_azure_blob *ctx) flb_sds_destroy(ctx->path); ctx->path = NULL; } + ctx->path_templating_enabled = FLB_FALSE; if (ctx->decoded_sk) { flb_free(ctx->decoded_sk); diff --git a/plugins/out_azure_blob/azure_blob_db.c b/plugins/out_azure_blob/azure_blob_db.c index 6951877ae01..64f100e0b80 100644 --- a/plugins/out_azure_blob/azure_blob_db.c +++ b/plugins/out_azure_blob/azure_blob_db.c @@ -25,6 +25,51 @@ #include "azure_blob_db.h" +#include + +static int ensure_path_prefix_column(struct flb_azure_blob *ctx, struct flb_sqldb *db) +{ + int ret; + int found = FLB_FALSE; + const char *sql = "PRAGMA table_info(out_azure_blob_files);"; + sqlite3_stmt *stmt = NULL; + + ret = sqlite3_prepare_v2(db->handler, sql, -1, &stmt, NULL); + if (ret != SQLITE_OK) { + flb_plg_error(ctx->ins, "cannot inspect azure blob files schema"); + if (stmt != NULL) { + sqlite3_finalize(stmt); + } + return -1; + } + + while ((ret = sqlite3_step(stmt)) == SQLITE_ROW) { + const unsigned char *name = sqlite3_column_text(stmt, 1); + + if (name && strcmp((const char *) name, "path_prefix") == 0) { + found = FLB_TRUE; + break; + } + } + + sqlite3_finalize(stmt); + + if (found == FLB_TRUE) { + return 0; + } + + ret = sqlite3_exec(db->handler, + "ALTER TABLE out_azure_blob_files ADD COLUMN path_prefix TEXT;", + NULL, NULL, NULL); + if (ret != SQLITE_OK) { + flb_plg_error(ctx->ins, "cannot add path_prefix column: %s", + sqlite3_errmsg(db->handler)); + return -1; + } + + return 0; +} + static int prepare_stmts(struct flb_sqldb *db, struct flb_azure_blob *ctx) { int ret; @@ -239,6 +284,12 @@ struct flb_sqldb *azb_db_open(struct flb_azure_blob *ctx, char *db_path) return NULL; } + ret = ensure_path_prefix_column(ctx, db); + if (ret != 0) { + flb_sqldb_close(db); + return NULL; + } + ret = prepare_stmts(db, ctx); if (ret == -1) { flb_sqldb_close(db); @@ -315,6 +366,7 @@ int64_t azb_db_file_insert(struct flb_azure_blob *ctx, char *source, char *destination, char *path, + char *path_prefix, size_t size) { int ret; @@ -333,6 +385,13 @@ int64_t azb_db_file_insert(struct flb_azure_blob *ctx, sqlite3_bind_int64(ctx->stmt_insert_file, 4, size); sqlite3_bind_int64(ctx->stmt_insert_file, 5, created); + if (path_prefix) { + sqlite3_bind_text(ctx->stmt_insert_file, 6, path_prefix, -1, 0); + } + else { + sqlite3_bind_null(ctx->stmt_insert_file, 6); + } + /* Run the insert */ ret = sqlite3_step(ctx->stmt_insert_file); if (ret != SQLITE_DONE) { @@ -486,7 +545,8 @@ int azb_db_file_delivery_attempts(struct flb_azure_blob *ctx, int azb_db_file_get_next_stale(struct flb_azure_blob *ctx, uint64_t *id, - cfl_sds_t *path) + cfl_sds_t *path, + cfl_sds_t *path_prefix) { int ret; char *tmp_path; @@ -507,12 +567,25 @@ int azb_db_file_get_next_stale(struct flb_azure_blob *ctx, /* id: column 0 */ *id = sqlite3_column_int64(ctx->stmt_get_next_stale_file, 0); tmp_path = (char *) sqlite3_column_text(ctx->stmt_get_next_stale_file, 1); + char *tmp_prefix = (char *) sqlite3_column_text(ctx->stmt_get_next_stale_file, 2); *path = cfl_sds_create(tmp_path); if (*path == NULL) { exists = -1; } + + if (exists == FLB_TRUE && tmp_prefix) { + *path_prefix = cfl_sds_create(tmp_prefix); + if (*path_prefix == NULL) { + exists = -1; + cfl_sds_destroy(*path); + *path = NULL; + } + } + else { + *path_prefix = NULL; + } } else if (ret == SQLITE_DONE) { /* all good */ @@ -529,6 +602,7 @@ int azb_db_file_get_next_stale(struct flb_azure_blob *ctx, if (exists == -1) { *id = 0; *path = NULL; + *path_prefix = NULL; } return exists; @@ -538,7 +612,8 @@ int azb_db_file_get_next_aborted(struct flb_azure_blob *ctx, uint64_t *id, uint64_t *delivery_attempts, cfl_sds_t *path, - cfl_sds_t *source) + cfl_sds_t *source, + cfl_sds_t *path_prefix) { int ret; char *tmp_source; @@ -557,6 +632,7 @@ int azb_db_file_get_next_aborted(struct flb_azure_blob *ctx, *delivery_attempts = sqlite3_column_int64(ctx->stmt_get_next_aborted_file, 1); tmp_source = (char *) sqlite3_column_text(ctx->stmt_get_next_aborted_file, 2); tmp_path = (char *) sqlite3_column_text(ctx->stmt_get_next_aborted_file, 3); + char *tmp_prefix = (char *) sqlite3_column_text(ctx->stmt_get_next_aborted_file, 4); *path = cfl_sds_create(tmp_path); @@ -569,6 +645,17 @@ int azb_db_file_get_next_aborted(struct flb_azure_blob *ctx, cfl_sds_destroy(*path); exists = -1; } + else if (tmp_prefix) { + *path_prefix = cfl_sds_create(tmp_prefix); + if (*path_prefix == NULL) { + cfl_sds_destroy(*path); + cfl_sds_destroy(*source); + exists = -1; + } + } + else { + *path_prefix = NULL; + } } } else if (ret == SQLITE_DONE) { @@ -587,6 +674,7 @@ int azb_db_file_get_next_aborted(struct flb_azure_blob *ctx, *id = 0; *delivery_attempts = 0; *path = NULL; + *path_prefix = NULL; *source = NULL; } @@ -731,19 +819,24 @@ int azb_db_file_part_get_next(struct flb_azure_blob *ctx, uint64_t *part_delivery_attempts, uint64_t *file_delivery_attempts, cfl_sds_t *file_path, - cfl_sds_t *destination) + cfl_sds_t *destination, + cfl_sds_t *path_prefix) { int ret; char *tmp = NULL; char *tmp_destination = NULL; + char *tmp_prefix = NULL; cfl_sds_t path; cfl_sds_t local_destination; + cfl_sds_t local_prefix = NULL; if (azb_db_lock(ctx) != 0) { return -1; } *file_path = NULL; + *destination = NULL; + *path_prefix = NULL; /* Run the query */ ret = sqlite3_step(ctx->stmt_get_next_file_part); @@ -757,6 +850,7 @@ int azb_db_file_part_get_next(struct flb_azure_blob *ctx, tmp = (char *) sqlite3_column_text(ctx->stmt_get_next_file_part, 6); *file_delivery_attempts = sqlite3_column_int64(ctx->stmt_get_next_file_part, 7); tmp_destination = (char *) sqlite3_column_text(ctx->stmt_get_next_file_part, 9); + tmp_prefix = (char *) sqlite3_column_text(ctx->stmt_get_next_file_part, 10); } else if (ret == SQLITE_DONE) { /* no records */ @@ -774,11 +868,17 @@ int azb_db_file_part_get_next(struct flb_azure_blob *ctx, path = cfl_sds_create(tmp); local_destination = cfl_sds_create(tmp_destination); + if (tmp_prefix) { + local_prefix = cfl_sds_create(tmp_prefix); + } + else { + local_prefix = NULL; + } sqlite3_clear_bindings(ctx->stmt_get_next_file_part); sqlite3_reset(ctx->stmt_get_next_file_part); - if (path == NULL || local_destination == NULL) { + if (path == NULL || local_destination == NULL || (tmp_prefix && local_prefix == NULL)) { if (path != NULL) { cfl_sds_destroy(path); } @@ -787,6 +887,10 @@ int azb_db_file_part_get_next(struct flb_azure_blob *ctx, cfl_sds_destroy(local_destination); } + if (local_prefix != NULL) { + cfl_sds_destroy(local_prefix); + } + azb_db_unlock(ctx); return -1; } @@ -796,12 +900,16 @@ int azb_db_file_part_get_next(struct flb_azure_blob *ctx, if (ret == -1) { cfl_sds_destroy(path); cfl_sds_destroy(local_destination); + if (local_prefix != NULL) { + cfl_sds_destroy(local_prefix); + } azb_db_unlock(ctx); return -1; } *file_path = path; *destination = local_destination; + *path_prefix = local_prefix; azb_db_unlock(ctx); @@ -869,7 +977,8 @@ int azb_db_file_part_delivery_attempts(struct flb_azure_blob *ctx, } int azb_db_file_oldest_ready(struct flb_azure_blob *ctx, - uint64_t *file_id, cfl_sds_t *path, cfl_sds_t *part_ids, cfl_sds_t *source) + uint64_t *file_id, cfl_sds_t *path, cfl_sds_t *part_ids, cfl_sds_t *source, + cfl_sds_t *path_prefix) { int ret; char *tmp = NULL; @@ -906,7 +1015,7 @@ int azb_db_file_oldest_ready(struct flb_azure_blob *ctx, /* source */ tmp = (char *) sqlite3_column_text(ctx->stmt_get_oldest_file_with_parts, 3); *source = cfl_sds_create(tmp); - if (!*part_ids) { + if (!*source) { cfl_sds_destroy(*part_ids); cfl_sds_destroy(*path); sqlite3_clear_bindings(ctx->stmt_get_oldest_file_with_parts); @@ -914,16 +1023,42 @@ int azb_db_file_oldest_ready(struct flb_azure_blob *ctx, azb_db_unlock(ctx); return -1; } + + /* path prefix */ + tmp = (char *) sqlite3_column_text(ctx->stmt_get_oldest_file_with_parts, 4); + if (tmp) { + *path_prefix = cfl_sds_create(tmp); + if (!*path_prefix) { + cfl_sds_destroy(*source); + cfl_sds_destroy(*part_ids); + cfl_sds_destroy(*path); + sqlite3_clear_bindings(ctx->stmt_get_oldest_file_with_parts); + sqlite3_reset(ctx->stmt_get_oldest_file_with_parts); + azb_db_unlock(ctx); + return -1; + } + } + else { + *path_prefix = NULL; + } } else if (ret == SQLITE_DONE) { /* no records */ sqlite3_clear_bindings(ctx->stmt_get_oldest_file_with_parts); sqlite3_reset(ctx->stmt_get_oldest_file_with_parts); azb_db_unlock(ctx); + *path = NULL; + *part_ids = NULL; + *source = NULL; + *path_prefix = NULL; return 0; } else { azb_db_unlock(ctx); + *path = NULL; + *part_ids = NULL; + *source = NULL; + *path_prefix = NULL; return -1; } diff --git a/plugins/out_azure_blob/azure_blob_db.h b/plugins/out_azure_blob/azure_blob_db.h index bbdbc66f1a3..fa6b4ef7ebc 100644 --- a/plugins/out_azure_blob/azure_blob_db.h +++ b/plugins/out_azure_blob/azure_blob_db.h @@ -31,6 +31,7 @@ " source TEXT NOT NULL," \ " destination TEXT NOT NULL," \ " path TEXT NOT NULL," \ + " path_prefix TEXT," \ " size INTEGER," \ " created INTEGER," \ " delivery_attempts INTEGER DEFAULT 0," \ @@ -53,8 +54,8 @@ ");" #define SQL_INSERT_FILE \ - "INSERT INTO out_azure_blob_files (source, destination, path, size, created)" \ - " VALUES (@source, @destination, @path, @size, @created);" + "INSERT INTO out_azure_blob_files (source, destination, path, size, created, path_prefix)" \ + " VALUES (@source, @destination, @path, @size, @created, @path_prefix);" /* DELETE a registered file and all it parts */ #define SQL_DELETE_FILE \ @@ -76,7 +77,7 @@ "SELECT * FROM out_azure_blob_files WHERE path=@path ORDER BY id DESC;" #define SQL_GET_NEXT_ABORTED_FILE \ - "SELECT id, azbf.delivery_attempts, source, path " \ + "SELECT id, azbf.delivery_attempts, source, path, path_prefix " \ " FROM out_azure_blob_files azbf " \ " WHERE aborted = 1 " \ " AND (SELECT COUNT(*) " \ @@ -88,7 +89,7 @@ #define SQL_GET_NEXT_STALE_FILE \ - "SELECT id, path " \ + "SELECT id, path, path_prefix " \ " FROM out_azure_blob_files azbf " \ " WHERE aborted = 0 " \ " AND last_delivery_attempt > 0 " \ @@ -135,7 +136,8 @@ " f.path, " \ " f.delivery_attempts, " \ " f.last_delivery_attempt, " \ - " f.destination " \ + " f.destination, " \ + " f.path_prefix " \ " FROM out_azure_blob_parts p " \ " JOIN out_azure_blob_files f " \ " ON p.file_id = f.id " \ @@ -164,7 +166,7 @@ * this query is used to compose */ #define SQL_GET_OLDEST_FILE_WITH_PARTS_CONCAT \ - "SELECT f.id, f.path, GROUP_CONCAT(p.part_id ORDER BY p.part_id ASC) AS part_ids, f.source " \ + "SELECT f.id, f.path, GROUP_CONCAT(p.part_id ORDER BY p.part_id ASC) AS part_ids, f.source, f.path_prefix " \ "FROM out_azure_blob_files f " \ "JOIN out_azure_blob_parts p ON f.id = p.file_id " \ "WHERE p.uploaded = 1 " \ @@ -181,6 +183,7 @@ int64_t azb_db_file_insert(struct flb_azure_blob *ctx, char *source, char *destination, char *path, + char *path_prefix, size_t size); int azb_db_file_delete(struct flb_azure_blob *ctx, uint64_t id, char *path); @@ -197,12 +200,14 @@ int azb_db_file_get_next_aborted(struct flb_azure_blob *ctx, uint64_t *id, uint64_t *delivery_attempts, cfl_sds_t *path, - cfl_sds_t *source); + cfl_sds_t *source, + cfl_sds_t *path_prefix); int azb_db_file_get_next_stale(struct flb_azure_blob *ctx, uint64_t *id, - cfl_sds_t *path); + cfl_sds_t *path, + cfl_sds_t *path_prefix); int azb_db_file_reset_upload_states(struct flb_azure_blob *ctx, uint64_t id, char *path); @@ -217,7 +222,8 @@ int azb_db_file_part_get_next(struct flb_azure_blob *ctx, uint64_t *part_delivery_attempts, uint64_t *file_delivery_attempts, cfl_sds_t *file_path, - cfl_sds_t *destination); + cfl_sds_t *destination, + cfl_sds_t *path_prefix); int azb_db_file_part_uploaded(struct flb_azure_blob *ctx, uint64_t id); int azb_db_file_part_delivery_attempts(struct flb_azure_blob *ctx, uint64_t file_id, @@ -225,5 +231,6 @@ int azb_db_file_part_delivery_attempts(struct flb_azure_blob *ctx, uint64_t attempts); int azb_db_file_oldest_ready(struct flb_azure_blob *ctx, - uint64_t *file_id, cfl_sds_t *path, cfl_sds_t *part_ids, cfl_sds_t *source); + uint64_t *file_id, cfl_sds_t *path, cfl_sds_t *part_ids, cfl_sds_t *source, + cfl_sds_t *path_prefix); #endif \ No newline at end of file diff --git a/plugins/out_azure_blob/azure_blob_uri.c b/plugins/out_azure_blob/azure_blob_uri.c index 75a643079aa..5d8da9fbb09 100644 --- a/plugins/out_azure_blob/azure_blob_uri.c +++ b/plugins/out_azure_blob/azure_blob_uri.c @@ -113,7 +113,10 @@ flb_sds_t azb_uri_container(struct flb_azure_blob *ctx) return NULL; } - flb_sds_printf(&uri, "%s%s", ctx->base_uri, ctx->container_name); + if (flb_sds_printf(&uri, "%s%s", ctx->base_uri, ctx->container_name) == NULL) { + flb_sds_destroy(uri); + return NULL; + } return uri; } @@ -127,31 +130,73 @@ flb_sds_t azb_uri_ensure_or_create_container(struct flb_azure_blob *ctx) } flb_sds_printf(&uri, "?restype=container"); + if (uri == NULL) { + return NULL; + } + if (ctx->atype == AZURE_BLOB_AUTH_SAS && ctx->sas_token) { - flb_sds_printf(&uri, "&%s", ctx->sas_token); + if (flb_sds_printf(&uri, "&%s", ctx->sas_token) == NULL) { + flb_sds_destroy(uri); + return NULL; + } } return uri; } -flb_sds_t azb_uri_create_blob(struct flb_azure_blob *ctx, char *tag) +const char *azb_effective_path(struct flb_azure_blob *ctx, + const char *path_prefix) +{ + if (!ctx) { + return path_prefix; + } + + if (ctx->path_templating_enabled == FLB_TRUE) { + if (path_prefix == NULL) { + return ctx->path; + } + return azb_commit_prefix_with_fallback(ctx, path_prefix); + } + + if (path_prefix && path_prefix[0] != '\0') { + return path_prefix; + } + + return ctx->path; +} + +flb_sds_t azb_uri_create_blob(struct flb_azure_blob *ctx, + const char *path_prefix, + const char *tag) { flb_sds_t uri; + const char *effective_path; uri = azb_uri_container(ctx); if (!uri) { return NULL; } - if (ctx->path) { - flb_sds_printf(&uri, "/%s/%s", ctx->path, tag); + effective_path = azb_effective_path(ctx, path_prefix); + + if (effective_path && effective_path[0] != '\0') { + if (flb_sds_printf(&uri, "/%s/%s", effective_path, tag) == NULL) { + flb_sds_destroy(uri); + return NULL; + } } else { - flb_sds_printf(&uri, "/%s", tag); + if (flb_sds_printf(&uri, "/%s", tag) == NULL) { + flb_sds_destroy(uri); + return NULL; + } } if (ctx->atype == AZURE_BLOB_AUTH_SAS && ctx->sas_token) { - flb_sds_printf(&uri, "?%s", ctx->sas_token); + if (flb_sds_printf(&uri, "?%s", ctx->sas_token) == NULL) { + flb_sds_destroy(uri); + return NULL; + } } return uri; diff --git a/plugins/out_azure_blob/azure_blob_uri.h b/plugins/out_azure_blob/azure_blob_uri.h index 98ccc8f5b35..84fb7ecd5f5 100644 --- a/plugins/out_azure_blob/azure_blob_uri.h +++ b/plugins/out_azure_blob/azure_blob_uri.h @@ -27,7 +27,11 @@ flb_sds_t azb_uri_container(struct flb_azure_blob *ctx); flb_sds_t azb_uri_ensure_or_create_container(struct flb_azure_blob *ctx); -flb_sds_t azb_uri_create_blob(struct flb_azure_blob *ctx, char *tag); +flb_sds_t azb_uri_create_blob(struct flb_azure_blob *ctx, + const char *path_prefix, + const char *tag); +const char *azb_effective_path(struct flb_azure_blob *ctx, + const char *path_prefix); flb_sds_t azb_uri_encode(const char *uri, size_t len); flb_sds_t azb_uri_decode(const char *uri, size_t len); diff --git a/tests/internal/CMakeLists.txt b/tests/internal/CMakeLists.txt index 45d769b9c25..03cec4bbbf5 100644 --- a/tests/internal/CMakeLists.txt +++ b/tests/internal/CMakeLists.txt @@ -59,6 +59,13 @@ set(UNIT_TESTS_FILES storage_dlq.c ) +if(FLB_OUT_AZURE_BLOB) + set(UNIT_TESTS_FILES + ${UNIT_TESTS_FILES} + azure_blob_path.c + ) +endif() + # TLS helpers if(FLB_TLS) set(UNIT_TESTS_FILES @@ -217,6 +224,10 @@ function(prepare_unit_tests TEST_PREFIX SOURCEFILES) target_link_libraries(${source_file_we} flb-aws) endif() + if(FLB_OUT_AZURE_BLOB AND "${source_file_we}" STREQUAL "flb-it-azure_blob_path") + target_link_libraries(${source_file_we} flb-plugin-out_azure_blob) + endif() + if(FLB_STREAM_PROCESSOR) target_link_libraries(${source_file_we} flb-sp) endif() diff --git a/tests/internal/azure_blob_path.c b/tests/internal/azure_blob_path.c new file mode 100644 index 00000000000..d89117ad29b --- /dev/null +++ b/tests/internal/azure_blob_path.c @@ -0,0 +1,495 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +#include +#include + +#include +#include +#include + +#include "../../plugins/out_azure_blob/azure_blob.h" +#include "../../plugins/out_azure_blob/azure_blob_uri.h" +#include "../../plugins/out_azure_blob/azure_blob_blockblob.h" +#include "flb_tests_internal.h" + +static void ctx_cleanup(struct flb_azure_blob *ctx) +{ + if (ctx->path) { + flb_sds_destroy(ctx->path); + ctx->path = NULL; + } + + if (ctx->base_uri) { + flb_sds_destroy(ctx->base_uri); + ctx->base_uri = NULL; + } + + if (ctx->container_name) { + flb_sds_destroy(ctx->container_name); + ctx->container_name = NULL; + } + + ctx->path_templating_enabled = FLB_FALSE; + + /* Make sure future tests start from a pristine state even if they reuse + * the same context instance. + */ + memset(ctx, 0, sizeof(*ctx)); +} + +static int ctx_init_with_path(struct flb_azure_blob *ctx, + const char *path, + int templated) +{ + memset(ctx, 0, sizeof(*ctx)); + + ctx->base_uri = flb_sds_create("https://acct.blob.core.windows.net/"); + ctx->container_name = flb_sds_create("container"); + ctx->path = flb_sds_create(path); + ctx->path_templating_enabled = templated; + + if (!ctx->base_uri || !ctx->container_name || !ctx->path) { + ctx_cleanup(ctx); + return -1; + } + + return 0; +} + +void test_resolve_path_basic_tag(void) +{ + struct flb_azure_blob ctx; + flb_sds_t resolved = NULL; + const char *tag = "service.app"; + int ret; + + memset(&ctx, 0, sizeof(ctx)); + + ctx.path = flb_sds_create("logs/$TAG"); + TEST_CHECK(ctx.path != NULL); + if (!ctx.path) { + return; + } + + ctx.path_templating_enabled = FLB_TRUE; + + ret = azb_resolve_path(&ctx, tag, (int) strlen(tag), NULL, &resolved); + TEST_CHECK(ret == 0); + TEST_CHECK(resolved != NULL); + + if (resolved) { + TEST_CHECK(strcmp(resolved, "logs/service.app") == 0); + flb_sds_destroy(resolved); + } + + ctx_cleanup(&ctx); +} + +void test_resolve_path_custom_delimiter(void) +{ + struct flb_azure_blob ctx; + flb_sds_t resolved = NULL; + const char *tag = "prod.backend"; + int ret; + + memset(&ctx, 0, sizeof(ctx)); + + ctx.path = flb_sds_create("stream/$TAG[0]/$TAG[1]/$TAG"); + + TEST_CHECK(ctx.path != NULL); + if (!ctx.path) { + ctx_cleanup(&ctx); + return; + } + + ctx.path_templating_enabled = FLB_TRUE; + + ret = azb_resolve_path(&ctx, tag, (int) strlen(tag), NULL, &resolved); + TEST_CHECK(ret == 0); + TEST_CHECK(resolved != NULL); + + if (resolved) { + TEST_CHECK(strcmp(resolved, "stream/prod/backend/prod.backend") == 0); + flb_sds_destroy(resolved); + } + + ctx_cleanup(&ctx); +} + +void test_resolve_path_time_tokens(void) +{ + struct flb_azure_blob ctx; + struct flb_time ts; + flb_sds_t resolved = NULL; + const char *expect = "time/2025/11/17/987/987654321/987654321"; + int ret; + + memset(&ctx, 0, sizeof(ctx)); + + ctx.path = flb_sds_create("time/%Y/%m/%d/%3N/%9N/%L"); + TEST_CHECK(ctx.path != NULL); + if (!ctx.path) { + return; + } + + ctx.path_templating_enabled = FLB_TRUE; + + flb_time_set(&ts, 1763382896, 987654321); + + ret = azb_resolve_path(&ctx, NULL, 0, &ts, &resolved); + TEST_CHECK(ret == 0); + TEST_CHECK(resolved != NULL); + + if (resolved) { + TEST_CHECK(strcmp(resolved, expect) == 0); + flb_sds_destroy(resolved); + } + + ctx_cleanup(&ctx); +} + +void test_resolve_path_uuid_token(void) +{ + struct flb_azure_blob ctx; + flb_sds_t resolved = NULL; + int ret; + size_t i; + + memset(&ctx, 0, sizeof(ctx)); + + ctx.path = flb_sds_create("uuid/$UUID"); + TEST_CHECK(ctx.path != NULL); + if (!ctx.path) { + return; + } + + ctx.path_templating_enabled = FLB_TRUE; + + ret = azb_resolve_path(&ctx, "demo", 4, NULL, &resolved); + TEST_CHECK(ret == 0); + TEST_CHECK(resolved != NULL); + + if (resolved) { + const char *suffix; + + TEST_CHECK(strncmp(resolved, "uuid/", 5) == 0); + + suffix = resolved + 5; + TEST_CHECK(strlen(suffix) == 8); + TEST_CHECK(strstr(resolved, "$UUID") == NULL); + + for (i = 0; i < 8 && suffix[i] != '\0'; i++) { + TEST_CHECK(isalnum((unsigned char) suffix[i]) != 0); + } + + TEST_CHECK(i == 8 && suffix[8] == '\0'); + flb_sds_destroy(resolved); + } + + ctx_cleanup(&ctx); +} + +void test_resolve_path_multiple_uuid_tokens(void) +{ + struct flb_azure_blob ctx; + flb_sds_t resolved = NULL; + int ret; + + memset(&ctx, 0, sizeof(ctx)); + + ctx.path = flb_sds_create("multi/$UUID/data/$UUID"); + TEST_CHECK(ctx.path != NULL); + if (!ctx.path) { + return; + } + + ctx.path_templating_enabled = FLB_TRUE; + + ret = azb_resolve_path(&ctx, "demo", 4, NULL, &resolved); + TEST_CHECK(ret == 0); + TEST_CHECK(resolved != NULL); + + if (resolved) { + const char *first_start; + const char *second_marker; + const char *second_start; + + first_start = resolved + strlen("multi/"); + second_marker = strstr(first_start, "/data/"); + TEST_CHECK(second_marker != NULL); + + if (second_marker != NULL) { + size_t first_len; + size_t second_len; + + first_len = (size_t)(second_marker - first_start); + TEST_CHECK(first_len == 8); + + second_start = second_marker + strlen("/data/"); + second_len = strlen(second_start); + TEST_CHECK(second_len == 8); + + if (first_len == 8 && second_len == 8) { + TEST_CHECK(strncmp(first_start, second_start, 8) == 0); + } + } + + TEST_CHECK(strstr(resolved, "$UUID") == NULL); + flb_sds_destroy(resolved); + } + + ctx_cleanup(&ctx); +} + +void test_resolve_path_empty_result(void) +{ + struct flb_azure_blob ctx; + flb_sds_t resolved = NULL; + int ret; + + memset(&ctx, 0, sizeof(ctx)); + + ctx.path = flb_sds_create("$TAG[5]"); + TEST_CHECK(ctx.path != NULL); + if (!ctx.path) { + return; + } + + ctx.path_templating_enabled = FLB_TRUE; + + ret = azb_resolve_path(&ctx, "a.b", 3, NULL, &resolved); + TEST_CHECK(ret == 0); + TEST_CHECK(resolved != NULL); + if (resolved) { + TEST_CHECK(flb_sds_len(resolved) == 0); + flb_sds_destroy(resolved); + } + + ctx_cleanup(&ctx); +} + +void test_resolve_path_empty_prefix_uri(void) +{ + struct flb_azure_blob ctx; + flb_sds_t resolved = NULL; + flb_sds_t uri = NULL; + int ret; + + memset(&ctx, 0, sizeof(ctx)); + + ctx.base_uri = flb_sds_create("https://acct.blob.core.windows.net/"); + ctx.container_name = flb_sds_create("container"); + ctx.path = flb_sds_create("$TAG[5]"); + + TEST_CHECK(ctx.base_uri != NULL); + TEST_CHECK(ctx.container_name != NULL); + TEST_CHECK(ctx.path != NULL); + if (!ctx.base_uri || !ctx.container_name || !ctx.path) { + ctx_cleanup(&ctx); + return; + } + + ctx.path_templating_enabled = FLB_TRUE; + + ret = azb_resolve_path(&ctx, "a.b", 3, NULL, &resolved); + TEST_CHECK(ret == 0); + TEST_CHECK(resolved != NULL); + if (!resolved) { + ctx_cleanup(&ctx); + return; + } + + uri = azb_uri_create_blob(&ctx, resolved, "file.log"); + TEST_CHECK(uri != NULL); + + if (uri) { + TEST_CHECK(strcmp(uri, + "https://acct.blob.core.windows.net/container/file.log") == 0); + flb_sds_destroy(uri); + } + + flb_sds_destroy(resolved); + + ctx_cleanup(&ctx); +} + +void test_blocklist_uri_requires_resolved_prefix(void) +{ + struct flb_azure_blob ctx; + struct flb_time ts_first; + struct flb_time ts_second; + flb_sds_t prefix_first = NULL; + flb_sds_t prefix_second = NULL; + flb_sds_t uri_upload = NULL; + flb_sds_t uri_wrong = NULL; + flb_sds_t uri_correct = NULL; + int ret; + const char *tag = "service.app"; + const char *block_id = "Zmx1ZW50LWJsb2NrLWlk"; + const char *random_id = "RANDOMID"; + + ret = ctx_init_with_path(&ctx, "logs/%S/$UUID", FLB_TRUE); + TEST_CHECK(ret == 0); + if (ret != 0) { + return; + } + + flb_time_set(&ts_first, 100, 0); + flb_time_set(&ts_second, 200, 0); + + ret = azb_resolve_path(&ctx, tag, (int) strlen(tag), &ts_first, &prefix_first); + TEST_CHECK(ret == 0 && prefix_first != NULL); + + ret = azb_resolve_path(&ctx, tag, (int) strlen(tag), &ts_second, &prefix_second); + TEST_CHECK(ret == 0 && prefix_second != NULL); + + if (!(prefix_first && prefix_second)) { + goto cleanup; + } + + TEST_CHECK(strcmp(prefix_first, prefix_second) != 0); + + uri_upload = azb_block_blob_uri(&ctx, prefix_first, "blob.log", + (char *) block_id, 0, (char *) random_id); + TEST_CHECK(uri_upload != NULL); + if (uri_upload) { + TEST_CHECK(strstr(uri_upload, prefix_first) != NULL); + } + + uri_wrong = azb_block_blob_blocklist_uri(&ctx, ctx.path, "blob.log"); + TEST_CHECK(uri_wrong != NULL); + if (uri_wrong) { + TEST_CHECK(strstr(uri_wrong, "%S") != NULL); + } + + uri_correct = azb_block_blob_blocklist_uri(&ctx, prefix_first, "blob.log"); + TEST_CHECK(uri_correct != NULL); + if (uri_correct) { + TEST_CHECK(strstr(uri_correct, prefix_first) != NULL); + } + +cleanup: + if (prefix_first) { + flb_sds_destroy(prefix_first); + } + if (prefix_second) { + flb_sds_destroy(prefix_second); + } + if (uri_upload) { + flb_sds_destroy(uri_upload); + } + if (uri_wrong) { + flb_sds_destroy(uri_wrong); + } + if (uri_correct) { + flb_sds_destroy(uri_correct); + } + + ctx_cleanup(&ctx); +} + +void test_blocklist_uri_legacy_prefix_fallback(void) +{ + struct flb_azure_blob ctx; + flb_sds_t uri; + int ret; + + ret = ctx_init_with_path(&ctx, "static/prefix", FLB_FALSE); + TEST_CHECK(ret == 0); + if (ret != 0) { + return; + } + + uri = azb_block_blob_blocklist_uri(&ctx, NULL, "file.log"); + TEST_CHECK(uri != NULL); + if (uri) { + TEST_CHECK(strstr(uri, "static/prefix/file.log") != NULL); + flb_sds_destroy(uri); + } + + ctx_cleanup(&ctx); +} + +void test_commit_prefix_fallback_static_path(void) +{ + struct flb_azure_blob ctx; + const char *prefix; + + memset(&ctx, 0, sizeof(ctx)); + + ctx.path = flb_sds_create("static/prefix"); + TEST_CHECK(ctx.path != NULL); + if (ctx.path == NULL) { + return; + } + + ctx.path_templating_enabled = FLB_TRUE; + + prefix = azb_commit_prefix_with_fallback(&ctx, NULL); + TEST_CHECK(prefix == ctx.path); + + ctx_cleanup(&ctx); +} + +void test_uri_create_static_prefix_fallback(void) +{ + struct flb_azure_blob ctx; + flb_sds_t uri; + int ret; + + ret = ctx_init_with_path(&ctx, "static/prefix", FLB_TRUE); + TEST_CHECK(ret == 0); + if (ret != 0) { + return; + } + + uri = azb_uri_create_blob(&ctx, NULL, "file.log"); + TEST_CHECK(uri != NULL); + if (uri != NULL) { + TEST_CHECK(strstr(uri, "static/prefix/file.log") != NULL); + flb_sds_destroy(uri); + } + + ctx_cleanup(&ctx); +} + +void test_block_blob_commit_requires_suffix(void) +{ + struct flb_azure_blob ctx; + flb_sds_t uri; + int ret; + + ret = ctx_init_with_path(&ctx, "logs/%Y/%m/%d", FLB_TRUE); + TEST_CHECK(ret == 0); + if (ret != 0) { + return; + } + + uri = azb_block_blob_uri_commit(&ctx, NULL, "blob.log", 1234, "RANDOM"); + TEST_CHECK(uri != NULL); + if (uri) { + TEST_CHECK(strstr(uri, "RANDOM") != NULL); + flb_sds_destroy(uri); + } + + uri = azb_block_blob_uri_commit(&ctx, NULL, "blob.log", 1234, NULL); + TEST_CHECK(uri == NULL); + + ctx_cleanup(&ctx); +} + +TEST_LIST = { + {"resolve_path_basic_tag", test_resolve_path_basic_tag}, + {"resolve_path_custom_delimiter", test_resolve_path_custom_delimiter}, + {"resolve_path_time_tokens", test_resolve_path_time_tokens}, + {"resolve_path_uuid_token", test_resolve_path_uuid_token}, + {"resolve_path_multiple_uuid_tokens", test_resolve_path_multiple_uuid_tokens}, + {"resolve_path_empty_result", test_resolve_path_empty_result}, + {"resolve_path_empty_prefix_uri", test_resolve_path_empty_prefix_uri}, + {"blocklist_uri_requires_resolved_prefix", test_blocklist_uri_requires_resolved_prefix}, + {"blocklist_uri_legacy_prefix_fallback", test_blocklist_uri_legacy_prefix_fallback}, + {"commit_prefix_fallback_static_path", test_commit_prefix_fallback_static_path}, + {"uri_create_static_prefix_fallback", test_uri_create_static_prefix_fallback}, + {"block_blob_commit_requires_suffix", test_block_blob_commit_requires_suffix}, + {0} +};