forked from cory/tildefriends
Calculate thread busyness as the current concurrent running threads vs. the max number of threads ever seen running concurrently.
git-svn-id: https://www.unprompted.com/svn/projects/tildefriends/trunk@4404 ed5197a5-7fde-0310-b194-c3ffbd925b24
This commit is contained in:
parent
358d02d97f
commit
faca2d387b
36
src/ssb.c
36
src/ssb.c
@ -232,8 +232,8 @@ typedef struct _tf_ssb_t
|
||||
|
||||
tf_ssb_debug_close_t debug_close[k_debug_close_connection_count];
|
||||
|
||||
tf_thread_work_time_t* thread_time;
|
||||
int thread_time_count;
|
||||
int32_t thread_busy_count;
|
||||
int32_t thread_busy_max;
|
||||
|
||||
void (*hitch_callback)(const char* name, uint64_t duration, void* user_data);
|
||||
void* hitch_user_data;
|
||||
@ -2377,7 +2377,6 @@ void tf_ssb_destroy(tf_ssb_t* ssb)
|
||||
uv_mutex_destroy(&ssb->db_readers_lock);
|
||||
uv_mutex_destroy(&ssb->db_writer_lock);
|
||||
tf_free((void*)ssb->db_path);
|
||||
tf_free(ssb->thread_time);
|
||||
tf_free(ssb);
|
||||
}
|
||||
|
||||
@ -3531,37 +3530,24 @@ JSValue tf_ssb_get_disconnection_debug(tf_ssb_t* ssb, JSContext* context)
|
||||
return result;
|
||||
}
|
||||
|
||||
void tf_ssb_record_thread_time(tf_ssb_t* ssb, int64_t thread_id, uint64_t hrtime)
|
||||
void tf_ssb_record_thread_busy(tf_ssb_t* ssb, bool busy)
|
||||
{
|
||||
for (int i = 0; i < ssb->thread_time_count; i++)
|
||||
int32_t busy_value = __atomic_add_fetch(&ssb->thread_busy_count, busy ? 1 : -1, __ATOMIC_RELAXED);
|
||||
int32_t current = ssb->thread_busy_max;
|
||||
while (busy_value > current && !__atomic_compare_exchange_n(&ssb->thread_busy_max, ¤t, busy_value, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED))
|
||||
{
|
||||
if (ssb->thread_time[i].thread_id == thread_id)
|
||||
{
|
||||
ssb->thread_time[i].hrtime += hrtime;
|
||||
return;
|
||||
current = ssb->thread_busy_max;
|
||||
}
|
||||
}
|
||||
ssb->thread_time = tf_resize_vec(ssb->thread_time, sizeof(tf_thread_work_time_t) * (ssb->thread_time_count + 1));
|
||||
ssb->thread_time[ssb->thread_time_count++] = (tf_thread_work_time_t)
|
||||
{
|
||||
.thread_id = thread_id,
|
||||
.hrtime = hrtime,
|
||||
};
|
||||
}
|
||||
|
||||
uint64_t tf_ssb_get_average_thread_time(tf_ssb_t* ssb)
|
||||
float tf_ssb_get_average_thread_percent(tf_ssb_t* ssb)
|
||||
{
|
||||
if (!ssb)
|
||||
if (!ssb || !ssb->thread_busy_max)
|
||||
{
|
||||
return 0;
|
||||
return 0.0f;
|
||||
}
|
||||
|
||||
uint64_t total = 0;
|
||||
for (int i = 0; i < ssb->thread_time_count; i++)
|
||||
{
|
||||
total += ssb->thread_time[i].hrtime;
|
||||
}
|
||||
return ssb->thread_time_count ? total / ssb->thread_time_count : 0;
|
||||
return 100.0f * ssb->thread_busy_count / ssb->thread_busy_max;
|
||||
}
|
||||
|
||||
void tf_ssb_set_hitch_callback(tf_ssb_t* ssb, void (*callback)(const char* name, uint64_t duration_ns, void* user_data), void* user_data)
|
||||
|
19
src/ssb.db.c
19
src/ssb.db.c
@ -386,7 +386,6 @@ static char* _tf_ssb_db_get_message_blob_wants(tf_ssb_t* ssb, int64_t rowid)
|
||||
typedef struct _message_store_t
|
||||
{
|
||||
uv_work_t work;
|
||||
uv_thread_t thread_id;
|
||||
tf_ssb_t* ssb;
|
||||
char id[k_id_base64_len];
|
||||
char signature[512];
|
||||
@ -404,17 +403,13 @@ typedef struct _message_store_t
|
||||
tf_ssb_db_store_message_callback_t* callback;
|
||||
void* user_data;
|
||||
|
||||
uint64_t start_time;
|
||||
uint64_t end_time;
|
||||
|
||||
message_store_t* next;
|
||||
} message_store_t;
|
||||
|
||||
static void _tf_ssb_db_store_message_work(uv_work_t* work)
|
||||
{
|
||||
message_store_t* store = work->data;
|
||||
store->start_time = uv_hrtime();
|
||||
store->thread_id = uv_thread_self();
|
||||
tf_ssb_record_thread_busy(store->ssb, true);
|
||||
tf_trace_t* trace = tf_ssb_get_trace(store->ssb);
|
||||
tf_trace_begin(trace, "message_store_work");
|
||||
int64_t last_row_id = _tf_ssb_db_store_message_raw(store->ssb, store->id, *store->previous ? store->previous : NULL, store->author, store->sequence, store->timestamp, store->content, store->length, store->signature, store->sequence_before_author);
|
||||
@ -424,7 +419,7 @@ static void _tf_ssb_db_store_message_work(uv_work_t* work)
|
||||
store->out_blob_wants = _tf_ssb_db_get_message_blob_wants(store->ssb, last_row_id);
|
||||
}
|
||||
tf_trace_end(trace);
|
||||
store->end_time = uv_hrtime();
|
||||
tf_ssb_record_thread_busy(store->ssb, false);
|
||||
}
|
||||
|
||||
static void _wake_up_queue(tf_ssb_t* ssb, tf_ssb_store_queue_t* queue)
|
||||
@ -467,7 +462,6 @@ static void _tf_ssb_db_store_message_work_finish(message_store_t* store)
|
||||
static void _tf_ssb_db_store_message_after_work(uv_work_t* work, int status)
|
||||
{
|
||||
message_store_t* store = work->data;
|
||||
tf_ssb_record_thread_time(store->ssb, (int64_t)store->thread_id, store->end_time - store->start_time);
|
||||
tf_trace_t* trace = tf_ssb_get_trace(store->ssb);
|
||||
tf_trace_begin(trace, "message_store_after_work");
|
||||
if (store->out_stored)
|
||||
@ -643,7 +637,6 @@ bool tf_ssb_db_blob_get(tf_ssb_t* ssb, const char* id, uint8_t** out_blob, size_
|
||||
typedef struct _blob_store_work_t
|
||||
{
|
||||
uv_work_t work;
|
||||
uv_thread_t thread_id;
|
||||
tf_ssb_t* ssb;
|
||||
const uint8_t* blob;
|
||||
size_t size;
|
||||
@ -651,26 +644,22 @@ typedef struct _blob_store_work_t
|
||||
bool is_new;
|
||||
tf_ssb_db_blob_store_callback_t* callback;
|
||||
void* user_data;
|
||||
uint64_t start_time;
|
||||
uint64_t end_time;
|
||||
} blob_store_work_t;
|
||||
|
||||
static void _tf_ssb_db_blob_store_work(uv_work_t* work)
|
||||
{
|
||||
blob_store_work_t* blob_work = work->data;
|
||||
blob_work->start_time = uv_hrtime();
|
||||
blob_work->thread_id = uv_thread_self();
|
||||
tf_ssb_record_thread_busy(blob_work->ssb, true);
|
||||
tf_trace_t* trace = tf_ssb_get_trace(blob_work->ssb);
|
||||
tf_trace_begin(trace, "blob_store_work");
|
||||
tf_ssb_db_blob_store(blob_work->ssb, blob_work->blob, blob_work->size, blob_work->id, sizeof(blob_work->id), &blob_work->is_new);
|
||||
tf_trace_end(trace);
|
||||
blob_work->end_time = uv_hrtime();
|
||||
tf_ssb_record_thread_busy(blob_work->ssb, false);
|
||||
}
|
||||
|
||||
static void _tf_ssb_db_blob_store_after_work(uv_work_t* work, int status)
|
||||
{
|
||||
blob_store_work_t* blob_work = work->data;
|
||||
tf_ssb_record_thread_time(blob_work->ssb, (int64_t)blob_work->thread_id, blob_work->end_time - blob_work->start_time);
|
||||
tf_trace_t* trace = tf_ssb_get_trace(blob_work->ssb);
|
||||
tf_trace_begin(trace, "blob_store_after_work");
|
||||
if (status == 0 && *blob_work->id)
|
||||
|
@ -194,8 +194,8 @@ tf_ssb_blob_wants_t* tf_ssb_connection_get_blob_wants_state(tf_ssb_connection_t*
|
||||
|
||||
JSValue tf_ssb_get_disconnection_debug(tf_ssb_t* ssb, JSContext* context);
|
||||
|
||||
void tf_ssb_record_thread_time(tf_ssb_t* ssb, int64_t thread_id, uint64_t hrtime);
|
||||
uint64_t tf_ssb_get_average_thread_time(tf_ssb_t* ssb);
|
||||
void tf_ssb_record_thread_busy(tf_ssb_t* ssb, bool busy);
|
||||
float tf_ssb_get_average_thread_percent(tf_ssb_t* ssb);
|
||||
|
||||
void tf_ssb_set_hitch_callback(tf_ssb_t* ssb, void (*callback)(const char* name, uint64_t duration_ns, void* user_data), void* user_data);
|
||||
|
||||
|
@ -382,9 +382,6 @@ typedef struct _sql_work_t
|
||||
{
|
||||
uv_work_t request;
|
||||
tf_ssb_t* ssb;
|
||||
uv_thread_t thread_id;
|
||||
uint64_t start_time;
|
||||
uint64_t end_time;
|
||||
const char* query;
|
||||
uint8_t* binds;
|
||||
size_t binds_count;
|
||||
@ -406,10 +403,9 @@ static void _tf_ssb_sql_append(uint8_t** rows, size_t* rows_count, const void* d
|
||||
static void _tf_ssb_sqlAsync_work(uv_work_t* work)
|
||||
{
|
||||
sql_work_t* sql_work = work->data;
|
||||
tf_ssb_record_thread_busy(sql_work->ssb, true);
|
||||
tf_trace_t* trace = tf_ssb_get_trace(sql_work->ssb);
|
||||
tf_trace_begin(trace, "sql_async_work");
|
||||
sql_work->start_time = uv_hrtime();
|
||||
sql_work->thread_id = uv_thread_self();
|
||||
sqlite3* db = tf_ssb_acquire_db_reader(sql_work->ssb);
|
||||
sqlite3_set_authorizer(db, tf_ssb_sqlite_authorizer, sql_work->ssb);
|
||||
sqlite3_stmt* statement = NULL;
|
||||
@ -507,7 +503,7 @@ static void _tf_ssb_sqlAsync_work(uv_work_t* work)
|
||||
}
|
||||
sqlite3_set_authorizer(db, NULL, NULL);
|
||||
tf_ssb_release_db_reader(sql_work->ssb, db);
|
||||
sql_work->end_time = uv_hrtime();
|
||||
tf_ssb_record_thread_busy(sql_work->ssb, false);
|
||||
tf_trace_end(trace);
|
||||
}
|
||||
|
||||
@ -516,7 +512,6 @@ static void _tf_ssb_sqlAsync_after_work(uv_work_t* work, int status)
|
||||
sql_work_t* sql_work = work->data;
|
||||
tf_trace_t* trace = tf_ssb_get_trace(sql_work->ssb);
|
||||
tf_trace_begin(trace, "sql_async_after_work");
|
||||
tf_ssb_record_thread_time(sql_work->ssb, (int64_t)sql_work->thread_id, sql_work->end_time - sql_work->start_time);
|
||||
JSContext* context = tf_ssb_get_context(sql_work->ssb);
|
||||
uint8_t* p = sql_work->rows;
|
||||
while (p < sql_work->rows + sql_work->rows_count)
|
||||
|
@ -1154,22 +1154,18 @@ static void _tf_ssb_rpc_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_chang
|
||||
typedef struct _delete_blobs_work_t
|
||||
{
|
||||
uv_work_t work;
|
||||
uv_thread_t thread_id;
|
||||
tf_ssb_t* ssb;
|
||||
uint64_t start_time;
|
||||
uint64_t end_time;
|
||||
} delete_blobs_work_t;
|
||||
|
||||
static void _tf_ssb_rpc_delete_blobs_work(uv_work_t* work)
|
||||
{
|
||||
delete_blobs_work_t* delete = work->data;
|
||||
delete->start_time = uv_hrtime();
|
||||
delete->thread_id = uv_thread_self();
|
||||
tf_ssb_t* ssb = delete->ssb;
|
||||
tf_ssb_record_thread_busy(ssb, true);
|
||||
int64_t age = _get_global_setting_int64(ssb, "blob_expire_age_seconds", -1);
|
||||
if (age <= 0)
|
||||
{
|
||||
delete->end_time = uv_hrtime();
|
||||
tf_ssb_record_thread_busy(ssb, false);
|
||||
return;
|
||||
}
|
||||
int64_t start_ns = uv_hrtime();
|
||||
@ -1209,13 +1205,12 @@ static void _tf_ssb_rpc_delete_blobs_work(uv_work_t* work)
|
||||
int64_t duration_ms = (uv_hrtime() - start_ns) / 1000000LL;
|
||||
tf_printf("Deleted %d blobs in %d ms.\n", deleted, (int)duration_ms);
|
||||
_tf_ssb_rpc_start_delete_blobs(ssb, deleted ? (int)duration_ms : (15 * 60 * 1000));
|
||||
delete->end_time = uv_hrtime();
|
||||
tf_ssb_record_thread_busy(ssb, false);
|
||||
}
|
||||
|
||||
static void _tf_ssb_rpc_delete_blobs_after_work(uv_work_t* work, int status)
|
||||
{
|
||||
delete_blobs_work_t* delete = work->data;
|
||||
tf_ssb_record_thread_time(delete->ssb, (int64_t)delete->thread_id, delete->end_time - delete->start_time);
|
||||
tf_ssb_unref(delete->ssb);
|
||||
tf_free(delete);
|
||||
}
|
||||
|
@ -114,7 +114,6 @@ typedef struct _tf_task_t
|
||||
uv_timer_t trace_timer;
|
||||
uint64_t last_hrtime;
|
||||
uint64_t last_idle_time;
|
||||
uint64_t last_thread_time;
|
||||
float idle_percent;
|
||||
float thread_percent;
|
||||
|
||||
@ -1462,12 +1461,10 @@ static void _tf_task_trace_timer(uv_timer_t* timer)
|
||||
tf_task_t* task = timer->data;
|
||||
uint64_t hrtime = uv_hrtime();
|
||||
uint64_t idle_time = uv_metrics_idle_time(&task->_loop);
|
||||
uint64_t thread_time = tf_ssb_get_average_thread_time(task->_ssb);
|
||||
task->idle_percent = (hrtime - task->last_hrtime) ? 100.0f * (idle_time - task->last_idle_time) / (hrtime - task->last_hrtime) : 0.0f;
|
||||
task->thread_percent = (hrtime - task->last_hrtime) ? 100.0f * (thread_time - task->last_thread_time) / (hrtime - task->last_hrtime) : 0.0f;
|
||||
task->thread_percent = tf_ssb_get_average_thread_percent(task->_ssb);
|
||||
task->last_hrtime = hrtime;
|
||||
task->last_idle_time = idle_time;
|
||||
task->last_thread_time = thread_time;
|
||||
const char* k_names[] =
|
||||
{
|
||||
"child_tasks",
|
||||
|
Loading…
x
Reference in New Issue
Block a user