diff --git a/src/ssb.c b/src/ssb.c index 66cfcdc0..af699946 100644 --- a/src/ssb.c +++ b/src/ssb.c @@ -173,6 +173,14 @@ typedef struct _tf_thread_work_time_t uint64_t hrtime; } tf_thread_work_time_t; +typedef struct _tf_ssb_timer_t +{ + tf_ssb_t* ssb; + uv_timer_t timer; + void (*callback)(tf_ssb_t* ssb, void* user_data); + void* user_data; +} tf_ssb_timer_t; + typedef struct _tf_ssb_t { bool own_context; @@ -248,6 +256,9 @@ typedef struct _tf_ssb_t uv_thread_t thread_self; bool is_room; char* room_name; + + tf_ssb_timer_t** timers; + int timers_count; } tf_ssb_t; typedef struct _tf_ssb_connection_message_request_t @@ -2350,6 +2361,11 @@ static void _tf_ssb_on_handle_close(uv_handle_t* handle) handle->data = NULL; } +static void _tf_ssb_on_timer_close(uv_handle_t* handle) +{ + tf_free(handle->data); +} + void tf_ssb_destroy(tf_ssb_t* ssb) { tf_ssb_connections_destroy(ssb->connections_tracker); @@ -2390,6 +2406,14 @@ void tf_ssb_destroy(tf_ssb_t* ssb) uv_close((uv_handle_t*)&ssb->settings_timer, _tf_ssb_on_handle_close); } + for (int i = 0; i < ssb->timers_count; i++) + { + uv_close((uv_handle_t*)&ssb->timers[i]->timer, _tf_ssb_on_timer_close); + } + ssb->timers_count = 0; + tf_free(ssb->timers); + ssb->timers = NULL; + while (ssb->broadcast_listener.data || ssb->broadcast_sender.data || ssb->broadcast_timer.data || @@ -3919,3 +3943,37 @@ void tf_ssb_set_verbose(tf_ssb_t* ssb, bool verbose) { ssb->verbose = verbose; } + +static void _tf_ssb_scheduled_timer(uv_timer_t* handle) +{ + tf_ssb_timer_t* timer = handle->data; + timer->callback(timer->ssb, timer->user_data); + for (int i = 0; i < timer->ssb->timers_count; i++) + { + if (timer->ssb->timers[i] == timer) + { + timer->ssb->timers[i] = timer->ssb->timers[--timer->ssb->timers_count]; + break; + } + } + uv_close((uv_handle_t*)handle, _tf_ssb_on_timer_close); +} + +void tf_ssb_schedule_work(tf_ssb_t* ssb, int delay_ms, void (*callback)(tf_ssb_t* ssb, void* user_data), void* user_data) +{ + ssb->timers = tf_resize_vec(ssb->timers, sizeof(uv_timer_t*) * (ssb->timers_count + 1)); + tf_ssb_timer_t* timer = tf_malloc(sizeof(tf_ssb_timer_t)); + *timer = (tf_ssb_timer_t) + { + .ssb = ssb, + .timer = + { + .data = timer, + }, + .callback = callback, + .user_data = user_data, + }; + ssb->timers[ssb->timers_count++] = timer; + uv_timer_init(ssb->loop, &timer->timer); + uv_timer_start(&timer->timer, _tf_ssb_scheduled_timer, delay_ms, 0); +} diff --git a/src/ssb.h b/src/ssb.h index 963cd01e..4b8d8906 100644 --- a/src/ssb.h +++ b/src/ssb.h @@ -225,3 +225,5 @@ bool tf_ssb_is_room(tf_ssb_t* ssb); void tf_ssb_set_is_room(tf_ssb_t* ssb, bool is_room); const char* tf_ssb_get_room_name(tf_ssb_t* ssb); void tf_ssb_set_room_name(tf_ssb_t* ssb, const char* room_name); + +void tf_ssb_schedule_work(tf_ssb_t* ssb, int delay_ms, void (*callback)(tf_ssb_t* ssb, void* user_data), void* user_data); diff --git a/src/ssb.rpc.c b/src/ssb.rpc.c index 9ff01256..4267c2e9 100644 --- a/src/ssb.rpc.c +++ b/src/ssb.rpc.c @@ -1333,18 +1333,11 @@ static void _tf_ssb_rpc_delete_blobs_work(uv_work_t* work) static void _tf_ssb_rpc_delete_blobs_after_work(uv_work_t* work, int status) { delete_blobs_work_t* delete = work->data; - tf_ssb_unref(delete->ssb); tf_free(delete); } -static void _tf_ssb_rpc_timer_on_close(uv_handle_t* handle) +static void _tf_ssb_rpc_start_delete_callback(tf_ssb_t* ssb, void* user_data) { - tf_free(handle); -} - -static void _tf_ssb_rpc_start_delete_timer(uv_timer_t* timer) -{ - tf_ssb_t* ssb = timer->data; delete_blobs_work_t* work = tf_malloc(sizeof(delete_blobs_work_t)); *work = (delete_blobs_work_t) { .work = { .data = work}, .ssb = ssb }; int r = uv_queue_work(tf_ssb_get_loop(ssb), &work->work, _tf_ssb_rpc_delete_blobs_work, _tf_ssb_rpc_delete_blobs_after_work); @@ -1353,17 +1346,12 @@ static void _tf_ssb_rpc_start_delete_timer(uv_timer_t* timer) tf_printf("uv_queue_work: %s\n", uv_strerror(r)); tf_free(work); } - uv_close((uv_handle_t*)timer, _tf_ssb_rpc_timer_on_close); } static void _tf_ssb_rpc_start_delete_blobs(tf_ssb_t* ssb, int delay_ms) { tf_printf("will delete more blobs in %d ms\n", delay_ms); - uv_timer_t* timer = tf_malloc(sizeof(uv_timer_t)); - *timer = (uv_timer_t) { .data = ssb }; - uv_timer_init(tf_ssb_get_loop(ssb), timer); - uv_timer_start(timer, _tf_ssb_rpc_start_delete_timer, delay_ms, 0); - tf_ssb_ref(ssb); + tf_ssb_schedule_work(ssb, delay_ms, _tf_ssb_rpc_start_delete_callback, NULL); } void tf_ssb_rpc_start_periodic(tf_ssb_t* ssb) diff --git a/src/task.c b/src/task.c index 71c7262a..1447b7c3 100644 --- a/src/task.c +++ b/src/task.c @@ -123,7 +123,8 @@ typedef struct _tf_task_t uv_idle_t idle; uv_prepare_t prepare; - uv_signal_t signal; + uv_signal_t sig_term; + uv_signal_t sig_int; export_record_t** _exports; int _export_count; @@ -1571,8 +1572,9 @@ JSModuleDef* _tf_task_module_loader(JSContext* context, const char* module_name, return module; } -static void _tf_task_sigterm(uv_signal_t* signal, int sig) +static void _tf_task_signal_shutdown(uv_signal_t* signal, int sig) { + tf_printf("Received %s.\n", strsignal(sig)); tf_task_t* task = signal->data; task->_killed = true; if (task->_parent) @@ -1626,10 +1628,14 @@ tf_task_t* tf_task_create() uv_prepare_init(&task->_loop, &task->prepare); uv_unref((uv_handle_t*)&task->prepare); uv_idle_start(&task->idle, _tf_task_run_jobs_idle); - task->signal.data = task; - uv_signal_init(&task->_loop, &task->signal); - uv_signal_start(&task->signal, _tf_task_sigterm, SIGTERM); - uv_unref((uv_handle_t*)&task->signal); + task->sig_term.data = task; + uv_signal_init(&task->_loop, &task->sig_term); + uv_signal_start(&task->sig_term, _tf_task_signal_shutdown, SIGTERM); + uv_unref((uv_handle_t*)&task->sig_term); + task->sig_int.data = task; + uv_signal_init(&task->_loop, &task->sig_int); + uv_signal_start(&task->sig_int, _tf_task_signal_shutdown, SIGINT); + uv_unref((uv_handle_t*)&task->sig_int); return task; } @@ -1857,14 +1863,17 @@ void tf_task_destroy(tf_task_t* task) } uv_close((uv_handle_t*)&task->idle, _tf_task_on_handle_close); uv_close((uv_handle_t*)&task->prepare, _tf_task_on_handle_close); - uv_signal_stop(&task->signal); - uv_close((uv_handle_t*)&task->signal, _tf_task_on_handle_close); + uv_signal_stop(&task->sig_term); + uv_close((uv_handle_t*)&task->sig_term, _tf_task_on_handle_close); + uv_signal_stop(&task->sig_int); + uv_close((uv_handle_t*)&task->sig_int, _tf_task_on_handle_close); while (task->trace_timer.data || task->gc_timer.data || task->idle.data || task->prepare.data || - task->signal.data) + task->sig_term.data || + task->sig_int.data) { uv_run(&task->_loop, UV_RUN_ONCE); }