diff --git a/include/fluent-bit/flb_config.h b/include/fluent-bit/flb_config.h index a41c7d83d2d..7ff66eaf2f8 100644 --- a/include/fluent-bit/flb_config.h +++ b/include/fluent-bit/flb_config.h @@ -41,6 +41,7 @@ #define FLB_CONFIG_DEFAULT_TASK_MAP_SIZE 2048 #define FLB_CONFIG_DEFAULT_TASK_MAP_SIZE_LIMIT 16384 #define FLB_CONFIG_DEFAULT_TASK_MAP_SIZE_GROWTH_SiZE 256 +#define FLB_CONFIG_EVENT_LOOP_SIZE 8 /* The reason behind FLB_CONFIG_DEFAULT_TASK_MAP_SIZE_LIMIT being set to 16384 * is that this is largest unsigned number expressable with 14 bits which is @@ -52,10 +53,20 @@ * pointers. */ +#define FLB_CONTEXT_EV_SIGNAL (1 << 0) /* 1 */ + +#define FLB_CTX_SIGNAL_RELOAD 1 +#define FLB_CTX_SIGNAL_SHUTDOWN 2 + /* Main struct to hold the configuration of the runtime service */ struct flb_config { struct mk_event ch_event; + /* external communication channel for fluent-bit contexts */ + struct mk_event_loop *ctx_evl; + flb_pipefd_t ch_context_signal[2]; /* channel to recieve context signal events */ + struct mk_event event_context_signal; + int support_mode; /* enterprise support mode ? */ int is_ingestion_active; /* date ingestion active/allowed */ int is_shutting_down; /* is the service shutting down ? */ @@ -313,6 +324,16 @@ struct flb_config { int dry_run; }; +static inline int flb_config_signal_channel_send(flb_pipefd_t channel, uint64_t signal_type) +{ + return flb_pipe_w(channel, &signal_type, sizeof(uint64_t)); +} + +static inline int flb_config_signal_send(struct flb_config *config, uint64_t signal_type) +{ + return flb_pipe_w(config->ch_context_signal[1], &signal_type, sizeof(uint64_t)); +} + #define FLB_CONFIG_LOG_LEVEL(c) (c->log->level) struct flb_config *flb_config_init(); diff --git a/plugins/in_calyptia_fleet/in_calyptia_fleet.c b/plugins/in_calyptia_fleet/in_calyptia_fleet.c index 14033305f04..c6d5be1af8d 100644 --- a/plugins/in_calyptia_fleet/in_calyptia_fleet.c +++ b/plugins/in_calyptia_fleet/in_calyptia_fleet.c @@ -455,6 +455,7 @@ static int exists_header_fleet_config(struct flb_in_calyptia_fleet_config *ctx) static void *do_reload(void *data) { struct reload_ctx *reload = (struct reload_ctx *)data; + int ret; if (reload == NULL) { return NULL; @@ -470,11 +471,13 @@ static void *do_reload(void *data) flb_free(reload); sleep(5); -#ifndef FLB_SYSTEM_WINDOWS - kill(getpid(), SIGHUP); -#else - GenerateConsoleCtrlEvent(1 /* CTRL_BREAK_EVENT_1 */, 0); -#endif + + ret = flb_config_signal_send(reload->flb->config, FLB_CTX_SIGNAL_RELOAD); + if (ret != 0) { + flb_error("unable to signal reload"); + return NULL; + } + return NULL; } diff --git a/src/flb_engine.c b/src/flb_engine.c index 5e1a8a8a563..5c54135fb23 100644 --- a/src/flb_engine.c +++ b/src/flb_engine.c @@ -719,6 +719,29 @@ int flb_engine_start(struct flb_config *config) } config->evl_bktq = evl_bktq; + /* + * Event loop channel to send context signals. + * + */ + /* Create the event loop and set it in the global configuration */ + config->ctx_evl = mk_event_loop_create(FLB_CONFIG_EVENT_LOOP_SIZE); + if (!config->ctx_evl) { + fprintf(stderr, "[log] could not create context event loop\n"); + return -1; + } + + ret = mk_event_channel_create(config->ctx_evl, + &config->ch_context_signal[0], + &config->ch_context_signal[1], + &config->event_context_signal); + if (ret == -1) { + flb_error("[engine] could not create context signal channel"); + return -1; + } + /* Signal type to indicate a "context" event */ + config->event_context_signal.type = FLB_CONTEXT_EV_SIGNAL; + config->event_context_signal.priority = FLB_ENGINE_PRIORITY_THREAD; + /* * Event loop channel to ingest flush events from flb_engine_flush() * @@ -1047,8 +1070,11 @@ int flb_engine_start(struct flb_config *config) flb_info("[engine] service has stopped (%i pending tasks)", ret); ret = config->exit_status_code; + + flb_config_signal_send(config, FLB_CTX_SIGNAL_SHUTDOWN); flb_engine_shutdown(config); config = NULL; + return ret; } } @@ -1185,6 +1211,15 @@ int flb_engine_shutdown(struct flb_config *config) flb_hs_destroy(config->http_ctx); } #endif + if (config->ctx_evl != NULL) { + mk_event_channel_destroy(config->ctx_evl, + config->ch_context_signal[0], + config->ch_context_signal[1], + &config->event_context_signal); + mk_event_loop_destroy(config->ctx_evl); + config->ctx_evl = NULL; + } + if (config->evl) { mk_event_channel_destroy(config->evl, config->ch_self_events[0], diff --git a/src/fluent-bit.c b/src/fluent-bit.c index a97ebb13857..b75efb826d7 100644 --- a/src/fluent-bit.c +++ b/src/fluent-bit.c @@ -71,6 +71,7 @@ extern void win32_started(void); flb_ctx_t *ctx; struct flb_config *config; +flb_pipefd_t ch_context_signal; volatile sig_atomic_t exit_signal = 0; volatile sig_atomic_t flb_bin_restarting = FLB_RELOAD_IDLE; @@ -541,7 +542,7 @@ static void flb_help_plugin(int rc, int format, static void flb_signal_handler_break_loop(int signal) { - exit_signal = signal; + flb_config_signal_send(config, FLB_CTX_SIGNAL_SHUTDOWN); } static void flb_signal_exit(int signal) @@ -603,6 +604,7 @@ static void flb_signal_handler_status_line(struct flb_cf *cf_opts) static void flb_signal_handler(int signal) { struct flb_cf *cf_opts = flb_cf_context_get(); + flb_signal_handler_status_line(cf_opts); switch (signal) { @@ -633,12 +635,7 @@ static void flb_signal_handler(int signal) break; #ifndef FLB_HAVE_STATIC_CONF case SIGHUP: - if (flb_bin_restarting == FLB_RELOAD_IDLE) { - flb_bin_restarting = FLB_RELOAD_IN_PROGRESS; - } - else { - flb_utils_error(FLB_ERR_RELOADING_IN_PROGRESS); - } + flb_config_signal_channel_send(ch_context_signal, FLB_CTX_SIGNAL_RELOAD); break; #endif #endif @@ -674,17 +671,7 @@ static BOOL WINAPI flb_console_handler(DWORD evType) handler_signal = 2; break; case 1 /* CTRL_BREAK_EVENT_1 */: - if (flb_bin_restarting == FLB_RELOAD_IDLE) { - flb_bin_restarting = FLB_RELOAD_IN_PROGRESS; - /* signal the main loop to execute reload. this is necessary since - * all signal handlers in win32 are executed on their own thread. - */ - handler_signal = 1; - flb_bin_restarting = FLB_RELOAD_IDLE; - } - else { - flb_utils_error(FLB_ERR_RELOADING_IN_PROGRESS); - } + flb_config_signal_channel_send(ch_context_signal, FLB_CTX_SIGNAL_SHUTDOWN); break; } return 1; @@ -1003,6 +990,11 @@ int flb_main(int argc, char **argv) struct flb_cf *cf_opts; struct flb_cf_group *group; + struct mk_event *event; + uint64_t ctx_signal; + int is_shutdown = FLB_FALSE; + int is_reload = FLB_FALSE; + prog_name = argv[0]; cf_opts = flb_cf_create(); @@ -1410,6 +1402,8 @@ int flb_main(int argc, char **argv) return ret; } + ch_context_signal = config->ch_context_signal[1]; + /* Store the current config format context from command line */ flb_cf_context_set(cf_opts); @@ -1425,42 +1419,47 @@ int flb_main(int argc, char **argv) } #endif - while (ctx->status == FLB_LIB_OK && exit_signal == 0) { - sleep(1); - -#ifdef FLB_SYSTEM_WINDOWS - if (handler_signal == 1) { - handler_signal = 0; - flb_reload(ctx, cf_opts); - } - else if (handler_signal == 2){ - handler_signal = 0; - break; - } -#endif - - /* set the context again before checking the status again */ - ctx = flb_context_get(); + while (exit_signal == FLB_FALSE && is_shutdown == FLB_FALSE) { + mk_event_wait(config->ch_evl); + mk_event_foreach(event, config->ch_evl) { + if (event->type == FLB_CONTEXT_EV_SIGNAL) { + ret = flb_pipe_r(event->fd, &ctx_signal, sizeof(uint64_t)); + if (ret <= 0) { + flb_error("unable to read context eventt"); + continue; + } -#ifdef FLB_SYSTEM_WINDOWS - flb_console_handler_set_ctx(ctx, cf_opts); + switch(ctx_signal) { + case FLB_CTX_SIGNAL_SHUTDOWN: + is_shutdown = FLB_TRUE; + break; + case FLB_CTX_SIGNAL_RELOAD: + /* reload by using same config files/path */ + flb_bin_restarting == FLB_RELOAD_IN_PROGRESS; + ret = flb_reload(ctx, cf_opts); + if (ret == 0) { + ctx = flb_context_get(); + flb_bin_restarting = FLB_RELOAD_IDLE; + config = ctx->config; + ch_context_signal = config->ch_context_signal[1]; +#ifdef FLB_HAVE_CHUNK_TRACE + if (trace_input != NULL) { + enable_trace_input(ctx, trace_input, NULL /* prefix ... */, trace_output, trace_props); + } #endif - if (flb_bin_restarting == FLB_RELOAD_IN_PROGRESS) { - /* reload by using same config files/path */ - ret = flb_reload(ctx, cf_opts); - if (ret == 0) { - ctx = flb_context_get(); - flb_bin_restarting = FLB_RELOAD_IDLE; - } - else { - flb_bin_restarting = ret; + } + else { + flb_bin_restarting = FLB_RELOAD_ABORTED; + } + break; + } + if (exit_signal == FLB_TRUE || is_shutdown == FLB_TRUE || is_reload == FLB_TRUE) { + // reset is_reload flag before re-entering the loop. + is_reload = FLB_FALSE; + break; + } } } - - if (flb_bin_restarting == FLB_RELOAD_HALTED) { - sleep(1); - flb_bin_restarting = FLB_RELOAD_IDLE; - } } if (exit_signal) { diff --git a/src/http_server/api/v2/reload.c b/src/http_server/api/v2/reload.c index 2f8c947cbd9..1b8bfdcec7b 100644 --- a/src/http_server/api/v2/reload.c +++ b/src/http_server/api/v2/reload.c @@ -50,7 +50,6 @@ static void handle_reload_request(mk_request_t *request, struct flb_config *conf msgpack_pack_str(&mp_pck, 6); msgpack_pack_str_body(&mp_pck, "reload", 6); -#ifdef FLB_SYSTEM_WINDOWS if (config->enable_hot_reload != FLB_TRUE) { msgpack_pack_str(&mp_pck, 11); msgpack_pack_str_body(&mp_pck, "not enabled", 11); @@ -67,37 +66,7 @@ static void handle_reload_request(mk_request_t *request, struct flb_config *conf http_status = 400; } else { - ret = GenerateConsoleCtrlEvent(1 /* CTRL_BREAK_EVENT_1 */, 0); - if (ret == 0) { - mk_http_status(request, 500); - mk_http_done(request); - return; - } - - msgpack_pack_str(&mp_pck, 4); - msgpack_pack_str_body(&mp_pck, "done", 4); - msgpack_pack_str(&mp_pck, 6); - msgpack_pack_str_body(&mp_pck, "status", 6); - msgpack_pack_int64(&mp_pck, ret); - } -#else - if (config->enable_hot_reload != FLB_TRUE) { - msgpack_pack_str(&mp_pck, 11); - msgpack_pack_str_body(&mp_pck, "not enabled", 11); - msgpack_pack_str(&mp_pck, 6); - msgpack_pack_str_body(&mp_pck, "status", 6); - msgpack_pack_int64(&mp_pck, -1); - } - else if (config->hot_reloading == FLB_TRUE) { - msgpack_pack_str(&mp_pck, 11); - msgpack_pack_str_body(&mp_pck, "in progress", 11); - msgpack_pack_str(&mp_pck, 6); - msgpack_pack_str_body(&mp_pck, "status", 6); - msgpack_pack_int64(&mp_pck, -2); - http_status = 400; - } - else { - ret = kill(getpid(), SIGHUP); + ret = flb_config_signal_send(config, FLB_CTX_SIGNAL_RELOAD); if (ret != 0) { mk_http_status(request, 500); mk_http_done(request); @@ -111,8 +80,6 @@ static void handle_reload_request(mk_request_t *request, struct flb_config *conf msgpack_pack_int64(&mp_pck, ret); } -#endif - /* Export to JSON */ out_buf = flb_msgpack_raw_to_json_sds(mp_sbuf.data, mp_sbuf.size); msgpack_sbuffer_destroy(&mp_sbuf);