Compare commits

..

No commits in common. "385524352c4b5b85e40725fc3fb55f25b9b84463" and "c0e72246ccf67b8a4c9700b8fdcf8423d070fd65" have entirely different histories.

9 changed files with 297 additions and 214 deletions

View File

@ -147,17 +147,12 @@ static bool _http_pattern_matches(const char* pattern, const char* path, bool is
if (pattern[i] == '*') if (pattern[i] == '*')
{ {
while (true) for (; path[j]; j++)
{ {
if (_http_pattern_matches(pattern + i + 1, path + j, strchr(pattern + i + 1, '*') != NULL)) if (_http_pattern_matches(pattern + i + 1, path + j, strchr(pattern + i + 1, '*') != NULL))
{ {
return true; return true;
} }
if (!path[j])
{
break;
}
j++;
} }
} }
return !pattern[i] && !path[j]; return !pattern[i] && !path[j];

View File

@ -626,15 +626,11 @@ static const char* _ext_to_content_type(const char* ext)
{ {
if (ext) if (ext)
{ {
if (strcmp(ext, ".html") == 0) if (strcmp(ext, ".js") == 0 || strcmp(ext, ".mjs") == 0)
{
return "text/html; charset=UTF-8";
}
else if (strcmp(ext, ".js") == 0 || strcmp(ext, ".mjs") == 0)
{ {
return "text/javascript; charset=UTF-8"; return "text/javascript; charset=UTF-8";
} }
else if (strcmp(ext, ".css") == 0) if (strcmp(ext, ".css") == 0)
{ {
return "text/css; charset=UTF-8"; return "text/css; charset=UTF-8";
} }
@ -752,11 +748,6 @@ static void _httpd_endpoint_static(tf_http_request_t* request)
is_core = is_core || (after && i == 0); is_core = is_core || (after && i == 0);
} }
if (strcmp(request->path, "/speedscope/") == 0)
{
after = "index.html";
}
if (!after || strstr(after, "..")) if (!after || strstr(after, ".."))
{ {
const char* k_payload = tf_http_status_text(404); const char* k_payload = tf_http_status_text(404);

134
src/ssb.c
View File

@ -765,14 +765,7 @@ void tf_ssb_connection_rpc_send(tf_ssb_connection_t* connection, uint8_t flags,
} }
else if (!_tf_ssb_connection_get_request_callback(connection, request_number, NULL, NULL)) else if (!_tf_ssb_connection_get_request_callback(connection, request_number, NULL, NULL))
{ {
if (flags & k_ssb_rpc_flag_binary) tf_printf("Dropping message with no active request (%d): %.*s\n", request_number, (int)size, message);
{
tf_printf("Dropping message with no active request (%d): (%zd bytes).\n", request_number, size);
}
else
{
tf_printf("Dropping message with no active request (%d): %.*s\n", request_number, (int)size, message);
}
return; return;
} }
@ -3721,8 +3714,6 @@ typedef struct _connection_work_t
{ {
uv_work_t work; uv_work_t work;
tf_ssb_connection_t* connection; tf_ssb_connection_t* connection;
const char* name;
const char* after_name;
void (*work_callback)(tf_ssb_connection_t* connection, void* user_data); void (*work_callback)(tf_ssb_connection_t* connection, void* user_data);
void (*after_work_callback)(tf_ssb_connection_t* connection, int result, void* user_data); void (*after_work_callback)(tf_ssb_connection_t* connection, int result, void* user_data);
void* user_data; void* user_data;
@ -3734,9 +3725,7 @@ static void _tf_ssb_connection_work_callback(uv_work_t* work)
tf_ssb_record_thread_busy(data->connection->ssb, true); tf_ssb_record_thread_busy(data->connection->ssb, true);
if (data->work_callback) if (data->work_callback)
{ {
tf_trace_begin(data->connection->ssb->trace, data->name);
data->work_callback(data->connection, data->user_data); data->work_callback(data->connection, data->user_data);
tf_trace_end(data->connection->ssb->trace);
} }
tf_ssb_record_thread_busy(data->connection->ssb, false); tf_ssb_record_thread_busy(data->connection->ssb, false);
} }
@ -3746,17 +3735,13 @@ static void _tf_ssb_connection_after_work_callback(uv_work_t* work, int status)
connection_work_t* data = work->data; connection_work_t* data = work->data;
if (data->after_work_callback) if (data->after_work_callback)
{ {
tf_trace_begin(data->connection->ssb->trace, data->after_name);
data->after_work_callback(data->connection, status, data->user_data); data->after_work_callback(data->connection, status, data->user_data);
tf_trace_end(data->connection->ssb->trace);
} }
data->connection->ref_count--; data->connection->ref_count--;
if (data->connection->ref_count == 0 && data->connection->closing) if (data->connection->ref_count == 0 && data->connection->closing)
{ {
_tf_ssb_connection_destroy(data->connection, "work completed"); _tf_ssb_connection_destroy(data->connection, "work completed");
} }
tf_free((void*)data->name);
tf_free((void*)data->after_name);
tf_free(data); tf_free(data);
} }
@ -3784,70 +3769,6 @@ void tf_ssb_connection_run_work(tf_ssb_connection_t* connection, void (*work_cal
} }
} }
typedef struct _ssb_work_t
{
uv_work_t work;
tf_ssb_t* ssb;
const char* name;
const char* after_name;
void (*work_callback)(tf_ssb_t* ssb, void* user_data);
void (*after_work_callback)(tf_ssb_t* ssb, int result, void* user_data);
void* user_data;
} ssb_work_t;
static void _tf_ssb_work_callback(uv_work_t* work)
{
ssb_work_t* data = work->data;
tf_ssb_record_thread_busy(data->ssb, true);
if (data->work_callback)
{
tf_trace_begin(data->ssb->trace, data->name);
data->work_callback(data->ssb, data->user_data);
tf_trace_end(data->ssb->trace);
}
tf_ssb_record_thread_busy(data->ssb, false);
}
static void _tf_ssb_after_work_callback(uv_work_t* work, int status)
{
ssb_work_t* data = work->data;
if (data->after_work_callback)
{
tf_trace_begin(data->ssb->trace, data->after_name);
data->after_work_callback(data->ssb, status, data->user_data);
tf_trace_end(data->ssb->trace);
}
tf_ssb_unref(data->ssb);
tf_free((void*)data->name);
tf_free((void*)data->after_name);
tf_free(data);
}
void tf_ssb_run_work(tf_ssb_t* ssb, void (*work_callback)(tf_ssb_t* ssb, void* user_data), void (*after_work_callback)(tf_ssb_t* ssb, int result, void* user_data), void* user_data)
{
ssb_work_t* work = tf_malloc(sizeof(ssb_work_t));
*work = (ssb_work_t)
{
.work =
{
.data = work,
},
.name = tf_util_function_to_string(work_callback),
.after_name = tf_util_function_to_string(after_work_callback),
.ssb = ssb,
.work_callback = work_callback,
.after_work_callback = after_work_callback,
.user_data = user_data,
};
tf_ssb_ref(ssb);
int result = uv_queue_work(ssb->loop, &work->work, _tf_ssb_work_callback, _tf_ssb_after_work_callback);
if (result)
{
_tf_ssb_connection_after_work_callback(&work->work, result);
}
}
bool tf_ssb_is_room(tf_ssb_t* ssb) bool tf_ssb_is_room(tf_ssb_t* ssb)
{ {
return ssb->is_room; return ssb->is_room;
@ -3871,6 +3792,8 @@ void tf_ssb_set_room_name(tf_ssb_t* ssb, const char* room_name)
typedef struct _update_settings_t typedef struct _update_settings_t
{ {
uv_work_t work;
tf_ssb_t* ssb;
bool is_room; bool is_room;
char room_name[1024]; char room_name[1024];
} update_settings_t; } update_settings_t;
@ -3924,35 +3847,58 @@ static bool _get_global_setting_bool(tf_ssb_t* ssb, const char* name, bool defau
return result; return result;
} }
static void _tf_ssb_update_settings_work(tf_ssb_t* ssb, void* user_data) static void _tf_ssb_update_settings_work(uv_work_t* work)
{ {
update_settings_t* update = user_data; update_settings_t* update = work->data;
update->is_room = _get_global_setting_bool(ssb, "room", true); tf_ssb_record_thread_busy(update->ssb, true);
_get_global_setting_string(ssb, "room_name", update->room_name, sizeof(update->room_name)); update->is_room = _get_global_setting_bool(update->ssb, "room", true);
_get_global_setting_string(update->ssb, "room_name", update->room_name, sizeof(update->room_name));
tf_ssb_record_thread_busy(update->ssb, false);
} }
static void _tf_ssb_update_settings_after_work(tf_ssb_t* ssb, int result, void* user_data) static void _tf_ssb_update_settings_after_work(uv_work_t* work, int result)
{ {
update_settings_t* update = user_data; update_settings_t* update = work->data;
tf_ssb_set_is_room(ssb, update->is_room); tf_ssb_unref(update->ssb);
tf_ssb_set_room_name(ssb, update->room_name); tf_ssb_set_is_room(update->ssb, update->is_room);
_tf_ssb_start_update_settings(ssb); tf_ssb_set_room_name(update->ssb, update->room_name);
_tf_ssb_start_update_settings(update->ssb);
tf_free(update); tf_free(update);
} }
static void _tf_ssb_start_update_settings_timer(tf_ssb_t* ssb, void* user_data) static void _tf_ssb_start_update_settings_timer(tf_ssb_t* ssb, void* user_data)
{ {
update_settings_t* update = tf_malloc(sizeof(update_settings_t)); update_settings_t* update = tf_malloc(sizeof(update_settings_t));
*update = (update_settings_t) { 0 }; *update = (update_settings_t)
tf_ssb_run_work(ssb, _tf_ssb_update_settings_work, _tf_ssb_update_settings_after_work, update); {
.work =
{
.data = update,
},
.ssb = ssb,
};
tf_ssb_ref(ssb);
int result = uv_queue_work(tf_ssb_get_loop(ssb), &update->work, _tf_ssb_update_settings_work, _tf_ssb_update_settings_after_work);
if (result)
{
_tf_ssb_update_settings_after_work(&update->work, result);
}
} }
static void _tf_ssb_update_settings(tf_ssb_t* ssb) static void _tf_ssb_update_settings(tf_ssb_t* ssb)
{ {
update_settings_t* update = tf_malloc(sizeof(update_settings_t)); update_settings_t* update = tf_malloc(sizeof(update_settings_t));
*update = (update_settings_t) { 0 }; *update = (update_settings_t)
_tf_ssb_update_settings_work(ssb, update); {
_tf_ssb_update_settings_after_work(ssb, 0, update); .work =
{
.data = update,
},
.ssb = ssb,
};
tf_ssb_ref(ssb);
_tf_ssb_update_settings_work(&update->work);
_tf_ssb_update_settings_after_work(&update->work, 0);
} }
static void _tf_ssb_start_update_settings(tf_ssb_t* ssb) static void _tf_ssb_start_update_settings(tf_ssb_t* ssb)

View File

@ -76,30 +76,35 @@ static bool _tf_ssb_connections_get_next_connection(tf_ssb_connections_t* connec
typedef struct _tf_ssb_connections_get_next_t typedef struct _tf_ssb_connections_get_next_t
{ {
uv_work_t work;
tf_ssb_connections_t* connections; tf_ssb_connections_t* connections;
tf_ssb_t* ssb;
bool ready; bool ready;
char host[256]; char host[256];
int port; int port;
char key[k_id_base64_len]; char key[k_id_base64_len];
} tf_ssb_connections_get_next_t; } tf_ssb_connections_get_next_t;
static void _tf_ssb_connections_get_next_work(tf_ssb_t* ssb, void* user_data) static void _tf_ssb_connections_get_next_work(uv_work_t* work)
{ {
tf_ssb_connections_get_next_t* next = user_data; tf_ssb_connections_get_next_t* next = work->data;
tf_ssb_record_thread_busy(next->ssb, true);
next->ready = _tf_ssb_connections_get_next_connection(next->connections, next->host, sizeof(next->host), &next->port, next->key, sizeof(next->key)); next->ready = _tf_ssb_connections_get_next_connection(next->connections, next->host, sizeof(next->host), &next->port, next->key, sizeof(next->key));
tf_ssb_record_thread_busy(next->ssb, false);
} }
static void _tf_ssb_connections_get_next_after_work(tf_ssb_t* ssb, int status, void* user_data) static void _tf_ssb_connections_get_next_after_work(uv_work_t* work, int status)
{ {
tf_ssb_connections_get_next_t* next = user_data; tf_ssb_connections_get_next_t* next = work->data;
if (next->ready) if (next->ready)
{ {
uint8_t key_bin[k_id_bin_len]; uint8_t key_bin[k_id_bin_len];
if (tf_ssb_id_str_to_bin(key_bin, next->key)) if (tf_ssb_id_str_to_bin(key_bin, next->key))
{ {
tf_ssb_connect(ssb, next->host, next->port, key_bin); tf_ssb_connect(next->ssb, next->host, next->port, key_bin);
} }
} }
tf_ssb_unref(next->ssb);
tf_free(next); tf_free(next);
} }
@ -111,10 +116,21 @@ static void _tf_ssb_connections_timer(uv_timer_t* timer)
if (count < (int)_countof(active)) if (count < (int)_countof(active))
{ {
tf_ssb_connections_get_next_t* next = tf_malloc(sizeof(tf_ssb_connections_get_next_t)); tf_ssb_connections_get_next_t* next = tf_malloc(sizeof(tf_ssb_connections_get_next_t));
*next = (tf_ssb_connections_get_next_t) { *next = (tf_ssb_connections_get_next_t)
{
.work =
{
.data = next,
},
.ssb = connections->ssb,
.connections = connections, .connections = connections,
}; };
tf_ssb_run_work(connections->ssb, _tf_ssb_connections_get_next_work, _tf_ssb_connections_get_next_after_work, next); tf_ssb_ref(connections->ssb);
int result = uv_queue_work(tf_ssb_get_loop(connections->ssb), &next->work, _tf_ssb_connections_get_next_work, _tf_ssb_connections_get_next_after_work);
if (result)
{
_tf_ssb_connections_get_next_after_work(&next->work, result);
}
} }
} }
@ -149,6 +165,8 @@ void tf_ssb_connections_destroy(tf_ssb_connections_t* connections)
typedef struct _tf_ssb_connections_update_t typedef struct _tf_ssb_connections_update_t
{ {
uv_work_t work;
tf_ssb_t* ssb;
char host[256]; char host[256];
int port; int port;
char key[k_id_base64_len]; char key[k_id_base64_len];
@ -156,11 +174,12 @@ typedef struct _tf_ssb_connections_update_t
bool succeeded; bool succeeded;
} tf_ssb_connections_update_t; } tf_ssb_connections_update_t;
static void _tf_ssb_connections_update_work(tf_ssb_t* ssb, void* user_data) static void _tf_ssb_connections_update_work(uv_work_t* work)
{ {
tf_ssb_connections_update_t* update = user_data; tf_ssb_connections_update_t* update = work->data;
tf_ssb_record_thread_busy(update->ssb, true);
sqlite3_stmt* statement; sqlite3_stmt* statement;
sqlite3* db = tf_ssb_acquire_db_writer(ssb); sqlite3* db = tf_ssb_acquire_db_writer(update->ssb);
if (update->attempted) if (update->attempted)
{ {
if (sqlite3_prepare(db, "UPDATE connections SET last_attempt = strftime('%s', 'now') WHERE host = ?1 AND port = ?2 AND key = ?3", -1, &statement, NULL) == SQLITE_OK) if (sqlite3_prepare(db, "UPDATE connections SET last_attempt = strftime('%s', 'now') WHERE host = ?1 AND port = ?2 AND key = ?3", -1, &statement, NULL) == SQLITE_OK)
@ -207,23 +226,31 @@ static void _tf_ssb_connections_update_work(tf_ssb_t* ssb, void* user_data)
sqlite3_finalize(statement); sqlite3_finalize(statement);
} }
} }
tf_ssb_release_db_writer(ssb, db); tf_ssb_release_db_writer(update->ssb, db);
tf_ssb_record_thread_busy(update->ssb, false);
} }
static void _tf_ssb_connections_update_after_work(tf_ssb_t* ssb, int status, void* user_data) static void _tf_ssb_connections_update_after_work(uv_work_t* work, int status)
{ {
tf_free(user_data); tf_ssb_connections_update_t* update = work->data;
tf_free(update);
} }
static void _tf_ssb_connections_queue_update(tf_ssb_connections_t* connections, tf_ssb_connections_update_t* update) static void _tf_ssb_connections_queue_update(tf_ssb_connections_t* connections, tf_ssb_connections_update_t* update)
{ {
tf_ssb_run_work(connections->ssb, _tf_ssb_connections_update_work, _tf_ssb_connections_update_after_work, update); update->work.data = update;
int result = uv_queue_work(tf_ssb_get_loop(connections->ssb), &update->work, _tf_ssb_connections_update_work, _tf_ssb_connections_update_after_work);
if (result)
{
_tf_ssb_connections_update_after_work(&update->work, result);
}
} }
void tf_ssb_connections_store(tf_ssb_connections_t* connections, const char* host, int port, const char* key) void tf_ssb_connections_store(tf_ssb_connections_t* connections, const char* host, int port, const char* key)
{ {
tf_ssb_connections_update_t* update = tf_malloc(sizeof(tf_ssb_connections_update_t)); tf_ssb_connections_update_t* update = tf_malloc(sizeof(tf_ssb_connections_update_t));
*update = (tf_ssb_connections_update_t) { *update = (tf_ssb_connections_update_t) {
.ssb = connections->ssb,
.port = port, .port = port,
}; };
snprintf(update->host, sizeof(update->host), "%s", host); snprintf(update->host, sizeof(update->host), "%s", host);
@ -235,6 +262,7 @@ void tf_ssb_connections_set_attempted(tf_ssb_connections_t* connections, const c
{ {
tf_ssb_connections_update_t* update = tf_malloc(sizeof(tf_ssb_connections_update_t)); tf_ssb_connections_update_t* update = tf_malloc(sizeof(tf_ssb_connections_update_t));
*update = (tf_ssb_connections_update_t) { *update = (tf_ssb_connections_update_t) {
.ssb = connections->ssb,
.port = port, .port = port,
.attempted = true, .attempted = true,
}; };
@ -247,6 +275,7 @@ void tf_ssb_connections_set_succeeded(tf_ssb_connections_t* connections, const c
{ {
tf_ssb_connections_update_t* update = tf_malloc(sizeof(tf_ssb_connections_update_t)); tf_ssb_connections_update_t* update = tf_malloc(sizeof(tf_ssb_connections_update_t));
*update = (tf_ssb_connections_update_t) { *update = (tf_ssb_connections_update_t) {
.ssb = connections->ssb,
.port = port, .port = port,
.succeeded = true, .succeeded = true,
}; };

View File

@ -19,7 +19,8 @@
typedef struct _message_store_t message_store_t; typedef struct _message_store_t message_store_t;
static void _tf_ssb_db_store_message_after_work(tf_ssb_t* ssb, int status, void* user_data); static void _tf_ssb_db_store_message_work_finish(message_store_t* store);
static void _tf_ssb_db_store_message_after_work(uv_work_t* work, int status);
static void _tf_ssb_db_exec(sqlite3* db, const char* statement) static void _tf_ssb_db_exec(sqlite3* db, const char* statement)
{ {
@ -399,6 +400,8 @@ static char* _tf_ssb_db_get_message_blob_wants(tf_ssb_t* ssb, int64_t rowid)
typedef struct _message_store_t typedef struct _message_store_t
{ {
uv_work_t work;
tf_ssb_t* ssb;
char id[k_id_base64_len]; char id[k_id_base64_len];
char signature[512]; char signature[512];
int flags; int flags;
@ -418,16 +421,21 @@ typedef struct _message_store_t
message_store_t* next; message_store_t* next;
} message_store_t; } message_store_t;
static void _tf_ssb_db_store_message_work(tf_ssb_t* ssb, void* user_data) static void _tf_ssb_db_store_message_work(uv_work_t* work)
{ {
message_store_t* store = user_data; message_store_t* store = work->data;
int64_t last_row_id = _tf_ssb_db_store_message_raw( tf_ssb_record_thread_busy(store->ssb, true);
ssb, store->id, *store->previous ? store->previous : NULL, store->author, store->sequence, store->timestamp, store->content, store->length, store->signature, store->flags); tf_trace_t* trace = tf_ssb_get_trace(store->ssb);
tf_trace_begin(trace, "message_store_work");
int64_t last_row_id = _tf_ssb_db_store_message_raw(store->ssb, store->id, *store->previous ? store->previous : NULL, store->author, store->sequence, store->timestamp,
store->content, store->length, store->signature, store->flags);
if (last_row_id != -1) if (last_row_id != -1)
{ {
store->out_stored = true; store->out_stored = true;
store->out_blob_wants = _tf_ssb_db_get_message_blob_wants(ssb, last_row_id); store->out_blob_wants = _tf_ssb_db_get_message_blob_wants(store->ssb, last_row_id);
} }
tf_trace_end(trace);
tf_ssb_record_thread_busy(store->ssb, false);
} }
static void _wake_up_queue(tf_ssb_t* ssb, tf_ssb_store_queue_t* queue) static void _wake_up_queue(tf_ssb_t* ssb, tf_ssb_store_queue_t* queue)
@ -444,19 +452,38 @@ static void _wake_up_queue(tf_ssb_t* ssb, tf_ssb_store_queue_t* queue)
} }
next->next = NULL; next->next = NULL;
queue->running = true; queue->running = true;
tf_ssb_run_work(ssb, _tf_ssb_db_store_message_work, _tf_ssb_db_store_message_after_work, next); int r = uv_queue_work(tf_ssb_get_loop(ssb), &next->work, _tf_ssb_db_store_message_work, _tf_ssb_db_store_message_after_work);
if (r)
{
_tf_ssb_db_store_message_work_finish(next);
}
} }
} }
} }
static void _tf_ssb_db_store_message_after_work(tf_ssb_t* ssb, int status, void* user_data) static void _tf_ssb_db_store_message_work_finish(message_store_t* store)
{ {
message_store_t* store = user_data; JSContext* context = tf_ssb_get_context(store->ssb);
tf_trace_t* trace = tf_ssb_get_trace(ssb); if (store->callback)
{
store->callback(store->id, store->out_stored, store->user_data);
}
JS_FreeCString(context, store->content);
tf_ssb_store_queue_t* queue = tf_ssb_get_store_queue(store->ssb);
queue->running = false;
_wake_up_queue(store->ssb, queue);
tf_free(store);
}
static void _tf_ssb_db_store_message_after_work(uv_work_t* work, int status)
{
message_store_t* store = work->data;
tf_trace_t* trace = tf_ssb_get_trace(store->ssb);
tf_trace_begin(trace, "message_store_after_work");
if (store->out_stored) if (store->out_stored)
{ {
tf_trace_begin(trace, "notify_message_added"); tf_trace_begin(trace, "notify_message_added");
JSContext* context = tf_ssb_get_context(ssb); JSContext* context = tf_ssb_get_context(store->ssb);
JSValue formatted = JSValue formatted =
tf_ssb_format_message(context, store->previous, store->author, store->sequence, store->timestamp, "sha256", store->content, store->signature, store->flags); tf_ssb_format_message(context, store->previous, store->author, store->sequence, store->timestamp, "sha256", store->content, store->signature, store->flags);
JSValue message = JS_NewObject(context); JSValue message = JS_NewObject(context);
@ -465,7 +492,7 @@ static void _tf_ssb_db_store_message_after_work(tf_ssb_t* ssb, int status, void*
char timestamp_string[256]; char timestamp_string[256];
snprintf(timestamp_string, sizeof(timestamp_string), "%f", store->timestamp); snprintf(timestamp_string, sizeof(timestamp_string), "%f", store->timestamp);
JS_SetPropertyStr(context, message, "timestamp", JS_NewString(context, timestamp_string)); JS_SetPropertyStr(context, message, "timestamp", JS_NewString(context, timestamp_string));
tf_ssb_notify_message_added(ssb, store->id, message); tf_ssb_notify_message_added(store->ssb, store->id, message);
JS_FreeValue(context, message); JS_FreeValue(context, message);
tf_trace_end(trace); tf_trace_end(trace);
} }
@ -474,22 +501,13 @@ static void _tf_ssb_db_store_message_after_work(tf_ssb_t* ssb, int status, void*
tf_trace_begin(trace, "notify_blob_wants_added"); tf_trace_begin(trace, "notify_blob_wants_added");
for (char* p = store->out_blob_wants; *p; p = p + strlen(p)) for (char* p = store->out_blob_wants; *p; p = p + strlen(p))
{ {
tf_ssb_notify_blob_want_added(ssb, p); tf_ssb_notify_blob_want_added(store->ssb, p);
} }
tf_free(store->out_blob_wants); tf_free(store->out_blob_wants);
tf_trace_end(trace); tf_trace_end(trace);
} }
_tf_ssb_db_store_message_work_finish(store);
JSContext* context = tf_ssb_get_context(ssb); tf_trace_end(trace);
if (store->callback)
{
store->callback(store->id, store->out_stored, store->user_data);
}
JS_FreeCString(context, store->content);
tf_ssb_store_queue_t* queue = tf_ssb_get_store_queue(ssb);
queue->running = false;
_wake_up_queue(ssb, queue);
tf_free(store);
} }
void tf_ssb_db_store_message( void tf_ssb_db_store_message(
@ -521,7 +539,13 @@ void tf_ssb_db_store_message(
JS_FreeValue(context, contentval); JS_FreeValue(context, contentval);
message_store_t* store = tf_malloc(sizeof(message_store_t)); message_store_t* store = tf_malloc(sizeof(message_store_t));
*store = (message_store_t) { *store = (message_store_t)
{
.work =
{
.data = store,
},
.ssb = ssb,
.sequence = sequence, .sequence = sequence,
.timestamp = timestamp, .timestamp = timestamp,
.content = contentstr, .content = contentstr,
@ -636,6 +660,8 @@ bool tf_ssb_db_blob_get(tf_ssb_t* ssb, const char* id, uint8_t** out_blob, size_
typedef struct _blob_store_work_t typedef struct _blob_store_work_t
{ {
uv_work_t work;
tf_ssb_t* ssb;
const uint8_t* blob; const uint8_t* blob;
size_t size; size_t size;
char id[k_blob_id_len]; char id[k_blob_id_len];
@ -644,18 +670,25 @@ typedef struct _blob_store_work_t
void* user_data; void* user_data;
} blob_store_work_t; } blob_store_work_t;
static void _tf_ssb_db_blob_store_work(tf_ssb_t* ssb, void* user_data) static void _tf_ssb_db_blob_store_work(uv_work_t* work)
{ {
blob_store_work_t* blob_work = user_data; blob_store_work_t* blob_work = work->data;
tf_ssb_db_blob_store(ssb, blob_work->blob, blob_work->size, blob_work->id, sizeof(blob_work->id), &blob_work->is_new); tf_ssb_record_thread_busy(blob_work->ssb, true);
tf_trace_t* trace = tf_ssb_get_trace(blob_work->ssb);
tf_trace_begin(trace, "blob_store_work");
tf_ssb_db_blob_store(blob_work->ssb, blob_work->blob, blob_work->size, blob_work->id, sizeof(blob_work->id), &blob_work->is_new);
tf_trace_end(trace);
tf_ssb_record_thread_busy(blob_work->ssb, false);
} }
static void _tf_ssb_db_blob_store_after_work(tf_ssb_t* ssb, int status, void* user_data) static void _tf_ssb_db_blob_store_after_work(uv_work_t* work, int status)
{ {
blob_store_work_t* blob_work = user_data; blob_store_work_t* blob_work = work->data;
tf_trace_t* trace = tf_ssb_get_trace(blob_work->ssb);
tf_trace_begin(trace, "blob_store_after_work");
if (status == 0 && *blob_work->id) if (status == 0 && *blob_work->id)
{ {
tf_ssb_notify_blob_stored(ssb, blob_work->id); tf_ssb_notify_blob_stored(blob_work->ssb, blob_work->id);
} }
if (status != 0) if (status != 0)
{ {
@ -665,19 +698,35 @@ static void _tf_ssb_db_blob_store_after_work(tf_ssb_t* ssb, int status, void* us
{ {
blob_work->callback(status == 0 ? blob_work->id : NULL, blob_work->is_new, blob_work->user_data); blob_work->callback(status == 0 ? blob_work->id : NULL, blob_work->is_new, blob_work->user_data);
} }
tf_trace_end(trace);
tf_free(blob_work); tf_free(blob_work);
} }
void tf_ssb_db_blob_store_async(tf_ssb_t* ssb, const uint8_t* blob, size_t size, tf_ssb_db_blob_store_callback_t* callback, void* user_data) void tf_ssb_db_blob_store_async(tf_ssb_t* ssb, const uint8_t* blob, size_t size, tf_ssb_db_blob_store_callback_t* callback, void* user_data)
{ {
blob_store_work_t* work = tf_malloc(sizeof(blob_store_work_t)); blob_store_work_t* work = tf_malloc(sizeof(blob_store_work_t));
*work = (blob_store_work_t) { *work = (blob_store_work_t)
{
.work =
{
.data = work,
},
.ssb = ssb,
.blob = blob, .blob = blob,
.size = size, .size = size,
.callback = callback, .callback = callback,
.user_data = user_data, .user_data = user_data,
}; };
tf_ssb_run_work(ssb, _tf_ssb_db_blob_store_work, _tf_ssb_db_blob_store_after_work, work); int r = uv_queue_work(tf_ssb_get_loop(ssb), &work->work, _tf_ssb_db_blob_store_work, _tf_ssb_db_blob_store_after_work);
if (r)
{
tf_printf("tf_ssb_db_blob_store_async -> uv_queue_work failed immediately: %s\n", uv_strerror(r));
if (callback)
{
callback(NULL, false, user_data);
}
tf_free(work);
}
} }
bool tf_ssb_db_blob_store(tf_ssb_t* ssb, const uint8_t* blob, size_t size, char* out_id, size_t out_id_size, bool* out_new) bool tf_ssb_db_blob_store(tf_ssb_t* ssb, const uint8_t* blob, size_t size, char* out_id, size_t out_id_size, bool* out_new)
@ -1706,17 +1755,19 @@ bool tf_ssb_db_identity_get_active(sqlite3* db, const char* user, const char* pa
typedef struct _resolve_index_t typedef struct _resolve_index_t
{ {
uv_work_t work;
tf_ssb_t* ssb;
const char* host; const char* host;
const char* path; const char* path;
void (*callback)(const char* path, void* user_data); void (*callback)(const char* path, void* user_data);
void* user_data; void* user_data;
} resolve_index_t; } resolve_index_t;
static void _tf_ssb_db_resolve_index_work(tf_ssb_t* ssb, void* user_data) static void _tf_ssb_db_resolve_index_work(uv_work_t* work)
{ {
resolve_index_t* request = user_data; resolve_index_t* request = work->data;
sqlite3* db = tf_ssb_acquire_db_reader(ssb); sqlite3* db = tf_ssb_acquire_db_reader(request->ssb);
sqlite3_stmt* statement; sqlite3_stmt* statement;
if (sqlite3_prepare(db, "SELECT json_extract(value, '$.index_map') FROM properties WHERE id = 'core' AND key = 'settings'", -1, &statement, NULL) == SQLITE_OK) if (sqlite3_prepare(db, "SELECT json_extract(value, '$.index_map') FROM properties WHERE id = 'core' AND key = 'settings'", -1, &statement, NULL) == SQLITE_OK)
{ {
@ -1762,12 +1813,12 @@ static void _tf_ssb_db_resolve_index_work(tf_ssb_t* ssb, void* user_data)
tf_printf("prepare failed: %s\n", sqlite3_errmsg(db)); tf_printf("prepare failed: %s\n", sqlite3_errmsg(db));
} }
} }
tf_ssb_release_db_reader(ssb, db); tf_ssb_release_db_reader(request->ssb, db);
} }
static void _tf_ssb_db_resolve_index_after_work(tf_ssb_t* ssb, int status, void* user_data) static void _tf_ssb_db_resolve_index_after_work(uv_work_t* work, int status)
{ {
resolve_index_t* request = user_data; resolve_index_t* request = work->data;
request->callback(request->path, request->user_data); request->callback(request->path, request->user_data);
tf_free((void*)request->host); tf_free((void*)request->host);
tf_free((void*)request->path); tf_free((void*)request->path);
@ -1778,9 +1829,15 @@ void tf_ssb_db_resolve_index_async(tf_ssb_t* ssb, const char* host, void (*callb
{ {
resolve_index_t* request = tf_malloc(sizeof(resolve_index_t)); resolve_index_t* request = tf_malloc(sizeof(resolve_index_t));
*request = (resolve_index_t) { *request = (resolve_index_t) {
.work = { .data = request },
.ssb = ssb,
.host = tf_strdup(host), .host = tf_strdup(host),
.callback = callback, .callback = callback,
.user_data = user_data, .user_data = user_data,
}; };
tf_ssb_run_work(ssb, _tf_ssb_db_resolve_index_work, _tf_ssb_db_resolve_index_after_work, request); int r = uv_queue_work(tf_ssb_get_loop(ssb), &request->work, _tf_ssb_db_resolve_index_work, _tf_ssb_db_resolve_index_after_work);
if (r)
{
_tf_ssb_db_resolve_index_after_work(&request->work, r);
}
} }

View File

@ -755,16 +755,6 @@ void tf_ssb_connection_schedule_idle(tf_ssb_connection_t* connection, tf_ssb_sch
void tf_ssb_connection_run_work(tf_ssb_connection_t* connection, void (*work_callback)(tf_ssb_connection_t* connection, void* user_data), void tf_ssb_connection_run_work(tf_ssb_connection_t* connection, void (*work_callback)(tf_ssb_connection_t* connection, void* user_data),
void (*after_work_callback)(tf_ssb_connection_t* connection, int result, void* user_data), void* user_data); void (*after_work_callback)(tf_ssb_connection_t* connection, int result, void* user_data), void* user_data);
/**
** Schedule work to run on a worker thread.
** @param ssb The owning SSB instance.
** @param work_callback The callback to run on a thread.
** @param after_work_callback The callback to run on the main thread when the work is complete.
** @param user_data User data to pass to the callback.
*/
void tf_ssb_run_work(
tf_ssb_t* ssb, void (*work_callback)(tf_ssb_t* ssb, void* user_data), void (*after_work_callback)(tf_ssb_t* ssb, int result, void* user_data), void* user_data);
/** /**
** Register for new messages on a connection. ** Register for new messages on a connection.
** @param connection The SHS connection. ** @param connection The SHS connection.

View File

@ -4,6 +4,7 @@
#include "mem.h" #include "mem.h"
#include "ssb.db.h" #include "ssb.db.h"
#include "ssb.h" #include "ssb.h"
#include "trace.h"
#include "util.js.h" #include "util.js.h"
#include "sodium/crypto_box.h" #include "sodium/crypto_box.h"
@ -301,6 +302,8 @@ static JSValue _tf_ssb_getAllIdentities(JSContext* context, JSValueConst this_va
typedef struct _active_identity_work_t typedef struct _active_identity_work_t
{ {
uv_work_t request;
tf_ssb_t* ssb;
JSContext* context; JSContext* context;
const char* name; const char* name;
const char* package_owner; const char* package_owner;
@ -319,22 +322,29 @@ static void _tf_ssb_getActiveIdentity_visit(const char* identity, void* user_dat
} }
} }
static void _tf_ssb_getActiveIdentity_work(tf_ssb_t* ssb, void* user_data) static void _tf_ssb_getActiveIdentity_work(uv_work_t* work)
{ {
active_identity_work_t* request = user_data; active_identity_work_t* request = work->data;
sqlite3* db = tf_ssb_acquire_db_reader(ssb); tf_ssb_record_thread_busy(request->ssb, true);
tf_trace_t* trace = tf_ssb_get_trace(request->ssb);
tf_trace_begin(trace, "_tf_ssb_getActiveIdentity_work");
sqlite3* db = tf_ssb_acquire_db_reader(request->ssb);
tf_ssb_db_identity_get_active(db, request->name, request->package_owner, request->package_name, request->identity, sizeof(request->identity)); tf_ssb_db_identity_get_active(db, request->name, request->package_owner, request->package_name, request->identity, sizeof(request->identity));
tf_ssb_release_db_reader(ssb, db); tf_ssb_release_db_reader(request->ssb, db);
if (!*request->identity) if (!*request->identity)
{ {
tf_ssb_db_identity_visit(ssb, request->name, _tf_ssb_getActiveIdentity_visit, request); tf_ssb_db_identity_visit(request->ssb, request->name, _tf_ssb_getActiveIdentity_visit, request);
} }
tf_trace_end(trace);
tf_ssb_record_thread_busy(request->ssb, false);
} }
static void _tf_ssb_getActiveIdentity_after_work(tf_ssb_t* ssb, int status, void* user_data) static void _tf_ssb_getActiveIdentity_after_work(uv_work_t* work, int status)
{ {
active_identity_work_t* request = user_data; active_identity_work_t* request = work->data;
JSContext* context = request->context; JSContext* context = request->context;
if (request->result == 0) if (request->result == 0)
{ {
@ -360,12 +370,13 @@ static void _tf_ssb_getActiveIdentity_after_work(tf_ssb_t* ssb, int status, void
static JSValue _tf_ssb_getActiveIdentity(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) static JSValue _tf_ssb_getActiveIdentity(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv)
{ {
tf_ssb_t* ssb = JS_GetOpaque(this_val, _tf_ssb_classId);
const char* name = JS_ToCString(context, argv[0]); const char* name = JS_ToCString(context, argv[0]);
const char* package_owner = JS_ToCString(context, argv[1]); const char* package_owner = JS_ToCString(context, argv[1]);
const char* package_name = JS_ToCString(context, argv[2]); const char* package_name = JS_ToCString(context, argv[2]);
active_identity_work_t* work = tf_malloc(sizeof(active_identity_work_t)); active_identity_work_t* work = tf_malloc(sizeof(active_identity_work_t));
*work = (active_identity_work_t) { *work = (active_identity_work_t) {
.request = { .data = work },
.ssb = JS_GetOpaque(this_val, _tf_ssb_classId),
.context = context, .context = context,
.name = tf_strdup(name), .name = tf_strdup(name),
.package_owner = tf_strdup(package_owner), .package_owner = tf_strdup(package_owner),
@ -376,12 +387,18 @@ static JSValue _tf_ssb_getActiveIdentity(JSContext* context, JSValueConst this_v
JS_FreeCString(context, package_owner); JS_FreeCString(context, package_owner);
JS_FreeCString(context, package_name); JS_FreeCString(context, package_name);
tf_ssb_run_work(ssb, _tf_ssb_getActiveIdentity_work, _tf_ssb_getActiveIdentity_after_work, work); int r = uv_queue_work(tf_ssb_get_loop(work->ssb), &work->request, _tf_ssb_getActiveIdentity_work, _tf_ssb_getActiveIdentity_after_work);
if (r)
{
_tf_ssb_getActiveIdentity_after_work(&work->request, r);
}
return result; return result;
} }
typedef struct _identity_info_work_t typedef struct _identity_info_work_t
{ {
uv_work_t request;
tf_ssb_t* ssb;
JSContext* context; JSContext* context;
const char* name; const char* name;
const char* package_owner; const char* package_owner;
@ -405,12 +422,12 @@ static void _tf_ssb_getIdentityInfo_visit(const char* identity, void* data)
; ;
} }
static void _tf_ssb_getIdentityInfo_work(tf_ssb_t* ssb, void* user_data) static void _tf_ssb_getIdentityInfo_work(uv_work_t* work)
{ {
identity_info_work_t* request = user_data; identity_info_work_t* request = work->data;
tf_ssb_db_identity_visit(ssb, request->name, _tf_ssb_getIdentityInfo_visit, request); tf_ssb_db_identity_visit(request->ssb, request->name, _tf_ssb_getIdentityInfo_visit, request);
sqlite3* db = tf_ssb_acquire_db_reader(ssb); sqlite3* db = tf_ssb_acquire_db_reader(request->ssb);
sqlite3_stmt* statement = NULL; sqlite3_stmt* statement = NULL;
request->result = sqlite3_prepare(db, request->result = sqlite3_prepare(db,
"SELECT author, name FROM ( " "SELECT author, name FROM ( "
@ -450,12 +467,12 @@ static void _tf_ssb_getIdentityInfo_work(tf_ssb_t* ssb, void* user_data)
{ {
snprintf(request->active_identity, sizeof(request->active_identity), "%s", request->identities[0]); snprintf(request->active_identity, sizeof(request->active_identity), "%s", request->identities[0]);
} }
tf_ssb_release_db_reader(ssb, db); tf_ssb_release_db_reader(request->ssb, db);
} }
static void _tf_ssb_getIdentityInfo_after_work(tf_ssb_t* ssb, int status, void* user_data) static void _tf_ssb_getIdentityInfo_after_work(uv_work_t* work, int status)
{ {
identity_info_work_t* request = user_data; identity_info_work_t* request = work->data;
JSContext* context = request->context; JSContext* context = request->context;
JSValue result = JS_NewObject(context); JSValue result = JS_NewObject(context);
@ -497,12 +514,13 @@ static void _tf_ssb_getIdentityInfo_after_work(tf_ssb_t* ssb, int status, void*
static JSValue _tf_ssb_getIdentityInfo(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) static JSValue _tf_ssb_getIdentityInfo(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv)
{ {
tf_ssb_t* ssb = JS_GetOpaque(this_val, _tf_ssb_classId);
const char* name = JS_ToCString(context, argv[0]); const char* name = JS_ToCString(context, argv[0]);
const char* package_owner = JS_ToCString(context, argv[1]); const char* package_owner = JS_ToCString(context, argv[1]);
const char* package_name = JS_ToCString(context, argv[2]); const char* package_name = JS_ToCString(context, argv[2]);
identity_info_work_t* work = tf_malloc(sizeof(identity_info_work_t)); identity_info_work_t* work = tf_malloc(sizeof(identity_info_work_t));
*work = (identity_info_work_t) { *work = (identity_info_work_t) {
.request = { .data = work },
.ssb = JS_GetOpaque(this_val, _tf_ssb_classId),
.context = context, .context = context,
.name = tf_strdup(name), .name = tf_strdup(name),
.package_owner = tf_strdup(package_owner), .package_owner = tf_strdup(package_owner),
@ -513,7 +531,11 @@ static JSValue _tf_ssb_getIdentityInfo(JSContext* context, JSValueConst this_val
JS_FreeCString(context, package_owner); JS_FreeCString(context, package_owner);
JS_FreeCString(context, package_name); JS_FreeCString(context, package_name);
tf_ssb_run_work(ssb, _tf_ssb_getIdentityInfo_work, _tf_ssb_getIdentityInfo_after_work, work); int r = uv_queue_work(tf_ssb_get_loop(work->ssb), &work->request, _tf_ssb_getIdentityInfo_work, _tf_ssb_getIdentityInfo_after_work);
if (r)
{
_tf_ssb_getIdentityInfo_after_work(&work->request, r);
}
return result; return result;
} }
@ -823,6 +845,7 @@ typedef struct _sql_work_t
uint8_t* rows; uint8_t* rows;
size_t binds_count; size_t binds_count;
size_t rows_count; size_t rows_count;
uv_work_t request;
uv_async_t async; uv_async_t async;
uv_timer_t timeout; uv_timer_t timeout;
uv_mutex_t lock; uv_mutex_t lock;
@ -838,10 +861,13 @@ static void _tf_ssb_sql_append(uint8_t** rows, size_t* rows_count, const void* d
*rows_count += size; *rows_count += size;
} }
static void _tf_ssb_sqlAsync_work(tf_ssb_t* ssb, void* user_data) static void _tf_ssb_sqlAsync_work(uv_work_t* work)
{ {
sql_work_t* sql_work = user_data; sql_work_t* sql_work = work->data;
sqlite3* db = tf_ssb_acquire_db_reader_restricted(ssb); tf_ssb_record_thread_busy(sql_work->ssb, true);
tf_trace_t* trace = tf_ssb_get_trace(sql_work->ssb);
tf_trace_begin(trace, "sql_async_work");
sqlite3* db = tf_ssb_acquire_db_reader_restricted(sql_work->ssb);
uv_mutex_lock(&sql_work->lock); uv_mutex_lock(&sql_work->lock);
sql_work->db = db; sql_work->db = db;
uv_mutex_unlock(&sql_work->lock); uv_mutex_unlock(&sql_work->lock);
@ -949,7 +975,9 @@ static void _tf_ssb_sqlAsync_work(tf_ssb_t* ssb, void* user_data)
uv_mutex_lock(&sql_work->lock); uv_mutex_lock(&sql_work->lock);
sql_work->db = NULL; sql_work->db = NULL;
uv_mutex_unlock(&sql_work->lock); uv_mutex_unlock(&sql_work->lock);
tf_ssb_release_db_reader(ssb, db); tf_ssb_release_db_reader(sql_work->ssb, db);
tf_ssb_record_thread_busy(sql_work->ssb, false);
tf_trace_end(trace);
} }
static void _tf_ssb_sqlAsync_handle_close(uv_handle_t* handle) static void _tf_ssb_sqlAsync_handle_close(uv_handle_t* handle)
@ -975,10 +1003,12 @@ static void _tf_ssb_sqlAsync_destroy(sql_work_t* work)
uv_close((uv_handle_t*)&work->async, _tf_ssb_sqlAsync_handle_close); uv_close((uv_handle_t*)&work->async, _tf_ssb_sqlAsync_handle_close);
} }
static void _tf_ssb_sqlAsync_after_work(tf_ssb_t* ssb, int status, void* user_data) static void _tf_ssb_sqlAsync_after_work(uv_work_t* work, int status)
{ {
sql_work_t* sql_work = user_data; sql_work_t* sql_work = work->data;
JSContext* context = tf_ssb_get_context(ssb); tf_trace_t* trace = tf_ssb_get_trace(sql_work->ssb);
tf_trace_begin(trace, "sql_async_after_work");
JSContext* context = tf_ssb_get_context(sql_work->ssb);
uint8_t* p = sql_work->rows; uint8_t* p = sql_work->rows;
while (p < sql_work->rows + sql_work->rows_count) while (p < sql_work->rows + sql_work->rows_count)
{ {
@ -1063,6 +1093,7 @@ static void _tf_ssb_sqlAsync_after_work(tf_ssb_t* ssb, int status, void* user_da
JS_FreeValue(context, sql_work->callback); JS_FreeValue(context, sql_work->callback);
JS_FreeCString(context, sql_work->query); JS_FreeCString(context, sql_work->query);
_tf_ssb_sqlAsync_destroy(sql_work); _tf_ssb_sqlAsync_destroy(sql_work);
tf_trace_end(trace);
} }
static void _tf_ssb_sqlAsync_timeout(uv_timer_t* timer) static void _tf_ssb_sqlAsync_timeout(uv_timer_t* timer)
@ -1089,6 +1120,10 @@ static JSValue _tf_ssb_sqlAsync(JSContext* context, JSValueConst this_val, int a
sql_work_t* work = tf_malloc(sizeof(sql_work_t)); sql_work_t* work = tf_malloc(sizeof(sql_work_t));
*work = (sql_work_t) *work = (sql_work_t)
{ {
.request =
{
.data = work,
},
.async = .async =
{ {
.data = work, .data = work,
@ -1148,7 +1183,11 @@ static JSValue _tf_ssb_sqlAsync(JSContext* context, JSValueConst this_val, int a
} }
JS_FreeValue(context, value); JS_FreeValue(context, value);
} }
tf_ssb_run_work(ssb, _tf_ssb_sqlAsync_work, _tf_ssb_sqlAsync_after_work, work); int r = uv_queue_work(tf_ssb_get_loop(ssb), &work->request, _tf_ssb_sqlAsync_work, _tf_ssb_sqlAsync_after_work);
if (r)
{
error_value = JS_ThrowInternalError(context, "uv_queue_work failed: %s", uv_strerror(r));
}
} }
if (!JS_IsUndefined(error_value)) if (!JS_IsUndefined(error_value))
{ {
@ -1770,6 +1809,8 @@ static JSValue _tf_ssb_private_message_decrypt(JSContext* context, JSValueConst
typedef struct _following_t typedef struct _following_t
{ {
uv_work_t work;
tf_ssb_t* ssb;
JSContext* context; JSContext* context;
JSValue promise[2]; JSValue promise[2];
@ -1780,15 +1821,17 @@ typedef struct _following_t
const char* ids[]; const char* ids[];
} following_t; } following_t;
static void _tf_ssb_following_work(tf_ssb_t* ssb, void* user_data) static void _tf_ssb_following_work(uv_work_t* work)
{ {
following_t* following = user_data; following_t* following = work->data;
following->out_following = tf_ssb_db_following_deep(ssb, following->ids, following->ids_count, following->depth); tf_ssb_record_thread_busy(following->ssb, true);
following->out_following = tf_ssb_db_following_deep(following->ssb, following->ids, following->ids_count, following->depth);
tf_ssb_record_thread_busy(following->ssb, false);
} }
static void _tf_ssb_following_after_work(tf_ssb_t* ssb, int status, void* user_data) static void _tf_ssb_following_after_work(uv_work_t* work, int status)
{ {
following_t* following = user_data; following_t* following = work->data;
JSContext* context = following->context; JSContext* context = following->context;
if (status == 0) if (status == 0)
{ {
@ -1835,8 +1878,14 @@ static JSValue _tf_ssb_following(JSContext* context, JSValueConst this_val, int
int ids_count = tf_util_get_length(context, argv[0]); int ids_count = tf_util_get_length(context, argv[0]);
following_t* following = tf_malloc(sizeof(following_t) + sizeof(char*) * ids_count); following_t* following = tf_malloc(sizeof(following_t) + sizeof(char*) * ids_count);
*following = (following_t) { *following = (following_t)
{
.work =
{
.data = following,
},
.context = context, .context = context,
.ssb = ssb,
}; };
JS_ToInt32(context, &following->depth, argv[1]); JS_ToInt32(context, &following->depth, argv[1]);
@ -1854,7 +1903,11 @@ static JSValue _tf_ssb_following(JSContext* context, JSValueConst this_val, int
} }
} }
tf_ssb_run_work(ssb, _tf_ssb_following_work, _tf_ssb_following_after_work, following); int r = uv_queue_work(tf_ssb_get_loop(ssb), &following->work, _tf_ssb_following_work, _tf_ssb_following_after_work);
if (r)
{
_tf_ssb_following_after_work(&following->work, r);
}
return result; return result;
} }

View File

@ -1137,6 +1137,12 @@ static void _tf_ssb_rpc_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_chang
} }
} }
typedef struct _delete_blobs_work_t
{
uv_work_t work;
tf_ssb_t* ssb;
} delete_blobs_work_t;
static void _tf_ssb_rpc_checkpoint(tf_ssb_t* ssb) static void _tf_ssb_rpc_checkpoint(tf_ssb_t* ssb)
{ {
int64_t checkpoint_start_ms = uv_hrtime(); int64_t checkpoint_start_ms = uv_hrtime();
@ -1154,12 +1160,16 @@ static void _tf_ssb_rpc_checkpoint(tf_ssb_t* ssb)
tf_ssb_release_db_writer(ssb, db); tf_ssb_release_db_writer(ssb, db);
} }
static void _tf_ssb_rpc_delete_blobs_work(tf_ssb_t* ssb, void* user_data) static void _tf_ssb_rpc_delete_blobs_work(uv_work_t* work)
{ {
delete_blobs_work_t* delete = work->data;
tf_ssb_t* ssb = delete->ssb;
tf_ssb_record_thread_busy(ssb, true);
int64_t age = _get_global_setting_int64(ssb, "blob_expire_age_seconds", -1); int64_t age = _get_global_setting_int64(ssb, "blob_expire_age_seconds", -1);
if (age <= 0) if (age <= 0)
{ {
_tf_ssb_rpc_checkpoint(ssb); _tf_ssb_rpc_checkpoint(ssb);
tf_ssb_record_thread_busy(ssb, false);
return; return;
} }
int64_t start_ns = uv_hrtime(); int64_t start_ns = uv_hrtime();
@ -1201,16 +1211,28 @@ static void _tf_ssb_rpc_delete_blobs_work(tf_ssb_t* ssb, void* user_data)
tf_printf("Deleted %d blobs in %d ms.\n", deleted, (int)duration_ms); tf_printf("Deleted %d blobs in %d ms.\n", deleted, (int)duration_ms);
_tf_ssb_rpc_checkpoint(ssb); _tf_ssb_rpc_checkpoint(ssb);
_tf_ssb_rpc_start_delete_blobs(ssb, deleted ? (int)duration_ms : (15 * 60 * 1000)); _tf_ssb_rpc_start_delete_blobs(ssb, deleted ? (int)duration_ms : (15 * 60 * 1000));
tf_ssb_record_thread_busy(ssb, false);
} }
static void _tf_ssb_rpc_delete_blobs_after_work(tf_ssb_t* ssb, int status, void* user_data) static void _tf_ssb_rpc_delete_blobs_after_work(uv_work_t* work, int status)
{ {
tf_ssb_unref(ssb); delete_blobs_work_t* delete = work->data;
tf_ssb_unref(delete->ssb);
tf_free(delete);
} }
static void _tf_ssb_rpc_start_delete_callback(tf_ssb_t* ssb, void* user_data) static void _tf_ssb_rpc_start_delete_callback(tf_ssb_t* ssb, void* user_data)
{ {
tf_ssb_run_work(ssb, _tf_ssb_rpc_delete_blobs_work, _tf_ssb_rpc_delete_blobs_after_work, NULL); delete_blobs_work_t* work = tf_malloc(sizeof(delete_blobs_work_t));
*work = (delete_blobs_work_t) { .work = { .data = work }, .ssb = ssb };
tf_ssb_ref(ssb);
int r = uv_queue_work(tf_ssb_get_loop(ssb), &work->work, _tf_ssb_rpc_delete_blobs_work, _tf_ssb_rpc_delete_blobs_after_work);
if (r)
{
tf_printf("uv_queue_work: %s\n", uv_strerror(r));
tf_free(work);
tf_ssb_unref(ssb);
}
} }
static void _tf_ssb_rpc_start_delete_blobs(tf_ssb_t* ssb, int delay_ms) static void _tf_ssb_rpc_start_delete_blobs(tf_ssb_t* ssb, int delay_ms)

View File

@ -272,7 +272,7 @@ static void _tf_trace_begin_tagged(tf_trace_t* trace, const char* name, void* ta
char line[1024]; char line[1024];
int p = snprintf(line, sizeof(line), "{\"ph\": \"B\", \"pid\": %d, \"tid\": %" PRId64 ", \"ts\": %" PRId64 ", \"name\": \"", getpid(), (int64_t)self, _trace_ts()); int p = snprintf(line, sizeof(line), "{\"ph\": \"B\", \"pid\": %d, \"tid\": %" PRId64 ", \"ts\": %" PRId64 ", \"name\": \"", getpid(), (int64_t)self, _trace_ts());
p += _tf_trace_escape_name(line + p, sizeof(line) - p - 4, name); p += _tf_trace_escape_name(line + p, sizeof(line) - p, name);
p += snprintf(line + p, sizeof(line) - p, "\"},"); p += snprintf(line + p, sizeof(line) - p, "\"},");
p = tf_min(p, tf_countof(line)); p = tf_min(p, tf_countof(line));
trace->callback(trace, line, p, trace->user_data); trace->callback(trace, line, p, trace->user_data);
@ -299,7 +299,7 @@ static void _tf_trace_end_tagged(tf_trace_t* trace, void* tag)
char line[1024]; char line[1024];
int p = snprintf(line, sizeof(line), "{\"ph\": \"E\", \"pid\": %d, \"tid\": %" PRId64 ", \"ts\": %" PRId64 ", \"name\": \"", getpid(), (int64_t)pthread_self(), _trace_ts()); int p = snprintf(line, sizeof(line), "{\"ph\": \"E\", \"pid\": %d, \"tid\": %" PRId64 ", \"ts\": %" PRId64 ", \"name\": \"", getpid(), (int64_t)pthread_self(), _trace_ts());
p += _tf_trace_escape_name(line + p, sizeof(line) - p - 4, name); p += _tf_trace_escape_name(line + p, sizeof(line) - p, name);
p += snprintf(line + p, sizeof(line) - p, "\"},"); p += snprintf(line + p, sizeof(line) - p, "\"},");
p = tf_min(p, tf_countof(line)); p = tf_min(p, tf_countof(line));
trace->callback(trace, line, p, trace->user_data); trace->callback(trace, line, p, trace->user_data);