Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions include/fluent-bit/flb_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#define FLB_CONFIG_DEFAULT_TASK_MAP_SIZE 2048
#define FLB_CONFIG_DEFAULT_TASK_MAP_SIZE_LIMIT 16384
#define FLB_CONFIG_DEFAULT_TASK_MAP_SIZE_GROWTH_SiZE 256
#define FLB_CONFIG_EVENT_LOOP_SIZE 8

/* The reason behind FLB_CONFIG_DEFAULT_TASK_MAP_SIZE_LIMIT being set to 16384
* is that this is the largest unsigned number expressible with 14 bits which is
Expand All @@ -52,10 +53,20 @@
* pointers.
*/

#define FLB_CONTEXT_EV_SIGNAL (1 << 0) /* 1 */

#define FLB_CTX_SIGNAL_RELOAD 1
#define FLB_CTX_SIGNAL_SHUTDOWN 2

/* Main struct to hold the configuration of the runtime service */
struct flb_config {
struct mk_event ch_event;

/* external communication channel for fluent-bit contexts */
struct mk_event_loop *ctx_evl;
flb_pipefd_t ch_context_signal[2]; /* channel to receive context signal events */
struct mk_event event_context_signal;

int support_mode; /* enterprise support mode ? */
int is_ingestion_active; /* date ingestion active/allowed */
int is_shutting_down; /* is the service shutting down ? */
Expand Down Expand Up @@ -313,6 +324,16 @@ struct flb_config {
int dry_run;
};

/*
 * Write a context signal to an explicit channel descriptor.
 *
 * channel:     write end of a context signal channel.
 * signal_type: signal identifier (callers pass FLB_CTX_SIGNAL_* values); it
 *              is written to the channel as a single uint64_t.
 *
 * Returns 0 when the whole value was written and -1 otherwise. The result of
 * flb_pipe_w() is a byte count on success, so it is normalized here: callers
 * of the signal helpers compare the result against zero to detect failure.
 */
static inline int flb_config_signal_channel_send(flb_pipefd_t channel, uint64_t signal_type)
{
    int ret;

    ret = flb_pipe_w(channel, &signal_type, sizeof(signal_type));
    if (ret != (int) sizeof(signal_type)) {
        /* write error or truncated signal: the reader could not decode it */
        return -1;
    }

    return 0;
}

/*
 * Write a context signal to the signal channel owned by a configuration.
 *
 * config:      configuration whose ch_context_signal[1] descriptor is used
 *              as the write end; a NULL pointer is rejected.
 * signal_type: signal identifier (callers pass FLB_CTX_SIGNAL_* values); it
 *              is written to the channel as a single uint64_t.
 *
 * Returns 0 when the whole value was written and -1 otherwise. The result of
 * flb_pipe_w() is a byte count on success, so it is normalized here: callers
 * treat any non-zero return value as a failure.
 */
static inline int flb_config_signal_send(struct flb_config *config, uint64_t signal_type)
{
    int ret;

    if (config == NULL) {
        return -1;
    }

    ret = flb_pipe_w(config->ch_context_signal[1], &signal_type, sizeof(signal_type));
    if (ret != (int) sizeof(signal_type)) {
        /* write error or truncated signal: the reader could not decode it */
        return -1;
    }

    return 0;
}

#define FLB_CONFIG_LOG_LEVEL(c) (c->log->level)

struct flb_config *flb_config_init();
Expand Down
13 changes: 8 additions & 5 deletions plugins/in_calyptia_fleet/in_calyptia_fleet.c
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,7 @@ static int exists_header_fleet_config(struct flb_in_calyptia_fleet_config *ctx)
static void *do_reload(void *data)
{
struct reload_ctx *reload = (struct reload_ctx *)data;
int ret;

if (reload == NULL) {
return NULL;
Expand All @@ -470,11 +471,13 @@ static void *do_reload(void *data)

flb_free(reload);
sleep(5);
#ifndef FLB_SYSTEM_WINDOWS
kill(getpid(), SIGHUP);
#else
GenerateConsoleCtrlEvent(1 /* CTRL_BREAK_EVENT_1 */, 0);
#endif

ret = flb_config_signal_send(reload->flb->config, FLB_CTX_SIGNAL_RELOAD);
if (ret != 0) {
flb_error("unable to signal reload");
return NULL;
}

return NULL;
}

Expand Down
35 changes: 35 additions & 0 deletions src/flb_engine.c
Original file line number Diff line number Diff line change
Expand Up @@ -719,6 +719,29 @@ int flb_engine_start(struct flb_config *config)
}
config->evl_bktq = evl_bktq;

/*
* Event loop channel to send context signals.
*
*/
/* Create the event loop and set it in the global configuration */
config->ctx_evl = mk_event_loop_create(FLB_CONFIG_EVENT_LOOP_SIZE);
if (!config->ctx_evl) {
fprintf(stderr, "[log] could not create context event loop\n");
return -1;
}

ret = mk_event_channel_create(config->ctx_evl,
&config->ch_context_signal[0],
&config->ch_context_signal[1],
&config->event_context_signal);
if (ret == -1) {
flb_error("[engine] could not create context signal channel");
return -1;
}
/* Signal type to indicate a "context" event */
config->event_context_signal.type = FLB_CONTEXT_EV_SIGNAL;
config->event_context_signal.priority = FLB_ENGINE_PRIORITY_THREAD;

/*
* Event loop channel to ingest flush events from flb_engine_flush()
*
Expand Down Expand Up @@ -1047,8 +1070,11 @@ int flb_engine_start(struct flb_config *config)
flb_info("[engine] service has stopped (%i pending tasks)",
ret);
ret = config->exit_status_code;

flb_config_signal_send(config, FLB_CTX_SIGNAL_SHUTDOWN);
flb_engine_shutdown(config);
config = NULL;

return ret;
}
}
Expand Down Expand Up @@ -1185,6 +1211,15 @@ int flb_engine_shutdown(struct flb_config *config)
flb_hs_destroy(config->http_ctx);
}
#endif
if (config->ctx_evl != NULL) {
mk_event_channel_destroy(config->ctx_evl,
config->ch_context_signal[0],
config->ch_context_signal[1],
&config->event_context_signal);
mk_event_loop_destroy(config->ctx_evl);
config->ctx_evl = NULL;
}

if (config->evl) {
mk_event_channel_destroy(config->evl,
config->ch_self_events[0],
Expand Down
99 changes: 49 additions & 50 deletions src/fluent-bit.c
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ extern void win32_started(void);

flb_ctx_t *ctx;
struct flb_config *config;
flb_pipefd_t ch_context_signal;
volatile sig_atomic_t exit_signal = 0;
volatile sig_atomic_t flb_bin_restarting = FLB_RELOAD_IDLE;

Expand Down Expand Up @@ -541,7 +542,7 @@ static void flb_help_plugin(int rc, int format,

static void flb_signal_handler_break_loop(int signal)
{
exit_signal = signal;
flb_config_signal_send(config, FLB_CTX_SIGNAL_SHUTDOWN);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can't call flb_pipe_w from a signal handler. That's why I said you were working on something similar in PR #10275.

Copy link
Contributor Author

@pwhelan pwhelan May 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That was an experiment to see what could be done to avoid using atomic variables as signal flags. I obviously crashed head first into the fact that EINTR will interrupt and basically drop the events from the queue before they can be emitted.

I think the only real way around using atomic variables will be to add support for signal events to monkey. I was hoping to avoid it but I at least cannot see another way through.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's why I tagged you in that other PR.

I think what he did with the signal queue is correct but we need to do it using the platform agnostic atomic operation wrapper which we need to move from cmetrics to cfl (I thought that was already done but it seems I was wrong).

I think with that, the signal mask tweak and a bit of elbow grease to ensure that the mechanism is tidy we should be good.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Were you able to make any progress on this? Do you need anything from me?

}

static void flb_signal_exit(int signal)
Expand Down Expand Up @@ -603,6 +604,7 @@ static void flb_signal_handler_status_line(struct flb_cf *cf_opts)
static void flb_signal_handler(int signal)
{
struct flb_cf *cf_opts = flb_cf_context_get();

flb_signal_handler_status_line(cf_opts);

switch (signal) {
Expand Down Expand Up @@ -633,12 +635,7 @@ static void flb_signal_handler(int signal)
break;
#ifndef FLB_HAVE_STATIC_CONF
case SIGHUP:
if (flb_bin_restarting == FLB_RELOAD_IDLE) {
flb_bin_restarting = FLB_RELOAD_IN_PROGRESS;
}
else {
flb_utils_error(FLB_ERR_RELOADING_IN_PROGRESS);
}
flb_config_signal_channel_send(ch_context_signal, FLB_CTX_SIGNAL_RELOAD);
break;
#endif
#endif
Expand Down Expand Up @@ -674,17 +671,7 @@ static BOOL WINAPI flb_console_handler(DWORD evType)
handler_signal = 2;
break;
case 1 /* CTRL_BREAK_EVENT_1 */:
if (flb_bin_restarting == FLB_RELOAD_IDLE) {
flb_bin_restarting = FLB_RELOAD_IN_PROGRESS;
/* signal the main loop to execute reload. this is necessary since
* all signal handlers in win32 are executed on their own thread.
*/
handler_signal = 1;
flb_bin_restarting = FLB_RELOAD_IDLE;
}
else {
flb_utils_error(FLB_ERR_RELOADING_IN_PROGRESS);
}
flb_config_signal_channel_send(ch_context_signal, FLB_CTX_SIGNAL_SHUTDOWN);
break;
}
return 1;
Expand Down Expand Up @@ -1003,6 +990,11 @@ int flb_main(int argc, char **argv)
struct flb_cf *cf_opts;
struct flb_cf_group *group;

