Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 34 additions & 39 deletions src/flb_engine.c
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,9 @@ void flb_engine_reschedule_retries(struct flb_config *config)
task = mk_list_entry(t_head, struct flb_task, _head);

if (task->users > 0) {
flb_debug("[engine] retry=%p for task %i already scheduled to run, "
"not re-scheduling it.",
retry, task->id);
flb_debug("[engine] task %i already scheduled to run, not re-scheduling it.",
task->id
);

continue;
}
Expand Down Expand Up @@ -581,7 +581,7 @@ static FLB_INLINE int flb_engine_handle_event(flb_pipefd_t fd, int mask,
return 0;
}
else if (config->shutdown_fd == fd) {
flb_utils_pipe_byte_consume(fd);
flb_utils_timer_consume(fd);
return FLB_ENGINE_SHUTDOWN;
}
else if (config->ch_manager[0] == fd) {
Expand Down Expand Up @@ -716,7 +716,6 @@ int flb_engine_start(struct flb_config *config)
struct flb_sched *sched;
struct flb_net_dns dns_ctx;
struct flb_notification *notification;
int exiting = FLB_FALSE;

/* Initialize the networking layer */
flb_net_lib_init();
Expand Down Expand Up @@ -1001,6 +1000,12 @@ int flb_engine_start(struct flb_config *config)
flb_event_priority_live_foreach(event, evl_bktq, evl, FLB_ENGINE_LOOP_MAX_ITER) {
if (event->type == FLB_ENGINE_EV_CORE) {
ret = flb_engine_handle_event(event->fd, event->mask, config);

/*
* This block will be called once on engine stop.
* Will reschedule task to 1 sec. retry.
* Also timer with shutdown event will be created.
*/
if (ret == FLB_ENGINE_STOP) {
if (config->grace_count == 0) {
if (config->grace >= 0) {
Expand All @@ -1015,11 +1020,7 @@ int flb_engine_start(struct flb_config *config)
}

/* mark the runtime as the ingestion is not active and that we are in shutting down mode */
config->is_ingestion_active = FLB_FALSE;
config->is_shutting_down = FLB_TRUE;

/* pause all input plugin instances */
flb_input_pause_all(config);
flb_engine_stop_ingestion(config);

/*
* We are preparing to shutdown, we give a graceful time
Expand All @@ -1028,6 +1029,7 @@ int flb_engine_start(struct flb_config *config)
event = &config->event_shutdown;
event->mask = MK_EVENT_EMPTY;
event->status = MK_EVENT_NONE;
event->priority = FLB_ENGINE_PRIORITY_SHUTDOWN;

/*
* Configure a timer of 1 second, on expiration the code will
Expand All @@ -1038,11 +1040,18 @@ int flb_engine_start(struct flb_config *config)
* If no tasks exists, there is no need to wait for the maximum
* grace period.
*/
config->shutdown_fd = mk_event_timeout_create(evl,
1,
0,
event);
event->priority = FLB_ENGINE_PRIORITY_SHUTDOWN;
if (config->shutdown_fd <= 0) {
config->shutdown_fd = mk_event_timeout_create(evl,
1,
0,
event);

if (config->shutdown_fd == -1) {
flb_error("[engine] could not create shutdown timer");
/* fail early so we don't silently skip scheduled shutdown */
return -1;
}
}
}
else if (ret == FLB_ENGINE_SHUTDOWN) {
/* Increase the grace counter */
Expand All @@ -1062,10 +1071,19 @@ int flb_engine_start(struct flb_config *config)
fs_chunks = 0;
tasks = flb_task_running_count(config);
flb_storage_chunk_count(config, &mem_chunks, &fs_chunks);

if ((mem_chunks + fs_chunks) > 0) {
flb_info("[engine] pending chunk count: memory=%d, filesystem=%d; grace_timer=%d",
mem_chunks, fs_chunks, config->grace_count);
}

if (tasks > 0) {
flb_task_running_print(config);
}

ret = tasks + mem_chunks + fs_chunks;
if (ret > 0 && (config->grace_count < config->grace || config->grace == -1)) {
if (config->grace_count == 1) {
flb_task_running_print(config);
/*
* If storage.backlog.shutdown_flush is enabled, attempt to flush pending
* filesystem chunks during shutdown. This is particularly useful in scenarios
Expand All @@ -1079,32 +1097,10 @@ int flb_engine_start(struct flb_config *config)
}
}
}
if ((mem_chunks + fs_chunks) > 0) {
flb_info("[engine] pending chunk count: memory=%d, filesystem=%d; grace_timer=%d",
mem_chunks, fs_chunks, config->grace_count);
}

/* Create new tasks for pending chunks */
flb_engine_flush(config, NULL);
if (config->grace_count < config->grace_input) {
if (exiting == FLB_FALSE) {
flb_engine_exit(config);
exiting = FLB_TRUE;
}
} else {
if (config->is_ingestion_active == FLB_TRUE) {
flb_engine_stop_ingestion(config);
}
}
}
else {
if (tasks > 0) {
flb_task_running_print(config);
}
if ((mem_chunks + fs_chunks) > 0) {
flb_info("[engine] pending chunk count: memory=%d, filesystem=%d; grace_timer=%d",
mem_chunks, fs_chunks, config->grace_count);
}
flb_info("[engine] service has stopped (%i pending tasks)",
tasks);
ret = config->exit_status_code;
Expand All @@ -1115,7 +1111,6 @@ int flb_engine_start(struct flb_config *config)
&config->event_shutdown);
}

config = NULL;
return ret;
}
}
Expand Down
Loading