diff --git a/src/ssb.c b/src/ssb.c index 6bcadb7d..486517ec 100644 --- a/src/ssb.c +++ b/src/ssb.c @@ -2371,6 +2371,7 @@ void tf_ssb_destroy(tf_ssb_t* ssb) uv_mutex_destroy(&ssb->db_readers_lock); uv_mutex_destroy(&ssb->db_writer_lock); tf_free((void*)ssb->db_path); + tf_free(ssb->thread_time); tf_free(ssb); } diff --git a/src/ssb.db.c b/src/ssb.db.c index c8fd623e..41f9c011 100644 --- a/src/ssb.db.c +++ b/src/ssb.db.c @@ -385,6 +385,7 @@ static char* _tf_ssb_db_get_message_blob_wants(tf_ssb_t* ssb, int64_t rowid) typedef struct _message_store_t { uv_work_t work; + uv_thread_t thread_id; tf_ssb_t* ssb; char id[k_id_base64_len]; char signature[512]; @@ -402,12 +403,17 @@ typedef struct _message_store_t tf_ssb_db_store_message_callback_t* callback; void* user_data; + uint64_t start_time; + uint64_t end_time; + message_store_t* next; } message_store_t; static void _tf_ssb_db_store_message_work(uv_work_t* work) { message_store_t* store = work->data; + store->start_time = uv_hrtime(); + store->thread_id = uv_thread_self(); tf_trace_t* trace = tf_ssb_get_trace(store->ssb); tf_trace_begin(trace, "message_store_work"); int64_t last_row_id = _tf_ssb_db_store_message_raw(store->ssb, store->id, *store->previous ? store->previous : NULL, store->author, store->sequence, store->timestamp, store->content, store->length, store->signature, store->sequence_before_author); @@ -417,6 +423,7 @@ static void _tf_ssb_db_store_message_work(uv_work_t* work) store->out_blob_wants = _tf_ssb_db_get_message_blob_wants(store->ssb, last_row_id); } tf_trace_end(trace); + store->end_time = uv_hrtime(); } static void _wake_up_queue(tf_ssb_t* ssb, tf_ssb_store_queue_t* queue) @@ -459,6 +466,7 @@ static void _tf_ssb_db_store_message_work_finish(message_store_t* store) static void _tf_ssb_db_store_message_after_work(uv_work_t* work, int status) { message_store_t* store = work->data; + tf_ssb_record_thread_time(store->ssb, (int64_t)store->thread_id, store->end_time - store->start_time); tf_trace_t* trace = tf_ssb_get_trace(store->ssb); tf_trace_begin(trace, "message_store_after_work"); if (store->out_stored) @@ -634,6 +642,7 @@ bool tf_ssb_db_blob_get(tf_ssb_t* ssb, const char* id, uint8_t** out_blob, size_ typedef struct _blob_store_work_t { uv_work_t work; + uv_thread_t thread_id; tf_ssb_t* ssb; const uint8_t* blob; size_t size; @@ -641,20 +650,26 @@ typedef struct _blob_store_work_t bool is_new; tf_ssb_db_blob_store_callback_t* callback; void* user_data; + uint64_t start_time; + uint64_t end_time; } blob_store_work_t; static void _tf_ssb_db_blob_store_work(uv_work_t* work) { blob_store_work_t* blob_work = work->data; + blob_work->start_time = uv_hrtime(); + blob_work->thread_id = uv_thread_self(); tf_trace_t* trace = tf_ssb_get_trace(blob_work->ssb); tf_trace_begin(trace, "blob_store_work"); tf_ssb_db_blob_store(blob_work->ssb, blob_work->blob, blob_work->size, blob_work->id, sizeof(blob_work->id), &blob_work->is_new); tf_trace_end(trace); + blob_work->end_time = uv_hrtime(); } static void _tf_ssb_db_blob_store_after_work(uv_work_t* work, int status) { blob_store_work_t* blob_work = work->data; + tf_ssb_record_thread_time(blob_work->ssb, (int64_t)blob_work->thread_id, blob_work->end_time - blob_work->start_time); tf_trace_t* trace = tf_ssb_get_trace(blob_work->ssb); tf_trace_begin(trace, "blob_store_after_work"); if (status == 0 && *blob_work->id) diff --git a/src/ssb.rpc.c b/src/ssb.rpc.c index 7533a1ce..d3724d17 100644 --- a/src/ssb.rpc.c +++ b/src/ssb.rpc.c @@ -1144,12 +1144,25 @@ static void _tf_ssb_rpc_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_chang } } +typedef struct _delete_blobs_work_t +{ + uv_work_t work; + uv_thread_t thread_id; + tf_ssb_t* ssb; + uint64_t start_time; + uint64_t end_time; +} delete_blobs_work_t; + static void _tf_ssb_rpc_delete_blobs_work(uv_work_t* work) { - tf_ssb_t* ssb = work->data; + delete_blobs_work_t* delete = work->data; + delete->start_time = uv_hrtime(); + delete->thread_id = uv_thread_self(); + tf_ssb_t* ssb = delete->ssb; int64_t age = _get_global_setting_int64(ssb, "blob_expire_age_seconds", -1); if (age <= 0) { + delete->end_time = uv_hrtime(); return; } int64_t start_ns = uv_hrtime(); @@ -1189,11 +1202,14 @@ static void _tf_ssb_rpc_delete_blobs_work(uv_work_t* work) int64_t duration_ms = (uv_hrtime() - start_ns) / 1000000LL; tf_printf("Deleted %d blobs in %d ms.\n", deleted, (int)duration_ms); _tf_ssb_rpc_start_delete_blobs(ssb, deleted ? (int)duration_ms : (15 * 60 * 1000)); + delete->end_time = uv_hrtime(); } static void _tf_ssb_rpc_delete_blobs_after_work(uv_work_t* work, int status) { - tf_free(work); + delete_blobs_work_t* delete = work->data; + tf_ssb_record_thread_time(delete->ssb, (int64_t)delete->thread_id, delete->end_time - delete->start_time); + tf_free(delete); } static void _tf_ssb_rpc_timer_on_close(uv_handle_t* handle) @@ -1204,9 +1220,9 @@ static void _tf_ssb_rpc_timer_on_close(uv_handle_t* handle) static void _tf_ssb_rpc_start_delete_timer(uv_timer_t* timer) { tf_ssb_t* ssb = timer->data; - uv_work_t* work = tf_malloc(sizeof(uv_work_t)); - *work = (uv_work_t) { .data = ssb }; - int r = uv_queue_work(tf_ssb_get_loop(ssb), work, _tf_ssb_rpc_delete_blobs_work, _tf_ssb_rpc_delete_blobs_after_work); + delete_blobs_work_t* work = tf_malloc(sizeof(delete_blobs_work_t)); + *work = (delete_blobs_work_t) { .work = { .data = work}, .ssb = ssb }; + int r = uv_queue_work(tf_ssb_get_loop(ssb), &work->work, _tf_ssb_rpc_delete_blobs_work, _tf_ssb_rpc_delete_blobs_after_work); if (r) { tf_printf("uv_queue_work: %s\n", uv_strerror(r));