struct mk_event *event;
uint64_t ctx_signal;
int is_shutdown = FLB_FALSE;
int is_reload = FLB_FALSE;

prog_name = argv[0];

cf_opts = flb_cf_create();
Expand Down Expand Up @@ -1410,6 +1402,8 @@ int flb_main(int argc, char **argv)
return ret;
}

ch_context_signal = config->ch_context_signal[1];

/* Store the current config format context from command line */
flb_cf_context_set(cf_opts);

Expand All @@ -1425,42 +1419,47 @@ int flb_main(int argc, char **argv)
}
#endif

while (ctx->status == FLB_LIB_OK && exit_signal == 0) {
sleep(1);

#ifdef FLB_SYSTEM_WINDOWS
if (handler_signal == 1) {
handler_signal = 0;
flb_reload(ctx, cf_opts);
}
else if (handler_signal == 2){
handler_signal = 0;
break;
}
#endif

/* set the context again before checking the status again */
ctx = flb_context_get();
while (exit_signal == FLB_FALSE && is_shutdown == FLB_FALSE) {
mk_event_wait(config->ch_evl);
mk_event_foreach(event, config->ch_evl) {
if (event->type == FLB_CONTEXT_EV_SIGNAL) {
ret = flb_pipe_r(event->fd, &ctx_signal, sizeof(uint64_t));
if (ret <= 0) {
flb_error("unable to read context eventt");
continue;
}

#ifdef FLB_SYSTEM_WINDOWS
flb_console_handler_set_ctx(ctx, cf_opts);
switch(ctx_signal) {
case FLB_CTX_SIGNAL_SHUTDOWN:
is_shutdown = FLB_TRUE;
break;
case FLB_CTX_SIGNAL_RELOAD:
/* reload by using same config files/path */
flb_bin_restarting == FLB_RELOAD_IN_PROGRESS;
ret = flb_reload(ctx, cf_opts);
if (ret == 0) {
ctx = flb_context_get();
flb_bin_restarting = FLB_RELOAD_IDLE;
config = ctx->config;
ch_context_signal = config->ch_context_signal[1];
#ifdef FLB_HAVE_CHUNK_TRACE
if (trace_input != NULL) {
enable_trace_input(ctx, trace_input, NULL /* prefix ... */, trace_output, trace_props);
}
#endif
if (flb_bin_restarting == FLB_RELOAD_IN_PROGRESS) {
/* reload by using same config files/path */
ret = flb_reload(ctx, cf_opts);
if (ret == 0) {
ctx = flb_context_get();
flb_bin_restarting = FLB_RELOAD_IDLE;
}
else {
flb_bin_restarting = ret;
}
else {
flb_bin_restarting = FLB_RELOAD_ABORTED;
}
break;
}
if (exit_signal == FLB_TRUE || is_shutdown == FLB_TRUE || is_reload == FLB_TRUE) {
// reset is_reload flag before re-entering the loop.
is_reload = FLB_FALSE;
break;
}
}
}

if (flb_bin_restarting == FLB_RELOAD_HALTED) {
sleep(1);
flb_bin_restarting = FLB_RELOAD_IDLE;
}
}

if (exit_signal) {
Expand Down
35 changes: 1 addition & 34 deletions src/http_server/api/v2/reload.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ static void handle_reload_request(mk_request_t *request, struct flb_config *conf
msgpack_pack_str(&mp_pck, 6);
msgpack_pack_str_body(&mp_pck, "reload", 6);

#ifdef FLB_SYSTEM_WINDOWS
if (config->enable_hot_reload != FLB_TRUE) {
msgpack_pack_str(&mp_pck, 11);
msgpack_pack_str_body(&mp_pck, "not enabled", 11);
Expand All @@ -67,37 +66,7 @@ static void handle_reload_request(mk_request_t *request, struct flb_config *conf
http_status = 400;
}
else {
ret = GenerateConsoleCtrlEvent(1 /* CTRL_BREAK_EVENT_1 */, 0);
if (ret == 0) {
mk_http_status(request, 500);
mk_http_done(request);
return;
}

msgpack_pack_str(&mp_pck, 4);
msgpack_pack_str_body(&mp_pck, "done", 4);
msgpack_pack_str(&mp_pck, 6);
msgpack_pack_str_body(&mp_pck, "status", 6);
msgpack_pack_int64(&mp_pck, ret);
}
#else
if (config->enable_hot_reload != FLB_TRUE) {
msgpack_pack_str(&mp_pck, 11);
msgpack_pack_str_body(&mp_pck, "not enabled", 11);
msgpack_pack_str(&mp_pck, 6);
msgpack_pack_str_body(&mp_pck, "status", 6);
msgpack_pack_int64(&mp_pck, -1);
}
else if (config->hot_reloading == FLB_TRUE) {
msgpack_pack_str(&mp_pck, 11);
msgpack_pack_str_body(&mp_pck, "in progress", 11);
msgpack_pack_str(&mp_pck, 6);
msgpack_pack_str_body(&mp_pck, "status", 6);
msgpack_pack_int64(&mp_pck, -2);
http_status = 400;
}
else {
ret = kill(getpid(), SIGHUP);
ret = flb_config_signal_send(config, FLB_CTX_SIGNAL_RELOAD);
if (ret != 0) {
mk_http_status(request, 500);
mk_http_done(request);
Expand All @@ -111,8 +80,6 @@ static void handle_reload_request(mk_request_t *request, struct flb_config *conf
msgpack_pack_int64(&mp_pck, ret);
}

#endif

/* Export to JSON */
out_buf = flb_msgpack_raw_to_json_sds(mp_sbuf.data, mp_sbuf.size);
msgpack_sbuffer_destroy(&mp_sbuf);
Expand Down
Loading