Compare commits
No commits in common. "385524352c4b5b85e40725fc3fb55f25b9b84463" and "c0e72246ccf67b8a4c9700b8fdcf8423d070fd65" have entirely different histories.
385524352c
...
c0e72246cc
@ -147,17 +147,12 @@ static bool _http_pattern_matches(const char* pattern, const char* path, bool is
|
||||
|
||||
if (pattern[i] == '*')
|
||||
{
|
||||
while (true)
|
||||
for (; path[j]; j++)
|
||||
{
|
||||
if (_http_pattern_matches(pattern + i + 1, path + j, strchr(pattern + i + 1, '*') != NULL))
|
||||
{
|
||||
return true;
|
||||
}
|
||||
if (!path[j])
|
||||
{
|
||||
break;
|
||||
}
|
||||
j++;
|
||||
}
|
||||
}
|
||||
return !pattern[i] && !path[j];
|
||||
|
@ -626,15 +626,11 @@ static const char* _ext_to_content_type(const char* ext)
|
||||
{
|
||||
if (ext)
|
||||
{
|
||||
if (strcmp(ext, ".html") == 0)
|
||||
{
|
||||
return "text/html; charset=UTF-8";
|
||||
}
|
||||
else if (strcmp(ext, ".js") == 0 || strcmp(ext, ".mjs") == 0)
|
||||
if (strcmp(ext, ".js") == 0 || strcmp(ext, ".mjs") == 0)
|
||||
{
|
||||
return "text/javascript; charset=UTF-8";
|
||||
}
|
||||
else if (strcmp(ext, ".css") == 0)
|
||||
if (strcmp(ext, ".css") == 0)
|
||||
{
|
||||
return "text/css; charset=UTF-8";
|
||||
}
|
||||
@ -752,11 +748,6 @@ static void _httpd_endpoint_static(tf_http_request_t* request)
|
||||
is_core = is_core || (after && i == 0);
|
||||
}
|
||||
|
||||
if (strcmp(request->path, "/speedscope/") == 0)
|
||||
{
|
||||
after = "index.html";
|
||||
}
|
||||
|
||||
if (!after || strstr(after, ".."))
|
||||
{
|
||||
const char* k_payload = tf_http_status_text(404);
|
||||
|
134
src/ssb.c
134
src/ssb.c
@ -765,14 +765,7 @@ void tf_ssb_connection_rpc_send(tf_ssb_connection_t* connection, uint8_t flags,
|
||||
}
|
||||
else if (!_tf_ssb_connection_get_request_callback(connection, request_number, NULL, NULL))
|
||||
{
|
||||
if (flags & k_ssb_rpc_flag_binary)
|
||||
{
|
||||
tf_printf("Dropping message with no active request (%d): (%zd bytes).\n", request_number, size);
|
||||
}
|
||||
else
|
||||
{
|
||||
tf_printf("Dropping message with no active request (%d): %.*s\n", request_number, (int)size, message);
|
||||
}
|
||||
tf_printf("Dropping message with no active request (%d): %.*s\n", request_number, (int)size, message);
|
||||
return;
|
||||
}
|
||||
|
||||
@ -3721,8 +3714,6 @@ typedef struct _connection_work_t
|
||||
{
|
||||
uv_work_t work;
|
||||
tf_ssb_connection_t* connection;
|
||||
const char* name;
|
||||
const char* after_name;
|
||||
void (*work_callback)(tf_ssb_connection_t* connection, void* user_data);
|
||||
void (*after_work_callback)(tf_ssb_connection_t* connection, int result, void* user_data);
|
||||
void* user_data;
|
||||
@ -3734,9 +3725,7 @@ static void _tf_ssb_connection_work_callback(uv_work_t* work)
|
||||
tf_ssb_record_thread_busy(data->connection->ssb, true);
|
||||
if (data->work_callback)
|
||||
{
|
||||
tf_trace_begin(data->connection->ssb->trace, data->name);
|
||||
data->work_callback(data->connection, data->user_data);
|
||||
tf_trace_end(data->connection->ssb->trace);
|
||||
}
|
||||
tf_ssb_record_thread_busy(data->connection->ssb, false);
|
||||
}
|
||||
@ -3746,17 +3735,13 @@ static void _tf_ssb_connection_after_work_callback(uv_work_t* work, int status)
|
||||
connection_work_t* data = work->data;
|
||||
if (data->after_work_callback)
|
||||
{
|
||||
tf_trace_begin(data->connection->ssb->trace, data->after_name);
|
||||
data->after_work_callback(data->connection, status, data->user_data);
|
||||
tf_trace_end(data->connection->ssb->trace);
|
||||
}
|
||||
data->connection->ref_count--;
|
||||
if (data->connection->ref_count == 0 && data->connection->closing)
|
||||
{
|
||||
_tf_ssb_connection_destroy(data->connection, "work completed");
|
||||
}
|
||||
tf_free((void*)data->name);
|
||||
tf_free((void*)data->after_name);
|
||||
tf_free(data);
|
||||
}
|
||||
|
||||
@ -3784,70 +3769,6 @@ void tf_ssb_connection_run_work(tf_ssb_connection_t* connection, void (*work_cal
|
||||
}
|
||||
}
|
||||
|
||||
typedef struct _ssb_work_t
|
||||
{
|
||||
uv_work_t work;
|
||||
tf_ssb_t* ssb;
|
||||
const char* name;
|
||||
const char* after_name;
|
||||
void (*work_callback)(tf_ssb_t* ssb, void* user_data);
|
||||
void (*after_work_callback)(tf_ssb_t* ssb, int result, void* user_data);
|
||||
void* user_data;
|
||||
} ssb_work_t;
|
||||
|
||||
static void _tf_ssb_work_callback(uv_work_t* work)
|
||||
{
|
||||
ssb_work_t* data = work->data;
|
||||
tf_ssb_record_thread_busy(data->ssb, true);
|
||||
if (data->work_callback)
|
||||
{
|
||||
tf_trace_begin(data->ssb->trace, data->name);
|
||||
data->work_callback(data->ssb, data->user_data);
|
||||
tf_trace_end(data->ssb->trace);
|
||||
}
|
||||
tf_ssb_record_thread_busy(data->ssb, false);
|
||||
}
|
||||
|
||||
static void _tf_ssb_after_work_callback(uv_work_t* work, int status)
|
||||
{
|
||||
ssb_work_t* data = work->data;
|
||||
if (data->after_work_callback)
|
||||
{
|
||||
tf_trace_begin(data->ssb->trace, data->after_name);
|
||||
data->after_work_callback(data->ssb, status, data->user_data);
|
||||
tf_trace_end(data->ssb->trace);
|
||||
}
|
||||
tf_ssb_unref(data->ssb);
|
||||
tf_free((void*)data->name);
|
||||
tf_free((void*)data->after_name);
|
||||
tf_free(data);
|
||||
}
|
||||
|
||||
void tf_ssb_run_work(tf_ssb_t* ssb, void (*work_callback)(tf_ssb_t* ssb, void* user_data), void (*after_work_callback)(tf_ssb_t* ssb, int result, void* user_data), void* user_data)
|
||||
{
|
||||
ssb_work_t* work = tf_malloc(sizeof(ssb_work_t));
|
||||
*work = (ssb_work_t)
|
||||
{
|
||||
.work =
|
||||
{
|
||||
.data = work,
|
||||
},
|
||||
.name = tf_util_function_to_string(work_callback),
|
||||
.after_name = tf_util_function_to_string(after_work_callback),
|
||||
.ssb = ssb,
|
||||
.work_callback = work_callback,
|
||||
.after_work_callback = after_work_callback,
|
||||
.user_data = user_data,
|
||||
};
|
||||
|
||||
tf_ssb_ref(ssb);
|
||||
int result = uv_queue_work(ssb->loop, &work->work, _tf_ssb_work_callback, _tf_ssb_after_work_callback);
|
||||
if (result)
|
||||
{
|
||||
_tf_ssb_connection_after_work_callback(&work->work, result);
|
||||
}
|
||||
}
|
||||
|
||||
bool tf_ssb_is_room(tf_ssb_t* ssb)
|
||||
{
|
||||
return ssb->is_room;
|
||||
@ -3871,6 +3792,8 @@ void tf_ssb_set_room_name(tf_ssb_t* ssb, const char* room_name)
|
||||
|
||||
typedef struct _update_settings_t
|
||||
{
|
||||
uv_work_t work;
|
||||
tf_ssb_t* ssb;
|
||||
bool is_room;
|
||||
char room_name[1024];
|
||||
} update_settings_t;
|
||||
@ -3924,35 +3847,58 @@ static bool _get_global_setting_bool(tf_ssb_t* ssb, const char* name, bool defau
|
||||
return result;
|
||||
}
|
||||
|
||||
static void _tf_ssb_update_settings_work(tf_ssb_t* ssb, void* user_data)
|
||||
static void _tf_ssb_update_settings_work(uv_work_t* work)
|
||||
{
|
||||
update_settings_t* update = user_data;
|
||||
update->is_room = _get_global_setting_bool(ssb, "room", true);
|
||||
_get_global_setting_string(ssb, "room_name", update->room_name, sizeof(update->room_name));
|
||||
update_settings_t* update = work->data;
|
||||
tf_ssb_record_thread_busy(update->ssb, true);
|
||||
update->is_room = _get_global_setting_bool(update->ssb, "room", true);
|
||||
_get_global_setting_string(update->ssb, "room_name", update->room_name, sizeof(update->room_name));
|
||||
tf_ssb_record_thread_busy(update->ssb, false);
|
||||
}
|
||||
|
||||
static void _tf_ssb_update_settings_after_work(tf_ssb_t* ssb, int result, void* user_data)
|
||||
static void _tf_ssb_update_settings_after_work(uv_work_t* work, int result)
|
||||
{
|
||||
update_settings_t* update = user_data;
|
||||
tf_ssb_set_is_room(ssb, update->is_room);
|
||||
tf_ssb_set_room_name(ssb, update->room_name);
|
||||
_tf_ssb_start_update_settings(ssb);
|
||||
update_settings_t* update = work->data;
|
||||
tf_ssb_unref(update->ssb);
|
||||
tf_ssb_set_is_room(update->ssb, update->is_room);
|
||||
tf_ssb_set_room_name(update->ssb, update->room_name);
|
||||
_tf_ssb_start_update_settings(update->ssb);
|
||||
tf_free(update);
|
||||
}
|
||||
|
||||
static void _tf_ssb_start_update_settings_timer(tf_ssb_t* ssb, void* user_data)
|
||||
{
|
||||
update_settings_t* update = tf_malloc(sizeof(update_settings_t));
|
||||
*update = (update_settings_t) { 0 };
|
||||
tf_ssb_run_work(ssb, _tf_ssb_update_settings_work, _tf_ssb_update_settings_after_work, update);
|
||||
*update = (update_settings_t)
|
||||
{
|
||||
.work =
|
||||
{
|
||||
.data = update,
|
||||
},
|
||||
.ssb = ssb,
|
||||
};
|
||||
tf_ssb_ref(ssb);
|
||||
int result = uv_queue_work(tf_ssb_get_loop(ssb), &update->work, _tf_ssb_update_settings_work, _tf_ssb_update_settings_after_work);
|
||||
if (result)
|
||||
{
|
||||
_tf_ssb_update_settings_after_work(&update->work, result);
|
||||
}
|
||||
}
|
||||
|
||||
static void _tf_ssb_update_settings(tf_ssb_t* ssb)
|
||||
{
|
||||
update_settings_t* update = tf_malloc(sizeof(update_settings_t));
|
||||
*update = (update_settings_t) { 0 };
|
||||
_tf_ssb_update_settings_work(ssb, update);
|
||||
_tf_ssb_update_settings_after_work(ssb, 0, update);
|
||||
*update = (update_settings_t)
|
||||
{
|
||||
.work =
|
||||
{
|
||||
.data = update,
|
||||
},
|
||||
.ssb = ssb,
|
||||
};
|
||||
tf_ssb_ref(ssb);
|
||||
_tf_ssb_update_settings_work(&update->work);
|
||||
_tf_ssb_update_settings_after_work(&update->work, 0);
|
||||
}
|
||||
|
||||
static void _tf_ssb_start_update_settings(tf_ssb_t* ssb)
|
||||
|
@ -76,30 +76,35 @@ static bool _tf_ssb_connections_get_next_connection(tf_ssb_connections_t* connec
|
||||
|
||||
typedef struct _tf_ssb_connections_get_next_t
|
||||
{
|
||||
uv_work_t work;
|
||||
tf_ssb_connections_t* connections;
|
||||
tf_ssb_t* ssb;
|
||||
bool ready;
|
||||
char host[256];
|
||||
int port;
|
||||
char key[k_id_base64_len];
|
||||
} tf_ssb_connections_get_next_t;
|
||||
|
||||
static void _tf_ssb_connections_get_next_work(tf_ssb_t* ssb, void* user_data)
|
||||
static void _tf_ssb_connections_get_next_work(uv_work_t* work)
|
||||
{
|
||||
tf_ssb_connections_get_next_t* next = user_data;
|
||||
tf_ssb_connections_get_next_t* next = work->data;
|
||||
tf_ssb_record_thread_busy(next->ssb, true);
|
||||
next->ready = _tf_ssb_connections_get_next_connection(next->connections, next->host, sizeof(next->host), &next->port, next->key, sizeof(next->key));
|
||||
tf_ssb_record_thread_busy(next->ssb, false);
|
||||
}
|
||||
|
||||
static void _tf_ssb_connections_get_next_after_work(tf_ssb_t* ssb, int status, void* user_data)
|
||||
static void _tf_ssb_connections_get_next_after_work(uv_work_t* work, int status)
|
||||
{
|
||||
tf_ssb_connections_get_next_t* next = user_data;
|
||||
tf_ssb_connections_get_next_t* next = work->data;
|
||||
if (next->ready)
|
||||
{
|
||||
uint8_t key_bin[k_id_bin_len];
|
||||
if (tf_ssb_id_str_to_bin(key_bin, next->key))
|
||||
{
|
||||
tf_ssb_connect(ssb, next->host, next->port, key_bin);
|
||||
tf_ssb_connect(next->ssb, next->host, next->port, key_bin);
|
||||
}
|
||||
}
|
||||
tf_ssb_unref(next->ssb);
|
||||
tf_free(next);
|
||||
}
|
||||
|
||||
@ -111,10 +116,21 @@ static void _tf_ssb_connections_timer(uv_timer_t* timer)
|
||||
if (count < (int)_countof(active))
|
||||
{
|
||||
tf_ssb_connections_get_next_t* next = tf_malloc(sizeof(tf_ssb_connections_get_next_t));
|
||||
*next = (tf_ssb_connections_get_next_t) {
|
||||
*next = (tf_ssb_connections_get_next_t)
|
||||
{
|
||||
.work =
|
||||
{
|
||||
.data = next,
|
||||
},
|
||||
.ssb = connections->ssb,
|
||||
.connections = connections,
|
||||
};
|
||||
tf_ssb_run_work(connections->ssb, _tf_ssb_connections_get_next_work, _tf_ssb_connections_get_next_after_work, next);
|
||||
tf_ssb_ref(connections->ssb);
|
||||
int result = uv_queue_work(tf_ssb_get_loop(connections->ssb), &next->work, _tf_ssb_connections_get_next_work, _tf_ssb_connections_get_next_after_work);
|
||||
if (result)
|
||||
{
|
||||
_tf_ssb_connections_get_next_after_work(&next->work, result);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -149,6 +165,8 @@ void tf_ssb_connections_destroy(tf_ssb_connections_t* connections)
|
||||
|
||||
typedef struct _tf_ssb_connections_update_t
|
||||
{
|
||||
uv_work_t work;
|
||||
tf_ssb_t* ssb;
|
||||
char host[256];
|
||||
int port;
|
||||
char key[k_id_base64_len];
|
||||
@ -156,11 +174,12 @@ typedef struct _tf_ssb_connections_update_t
|
||||
bool succeeded;
|
||||
} tf_ssb_connections_update_t;
|
||||
|
||||
static void _tf_ssb_connections_update_work(tf_ssb_t* ssb, void* user_data)
|
||||
static void _tf_ssb_connections_update_work(uv_work_t* work)
|
||||
{
|
||||
tf_ssb_connections_update_t* update = user_data;
|
||||
tf_ssb_connections_update_t* update = work->data;
|
||||
tf_ssb_record_thread_busy(update->ssb, true);
|
||||
sqlite3_stmt* statement;
|
||||
sqlite3* db = tf_ssb_acquire_db_writer(ssb);
|
||||
sqlite3* db = tf_ssb_acquire_db_writer(update->ssb);
|
||||
if (update->attempted)
|
||||
{
|
||||
if (sqlite3_prepare(db, "UPDATE connections SET last_attempt = strftime('%s', 'now') WHERE host = ?1 AND port = ?2 AND key = ?3", -1, &statement, NULL) == SQLITE_OK)
|
||||
@ -207,23 +226,31 @@ static void _tf_ssb_connections_update_work(tf_ssb_t* ssb, void* user_data)
|
||||
sqlite3_finalize(statement);
|
||||
}
|
||||
}
|
||||
tf_ssb_release_db_writer(ssb, db);
|
||||
tf_ssb_release_db_writer(update->ssb, db);
|
||||
tf_ssb_record_thread_busy(update->ssb, false);
|
||||
}
|
||||
|
||||
static void _tf_ssb_connections_update_after_work(tf_ssb_t* ssb, int status, void* user_data)
|
||||
static void _tf_ssb_connections_update_after_work(uv_work_t* work, int status)
|
||||
{
|
||||
tf_free(user_data);
|
||||
tf_ssb_connections_update_t* update = work->data;
|
||||
tf_free(update);
|
||||
}
|
||||
|
||||
static void _tf_ssb_connections_queue_update(tf_ssb_connections_t* connections, tf_ssb_connections_update_t* update)
|
||||
{
|
||||
tf_ssb_run_work(connections->ssb, _tf_ssb_connections_update_work, _tf_ssb_connections_update_after_work, update);
|
||||
update->work.data = update;
|
||||
int result = uv_queue_work(tf_ssb_get_loop(connections->ssb), &update->work, _tf_ssb_connections_update_work, _tf_ssb_connections_update_after_work);
|
||||
if (result)
|
||||
{
|
||||
_tf_ssb_connections_update_after_work(&update->work, result);
|
||||
}
|
||||
}
|
||||
|
||||
void tf_ssb_connections_store(tf_ssb_connections_t* connections, const char* host, int port, const char* key)
|
||||
{
|
||||
tf_ssb_connections_update_t* update = tf_malloc(sizeof(tf_ssb_connections_update_t));
|
||||
*update = (tf_ssb_connections_update_t) {
|
||||
.ssb = connections->ssb,
|
||||
.port = port,
|
||||
};
|
||||
snprintf(update->host, sizeof(update->host), "%s", host);
|
||||
@ -235,6 +262,7 @@ void tf_ssb_connections_set_attempted(tf_ssb_connections_t* connections, const c
|
||||
{
|
||||
tf_ssb_connections_update_t* update = tf_malloc(sizeof(tf_ssb_connections_update_t));
|
||||
*update = (tf_ssb_connections_update_t) {
|
||||
.ssb = connections->ssb,
|
||||
.port = port,
|
||||
.attempted = true,
|
||||
};
|
||||
@ -247,6 +275,7 @@ void tf_ssb_connections_set_succeeded(tf_ssb_connections_t* connections, const c
|
||||
{
|
||||
tf_ssb_connections_update_t* update = tf_malloc(sizeof(tf_ssb_connections_update_t));
|
||||
*update = (tf_ssb_connections_update_t) {
|
||||
.ssb = connections->ssb,
|
||||
.port = port,
|
||||
.succeeded = true,
|
||||
};
|
||||
|
137
src/ssb.db.c
137
src/ssb.db.c
@ -19,7 +19,8 @@
|
||||
|
||||
typedef struct _message_store_t message_store_t;
|
||||
|
||||
static void _tf_ssb_db_store_message_after_work(tf_ssb_t* ssb, int status, void* user_data);
|
||||
static void _tf_ssb_db_store_message_work_finish(message_store_t* store);
|
||||
static void _tf_ssb_db_store_message_after_work(uv_work_t* work, int status);
|
||||
|
||||
static void _tf_ssb_db_exec(sqlite3* db, const char* statement)
|
||||
{
|
||||
@ -399,6 +400,8 @@ static char* _tf_ssb_db_get_message_blob_wants(tf_ssb_t* ssb, int64_t rowid)
|
||||
|
||||
typedef struct _message_store_t
|
||||
{
|
||||
uv_work_t work;
|
||||
tf_ssb_t* ssb;
|
||||
char id[k_id_base64_len];
|
||||
char signature[512];
|
||||
int flags;
|
||||
@ -418,16 +421,21 @@ typedef struct _message_store_t
|
||||
message_store_t* next;
|
||||
} message_store_t;
|
||||
|
||||
static void _tf_ssb_db_store_message_work(tf_ssb_t* ssb, void* user_data)
|
||||
static void _tf_ssb_db_store_message_work(uv_work_t* work)
|
||||
{
|
||||
message_store_t* store = user_data;
|
||||
int64_t last_row_id = _tf_ssb_db_store_message_raw(
|
||||
ssb, store->id, *store->previous ? store->previous : NULL, store->author, store->sequence, store->timestamp, store->content, store->length, store->signature, store->flags);
|
||||
message_store_t* store = work->data;
|
||||
tf_ssb_record_thread_busy(store->ssb, true);
|
||||
tf_trace_t* trace = tf_ssb_get_trace(store->ssb);
|
||||
tf_trace_begin(trace, "message_store_work");
|
||||
int64_t last_row_id = _tf_ssb_db_store_message_raw(store->ssb, store->id, *store->previous ? store->previous : NULL, store->author, store->sequence, store->timestamp,
|
||||
store->content, store->length, store->signature, store->flags);
|
||||
if (last_row_id != -1)
|
||||
{
|
||||
store->out_stored = true;
|
||||
store->out_blob_wants = _tf_ssb_db_get_message_blob_wants(ssb, last_row_id);
|
||||
store->out_blob_wants = _tf_ssb_db_get_message_blob_wants(store->ssb, last_row_id);
|
||||
}
|
||||
tf_trace_end(trace);
|
||||
tf_ssb_record_thread_busy(store->ssb, false);
|
||||
}
|
||||
|
||||
static void _wake_up_queue(tf_ssb_t* ssb, tf_ssb_store_queue_t* queue)
|
||||
@ -444,19 +452,38 @@ static void _wake_up_queue(tf_ssb_t* ssb, tf_ssb_store_queue_t* queue)
|
||||
}
|
||||
next->next = NULL;
|
||||
queue->running = true;
|
||||
tf_ssb_run_work(ssb, _tf_ssb_db_store_message_work, _tf_ssb_db_store_message_after_work, next);
|
||||
int r = uv_queue_work(tf_ssb_get_loop(ssb), &next->work, _tf_ssb_db_store_message_work, _tf_ssb_db_store_message_after_work);
|
||||
if (r)
|
||||
{
|
||||
_tf_ssb_db_store_message_work_finish(next);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static void _tf_ssb_db_store_message_after_work(tf_ssb_t* ssb, int status, void* user_data)
|
||||
static void _tf_ssb_db_store_message_work_finish(message_store_t* store)
|
||||
{
|
||||
message_store_t* store = user_data;
|
||||
tf_trace_t* trace = tf_ssb_get_trace(ssb);
|
||||
JSContext* context = tf_ssb_get_context(store->ssb);
|
||||
if (store->callback)
|
||||
{
|
||||
store->callback(store->id, store->out_stored, store->user_data);
|
||||
}
|
||||
JS_FreeCString(context, store->content);
|
||||
tf_ssb_store_queue_t* queue = tf_ssb_get_store_queue(store->ssb);
|
||||
queue->running = false;
|
||||
_wake_up_queue(store->ssb, queue);
|
||||
tf_free(store);
|
||||
}
|
||||
|
||||
static void _tf_ssb_db_store_message_after_work(uv_work_t* work, int status)
|
||||
{
|
||||
message_store_t* store = work->data;
|
||||
tf_trace_t* trace = tf_ssb_get_trace(store->ssb);
|
||||
tf_trace_begin(trace, "message_store_after_work");
|
||||
if (store->out_stored)
|
||||
{
|
||||
tf_trace_begin(trace, "notify_message_added");
|
||||
JSContext* context = tf_ssb_get_context(ssb);
|
||||
JSContext* context = tf_ssb_get_context(store->ssb);
|
||||
JSValue formatted =
|
||||
tf_ssb_format_message(context, store->previous, store->author, store->sequence, store->timestamp, "sha256", store->content, store->signature, store->flags);
|
||||
JSValue message = JS_NewObject(context);
|
||||
@ -465,7 +492,7 @@ static void _tf_ssb_db_store_message_after_work(tf_ssb_t* ssb, int status, void*
|
||||
char timestamp_string[256];
|
||||
snprintf(timestamp_string, sizeof(timestamp_string), "%f", store->timestamp);
|
||||
JS_SetPropertyStr(context, message, "timestamp", JS_NewString(context, timestamp_string));
|
||||
tf_ssb_notify_message_added(ssb, store->id, message);
|
||||
tf_ssb_notify_message_added(store->ssb, store->id, message);
|
||||
JS_FreeValue(context, message);
|
||||
tf_trace_end(trace);
|
||||
}
|
||||
@ -474,22 +501,13 @@ static void _tf_ssb_db_store_message_after_work(tf_ssb_t* ssb, int status, void*
|
||||
tf_trace_begin(trace, "notify_blob_wants_added");
|
||||
for (char* p = store->out_blob_wants; *p; p = p + strlen(p))
|
||||
{
|
||||
tf_ssb_notify_blob_want_added(ssb, p);
|
||||
tf_ssb_notify_blob_want_added(store->ssb, p);
|
||||
}
|
||||
tf_free(store->out_blob_wants);
|
||||
tf_trace_end(trace);
|
||||
}
|
||||
|
||||
JSContext* context = tf_ssb_get_context(ssb);
|
||||
if (store->callback)
|
||||
{
|
||||
store->callback(store->id, store->out_stored, store->user_data);
|
||||
}
|
||||
JS_FreeCString(context, store->content);
|
||||
tf_ssb_store_queue_t* queue = tf_ssb_get_store_queue(ssb);
|
||||
queue->running = false;
|
||||
_wake_up_queue(ssb, queue);
|
||||
tf_free(store);
|
||||
_tf_ssb_db_store_message_work_finish(store);
|
||||
tf_trace_end(trace);
|
||||
}
|
||||
|
||||
void tf_ssb_db_store_message(
|
||||
@ -521,7 +539,13 @@ void tf_ssb_db_store_message(
|
||||
JS_FreeValue(context, contentval);
|
||||
|
||||
message_store_t* store = tf_malloc(sizeof(message_store_t));
|
||||
*store = (message_store_t) {
|
||||
*store = (message_store_t)
|
||||
{
|
||||
.work =
|
||||
{
|
||||
.data = store,
|
||||
},
|
||||
.ssb = ssb,
|
||||
.sequence = sequence,
|
||||
.timestamp = timestamp,
|
||||
.content = contentstr,
|
||||
@ -636,6 +660,8 @@ bool tf_ssb_db_blob_get(tf_ssb_t* ssb, const char* id, uint8_t** out_blob, size_
|
||||
|
||||
typedef struct _blob_store_work_t
|
||||
{
|
||||
uv_work_t work;
|
||||
tf_ssb_t* ssb;
|
||||
const uint8_t* blob;
|
||||
size_t size;
|
||||
char id[k_blob_id_len];
|
||||
@ -644,18 +670,25 @@ typedef struct _blob_store_work_t
|
||||
void* user_data;
|
||||
} blob_store_work_t;
|
||||
|
||||
static void _tf_ssb_db_blob_store_work(tf_ssb_t* ssb, void* user_data)
|
||||
static void _tf_ssb_db_blob_store_work(uv_work_t* work)
|
||||
{
|
||||
blob_store_work_t* blob_work = user_data;
|
||||
tf_ssb_db_blob_store(ssb, blob_work->blob, blob_work->size, blob_work->id, sizeof(blob_work->id), &blob_work->is_new);
|
||||
blob_store_work_t* blob_work = work->data;
|
||||
tf_ssb_record_thread_busy(blob_work->ssb, true);
|
||||
tf_trace_t* trace = tf_ssb_get_trace(blob_work->ssb);
|
||||
tf_trace_begin(trace, "blob_store_work");
|
||||
tf_ssb_db_blob_store(blob_work->ssb, blob_work->blob, blob_work->size, blob_work->id, sizeof(blob_work->id), &blob_work->is_new);
|
||||
tf_trace_end(trace);
|
||||
tf_ssb_record_thread_busy(blob_work->ssb, false);
|
||||
}
|
||||
|
||||
static void _tf_ssb_db_blob_store_after_work(tf_ssb_t* ssb, int status, void* user_data)
|
||||
static void _tf_ssb_db_blob_store_after_work(uv_work_t* work, int status)
|
||||
{
|
||||
blob_store_work_t* blob_work = user_data;
|
||||
blob_store_work_t* blob_work = work->data;
|
||||
tf_trace_t* trace = tf_ssb_get_trace(blob_work->ssb);
|
||||
tf_trace_begin(trace, "blob_store_after_work");
|
||||
if (status == 0 && *blob_work->id)
|
||||
{
|
||||
tf_ssb_notify_blob_stored(ssb, blob_work->id);
|
||||
tf_ssb_notify_blob_stored(blob_work->ssb, blob_work->id);
|
||||
}
|
||||
if (status != 0)
|
||||
{
|
||||
@ -665,19 +698,35 @@ static void _tf_ssb_db_blob_store_after_work(tf_ssb_t* ssb, int status, void* us
|
||||
{
|
||||
blob_work->callback(status == 0 ? blob_work->id : NULL, blob_work->is_new, blob_work->user_data);
|
||||
}
|
||||
tf_trace_end(trace);
|
||||
tf_free(blob_work);
|
||||
}
|
||||
|
||||
void tf_ssb_db_blob_store_async(tf_ssb_t* ssb, const uint8_t* blob, size_t size, tf_ssb_db_blob_store_callback_t* callback, void* user_data)
|
||||
{
|
||||
blob_store_work_t* work = tf_malloc(sizeof(blob_store_work_t));
|
||||
*work = (blob_store_work_t) {
|
||||
*work = (blob_store_work_t)
|
||||
{
|
||||
.work =
|
||||
{
|
||||
.data = work,
|
||||
},
|
||||
.ssb = ssb,
|
||||
.blob = blob,
|
||||
.size = size,
|
||||
.callback = callback,
|
||||
.user_data = user_data,
|
||||
};
|
||||
tf_ssb_run_work(ssb, _tf_ssb_db_blob_store_work, _tf_ssb_db_blob_store_after_work, work);
|
||||
int r = uv_queue_work(tf_ssb_get_loop(ssb), &work->work, _tf_ssb_db_blob_store_work, _tf_ssb_db_blob_store_after_work);
|
||||
if (r)
|
||||
{
|
||||
tf_printf("tf_ssb_db_blob_store_async -> uv_queue_work failed immediately: %s\n", uv_strerror(r));
|
||||
if (callback)
|
||||
{
|
||||
callback(NULL, false, user_data);
|
||||
}
|
||||
tf_free(work);
|
||||
}
|
||||
}
|
||||
|
||||
bool tf_ssb_db_blob_store(tf_ssb_t* ssb, const uint8_t* blob, size_t size, char* out_id, size_t out_id_size, bool* out_new)
|
||||
@ -1706,17 +1755,19 @@ bool tf_ssb_db_identity_get_active(sqlite3* db, const char* user, const char* pa
|
||||
|
||||
typedef struct _resolve_index_t
|
||||
{
|
||||
uv_work_t work;
|
||||
tf_ssb_t* ssb;
|
||||
const char* host;
|
||||
const char* path;
|
||||
void (*callback)(const char* path, void* user_data);
|
||||
void* user_data;
|
||||
} resolve_index_t;
|
||||
|
||||
static void _tf_ssb_db_resolve_index_work(tf_ssb_t* ssb, void* user_data)
|
||||
static void _tf_ssb_db_resolve_index_work(uv_work_t* work)
|
||||
{
|
||||
resolve_index_t* request = user_data;
|
||||
resolve_index_t* request = work->data;
|
||||
|
||||
sqlite3* db = tf_ssb_acquire_db_reader(ssb);
|
||||
sqlite3* db = tf_ssb_acquire_db_reader(request->ssb);
|
||||
sqlite3_stmt* statement;
|
||||
if (sqlite3_prepare(db, "SELECT json_extract(value, '$.index_map') FROM properties WHERE id = 'core' AND key = 'settings'", -1, &statement, NULL) == SQLITE_OK)
|
||||
{
|
||||
@ -1762,12 +1813,12 @@ static void _tf_ssb_db_resolve_index_work(tf_ssb_t* ssb, void* user_data)
|
||||
tf_printf("prepare failed: %s\n", sqlite3_errmsg(db));
|
||||
}
|
||||
}
|
||||
tf_ssb_release_db_reader(ssb, db);
|
||||
tf_ssb_release_db_reader(request->ssb, db);
|
||||
}
|
||||
|
||||
static void _tf_ssb_db_resolve_index_after_work(tf_ssb_t* ssb, int status, void* user_data)
|
||||
static void _tf_ssb_db_resolve_index_after_work(uv_work_t* work, int status)
|
||||
{
|
||||
resolve_index_t* request = user_data;
|
||||
resolve_index_t* request = work->data;
|
||||
request->callback(request->path, request->user_data);
|
||||
tf_free((void*)request->host);
|
||||
tf_free((void*)request->path);
|
||||
@ -1778,9 +1829,15 @@ void tf_ssb_db_resolve_index_async(tf_ssb_t* ssb, const char* host, void (*callb
|
||||
{
|
||||
resolve_index_t* request = tf_malloc(sizeof(resolve_index_t));
|
||||
*request = (resolve_index_t) {
|
||||
.work = { .data = request },
|
||||
.ssb = ssb,
|
||||
.host = tf_strdup(host),
|
||||
.callback = callback,
|
||||
.user_data = user_data,
|
||||
};
|
||||
tf_ssb_run_work(ssb, _tf_ssb_db_resolve_index_work, _tf_ssb_db_resolve_index_after_work, request);
|
||||
int r = uv_queue_work(tf_ssb_get_loop(ssb), &request->work, _tf_ssb_db_resolve_index_work, _tf_ssb_db_resolve_index_after_work);
|
||||
if (r)
|
||||
{
|
||||
_tf_ssb_db_resolve_index_after_work(&request->work, r);
|
||||
}
|
||||
}
|
||||
|
10
src/ssb.h
10
src/ssb.h
@ -755,16 +755,6 @@ void tf_ssb_connection_schedule_idle(tf_ssb_connection_t* connection, tf_ssb_sch
|
||||
void tf_ssb_connection_run_work(tf_ssb_connection_t* connection, void (*work_callback)(tf_ssb_connection_t* connection, void* user_data),
|
||||
void (*after_work_callback)(tf_ssb_connection_t* connection, int result, void* user_data), void* user_data);
|
||||
|
||||
/**
|
||||
** Schedule work to run on a worker thread.
|
||||
** @param ssb The owning SSB instance.
|
||||
** @param work_callback The callback to run on a thread.
|
||||
** @param after_work_callback The callback to run on the main thread when the work is complete.
|
||||
** @param user_data User data to pass to the callback.
|
||||
*/
|
||||
void tf_ssb_run_work(
|
||||
tf_ssb_t* ssb, void (*work_callback)(tf_ssb_t* ssb, void* user_data), void (*after_work_callback)(tf_ssb_t* ssb, int result, void* user_data), void* user_data);
|
||||
|
||||
/**
|
||||
** Register for new messages on a connection.
|
||||
** @param connection The SHS connection.
|
||||
|
119
src/ssb.js.c
119
src/ssb.js.c
@ -4,6 +4,7 @@
|
||||
#include "mem.h"
|
||||
#include "ssb.db.h"
|
||||
#include "ssb.h"
|
||||
#include "trace.h"
|
||||
#include "util.js.h"
|
||||
|
||||
#include "sodium/crypto_box.h"
|
||||
@ -301,6 +302,8 @@ static JSValue _tf_ssb_getAllIdentities(JSContext* context, JSValueConst this_va
|
||||
|
||||
typedef struct _active_identity_work_t
|
||||
{
|
||||
uv_work_t request;
|
||||
tf_ssb_t* ssb;
|
||||
JSContext* context;
|
||||
const char* name;
|
||||
const char* package_owner;
|
||||
@ -319,22 +322,29 @@ static void _tf_ssb_getActiveIdentity_visit(const char* identity, void* user_dat
|
||||
}
|
||||
}
|
||||
|
||||
static void _tf_ssb_getActiveIdentity_work(tf_ssb_t* ssb, void* user_data)
|
||||
static void _tf_ssb_getActiveIdentity_work(uv_work_t* work)
|
||||
{
|
||||
active_identity_work_t* request = user_data;
|
||||
sqlite3* db = tf_ssb_acquire_db_reader(ssb);
|
||||
active_identity_work_t* request = work->data;
|
||||
tf_ssb_record_thread_busy(request->ssb, true);
|
||||
tf_trace_t* trace = tf_ssb_get_trace(request->ssb);
|
||||
tf_trace_begin(trace, "_tf_ssb_getActiveIdentity_work");
|
||||
|
||||
sqlite3* db = tf_ssb_acquire_db_reader(request->ssb);
|
||||
tf_ssb_db_identity_get_active(db, request->name, request->package_owner, request->package_name, request->identity, sizeof(request->identity));
|
||||
tf_ssb_release_db_reader(ssb, db);
|
||||
tf_ssb_release_db_reader(request->ssb, db);
|
||||
|
||||
if (!*request->identity)
|
||||
{
|
||||
tf_ssb_db_identity_visit(ssb, request->name, _tf_ssb_getActiveIdentity_visit, request);
|
||||
tf_ssb_db_identity_visit(request->ssb, request->name, _tf_ssb_getActiveIdentity_visit, request);
|
||||
}
|
||||
|
||||
tf_trace_end(trace);
|
||||
tf_ssb_record_thread_busy(request->ssb, false);
|
||||
}
|
||||
|
||||
static void _tf_ssb_getActiveIdentity_after_work(tf_ssb_t* ssb, int status, void* user_data)
|
||||
static void _tf_ssb_getActiveIdentity_after_work(uv_work_t* work, int status)
|
||||
{
|
||||
active_identity_work_t* request = user_data;
|
||||
active_identity_work_t* request = work->data;
|
||||
JSContext* context = request->context;
|
||||
if (request->result == 0)
|
||||
{
|
||||
@ -360,12 +370,13 @@ static void _tf_ssb_getActiveIdentity_after_work(tf_ssb_t* ssb, int status, void
|
||||
|
||||
static JSValue _tf_ssb_getActiveIdentity(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv)
|
||||
{
|
||||
tf_ssb_t* ssb = JS_GetOpaque(this_val, _tf_ssb_classId);
|
||||
const char* name = JS_ToCString(context, argv[0]);
|
||||
const char* package_owner = JS_ToCString(context, argv[1]);
|
||||
const char* package_name = JS_ToCString(context, argv[2]);
|
||||
active_identity_work_t* work = tf_malloc(sizeof(active_identity_work_t));
|
||||
*work = (active_identity_work_t) {
|
||||
.request = { .data = work },
|
||||
.ssb = JS_GetOpaque(this_val, _tf_ssb_classId),
|
||||
.context = context,
|
||||
.name = tf_strdup(name),
|
||||
.package_owner = tf_strdup(package_owner),
|
||||
@ -376,12 +387,18 @@ static JSValue _tf_ssb_getActiveIdentity(JSContext* context, JSValueConst this_v
|
||||
JS_FreeCString(context, package_owner);
|
||||
JS_FreeCString(context, package_name);
|
||||
|
||||
tf_ssb_run_work(ssb, _tf_ssb_getActiveIdentity_work, _tf_ssb_getActiveIdentity_after_work, work);
|
||||
int r = uv_queue_work(tf_ssb_get_loop(work->ssb), &work->request, _tf_ssb_getActiveIdentity_work, _tf_ssb_getActiveIdentity_after_work);
|
||||
if (r)
|
||||
{
|
||||
_tf_ssb_getActiveIdentity_after_work(&work->request, r);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
typedef struct _identity_info_work_t
|
||||
{
|
||||
uv_work_t request;
|
||||
tf_ssb_t* ssb;
|
||||
JSContext* context;
|
||||
const char* name;
|
||||
const char* package_owner;
|
||||
@ -405,12 +422,12 @@ static void _tf_ssb_getIdentityInfo_visit(const char* identity, void* data)
|
||||
;
|
||||
}
|
||||
|
||||
static void _tf_ssb_getIdentityInfo_work(tf_ssb_t* ssb, void* user_data)
|
||||
static void _tf_ssb_getIdentityInfo_work(uv_work_t* work)
|
||||
{
|
||||
identity_info_work_t* request = user_data;
|
||||
tf_ssb_db_identity_visit(ssb, request->name, _tf_ssb_getIdentityInfo_visit, request);
|
||||
identity_info_work_t* request = work->data;
|
||||
tf_ssb_db_identity_visit(request->ssb, request->name, _tf_ssb_getIdentityInfo_visit, request);
|
||||
|
||||
sqlite3* db = tf_ssb_acquire_db_reader(ssb);
|
||||
sqlite3* db = tf_ssb_acquire_db_reader(request->ssb);
|
||||
sqlite3_stmt* statement = NULL;
|
||||
request->result = sqlite3_prepare(db,
|
||||
"SELECT author, name FROM ( "
|
||||
@ -450,12 +467,12 @@ static void _tf_ssb_getIdentityInfo_work(tf_ssb_t* ssb, void* user_data)
|
||||
{
|
||||
snprintf(request->active_identity, sizeof(request->active_identity), "%s", request->identities[0]);
|
||||
}
|
||||
tf_ssb_release_db_reader(ssb, db);
|
||||
tf_ssb_release_db_reader(request->ssb, db);
|
||||
}
|
||||
|
||||
static void _tf_ssb_getIdentityInfo_after_work(tf_ssb_t* ssb, int status, void* user_data)
|
||||
static void _tf_ssb_getIdentityInfo_after_work(uv_work_t* work, int status)
|
||||
{
|
||||
identity_info_work_t* request = user_data;
|
||||
identity_info_work_t* request = work->data;
|
||||
JSContext* context = request->context;
|
||||
JSValue result = JS_NewObject(context);
|
||||
|
||||
@ -497,12 +514,13 @@ static void _tf_ssb_getIdentityInfo_after_work(tf_ssb_t* ssb, int status, void*
|
||||
|
||||
static JSValue _tf_ssb_getIdentityInfo(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv)
|
||||
{
|
||||
tf_ssb_t* ssb = JS_GetOpaque(this_val, _tf_ssb_classId);
|
||||
const char* name = JS_ToCString(context, argv[0]);
|
||||
const char* package_owner = JS_ToCString(context, argv[1]);
|
||||
const char* package_name = JS_ToCString(context, argv[2]);
|
||||
identity_info_work_t* work = tf_malloc(sizeof(identity_info_work_t));
|
||||
*work = (identity_info_work_t) {
|
||||
.request = { .data = work },
|
||||
.ssb = JS_GetOpaque(this_val, _tf_ssb_classId),
|
||||
.context = context,
|
||||
.name = tf_strdup(name),
|
||||
.package_owner = tf_strdup(package_owner),
|
||||
@ -513,7 +531,11 @@ static JSValue _tf_ssb_getIdentityInfo(JSContext* context, JSValueConst this_val
|
||||
JS_FreeCString(context, package_owner);
|
||||
JS_FreeCString(context, package_name);
|
||||
|
||||
tf_ssb_run_work(ssb, _tf_ssb_getIdentityInfo_work, _tf_ssb_getIdentityInfo_after_work, work);
|
||||
int r = uv_queue_work(tf_ssb_get_loop(work->ssb), &work->request, _tf_ssb_getIdentityInfo_work, _tf_ssb_getIdentityInfo_after_work);
|
||||
if (r)
|
||||
{
|
||||
_tf_ssb_getIdentityInfo_after_work(&work->request, r);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
@ -823,6 +845,7 @@ typedef struct _sql_work_t
|
||||
uint8_t* rows;
|
||||
size_t binds_count;
|
||||
size_t rows_count;
|
||||
uv_work_t request;
|
||||
uv_async_t async;
|
||||
uv_timer_t timeout;
|
||||
uv_mutex_t lock;
|
||||
@ -838,10 +861,13 @@ static void _tf_ssb_sql_append(uint8_t** rows, size_t* rows_count, const void* d
|
||||
*rows_count += size;
|
||||
}
|
||||
|
||||
static void _tf_ssb_sqlAsync_work(tf_ssb_t* ssb, void* user_data)
|
||||
static void _tf_ssb_sqlAsync_work(uv_work_t* work)
|
||||
{
|
||||
sql_work_t* sql_work = user_data;
|
||||
sqlite3* db = tf_ssb_acquire_db_reader_restricted(ssb);
|
||||
sql_work_t* sql_work = work->data;
|
||||
tf_ssb_record_thread_busy(sql_work->ssb, true);
|
||||
tf_trace_t* trace = tf_ssb_get_trace(sql_work->ssb);
|
||||
tf_trace_begin(trace, "sql_async_work");
|
||||
sqlite3* db = tf_ssb_acquire_db_reader_restricted(sql_work->ssb);
|
||||
uv_mutex_lock(&sql_work->lock);
|
||||
sql_work->db = db;
|
||||
uv_mutex_unlock(&sql_work->lock);
|
||||
@ -949,7 +975,9 @@ static void _tf_ssb_sqlAsync_work(tf_ssb_t* ssb, void* user_data)
|
||||
uv_mutex_lock(&sql_work->lock);
|
||||
sql_work->db = NULL;
|
||||
uv_mutex_unlock(&sql_work->lock);
|
||||
tf_ssb_release_db_reader(ssb, db);
|
||||
tf_ssb_release_db_reader(sql_work->ssb, db);
|
||||
tf_ssb_record_thread_busy(sql_work->ssb, false);
|
||||
tf_trace_end(trace);
|
||||
}
|
||||
|
||||
static void _tf_ssb_sqlAsync_handle_close(uv_handle_t* handle)
|
||||
@ -975,10 +1003,12 @@ static void _tf_ssb_sqlAsync_destroy(sql_work_t* work)
|
||||
uv_close((uv_handle_t*)&work->async, _tf_ssb_sqlAsync_handle_close);
|
||||
}
|
||||
|
||||
static void _tf_ssb_sqlAsync_after_work(tf_ssb_t* ssb, int status, void* user_data)
|
||||
static void _tf_ssb_sqlAsync_after_work(uv_work_t* work, int status)
|
||||
{
|
||||
sql_work_t* sql_work = user_data;
|
||||
JSContext* context = tf_ssb_get_context(ssb);
|
||||
sql_work_t* sql_work = work->data;
|
||||
tf_trace_t* trace = tf_ssb_get_trace(sql_work->ssb);
|
||||
tf_trace_begin(trace, "sql_async_after_work");
|
||||
JSContext* context = tf_ssb_get_context(sql_work->ssb);
|
||||
uint8_t* p = sql_work->rows;
|
||||
while (p < sql_work->rows + sql_work->rows_count)
|
||||
{
|
||||
@ -1063,6 +1093,7 @@ static void _tf_ssb_sqlAsync_after_work(tf_ssb_t* ssb, int status, void* user_da
|
||||
JS_FreeValue(context, sql_work->callback);
|
||||
JS_FreeCString(context, sql_work->query);
|
||||
_tf_ssb_sqlAsync_destroy(sql_work);
|
||||
tf_trace_end(trace);
|
||||
}
|
||||
|
||||
static void _tf_ssb_sqlAsync_timeout(uv_timer_t* timer)
|
||||
@ -1089,6 +1120,10 @@ static JSValue _tf_ssb_sqlAsync(JSContext* context, JSValueConst this_val, int a
|
||||
sql_work_t* work = tf_malloc(sizeof(sql_work_t));
|
||||
*work = (sql_work_t)
|
||||
{
|
||||
.request =
|
||||
{
|
||||
.data = work,
|
||||
},
|
||||
.async =
|
||||
{
|
||||
.data = work,
|
||||
@ -1148,7 +1183,11 @@ static JSValue _tf_ssb_sqlAsync(JSContext* context, JSValueConst this_val, int a
|
||||
}
|
||||
JS_FreeValue(context, value);
|
||||
}
|
||||
tf_ssb_run_work(ssb, _tf_ssb_sqlAsync_work, _tf_ssb_sqlAsync_after_work, work);
|
||||
int r = uv_queue_work(tf_ssb_get_loop(ssb), &work->request, _tf_ssb_sqlAsync_work, _tf_ssb_sqlAsync_after_work);
|
||||
if (r)
|
||||
{
|
||||
error_value = JS_ThrowInternalError(context, "uv_queue_work failed: %s", uv_strerror(r));
|
||||
}
|
||||
}
|
||||
if (!JS_IsUndefined(error_value))
|
||||
{
|
||||
@ -1770,6 +1809,8 @@ static JSValue _tf_ssb_private_message_decrypt(JSContext* context, JSValueConst
|
||||
|
||||
typedef struct _following_t
|
||||
{
|
||||
uv_work_t work;
|
||||
tf_ssb_t* ssb;
|
||||
JSContext* context;
|
||||
JSValue promise[2];
|
||||
|
||||
@ -1780,15 +1821,17 @@ typedef struct _following_t
|
||||
const char* ids[];
|
||||
} following_t;
|
||||
|
||||
static void _tf_ssb_following_work(tf_ssb_t* ssb, void* user_data)
|
||||
static void _tf_ssb_following_work(uv_work_t* work)
|
||||
{
|
||||
following_t* following = user_data;
|
||||
following->out_following = tf_ssb_db_following_deep(ssb, following->ids, following->ids_count, following->depth);
|
||||
following_t* following = work->data;
|
||||
tf_ssb_record_thread_busy(following->ssb, true);
|
||||
following->out_following = tf_ssb_db_following_deep(following->ssb, following->ids, following->ids_count, following->depth);
|
||||
tf_ssb_record_thread_busy(following->ssb, false);
|
||||
}
|
||||
|
||||
static void _tf_ssb_following_after_work(tf_ssb_t* ssb, int status, void* user_data)
|
||||
static void _tf_ssb_following_after_work(uv_work_t* work, int status)
|
||||
{
|
||||
following_t* following = user_data;
|
||||
following_t* following = work->data;
|
||||
JSContext* context = following->context;
|
||||
if (status == 0)
|
||||
{
|
||||
@ -1835,8 +1878,14 @@ static JSValue _tf_ssb_following(JSContext* context, JSValueConst this_val, int
|
||||
int ids_count = tf_util_get_length(context, argv[0]);
|
||||
|
||||
following_t* following = tf_malloc(sizeof(following_t) + sizeof(char*) * ids_count);
|
||||
*following = (following_t) {
|
||||
*following = (following_t)
|
||||
{
|
||||
.work =
|
||||
{
|
||||
.data = following,
|
||||
},
|
||||
.context = context,
|
||||
.ssb = ssb,
|
||||
};
|
||||
JS_ToInt32(context, &following->depth, argv[1]);
|
||||
|
||||
@ -1854,7 +1903,11 @@ static JSValue _tf_ssb_following(JSContext* context, JSValueConst this_val, int
|
||||
}
|
||||
}
|
||||
|
||||
tf_ssb_run_work(ssb, _tf_ssb_following_work, _tf_ssb_following_after_work, following);
|
||||
int r = uv_queue_work(tf_ssb_get_loop(ssb), &following->work, _tf_ssb_following_work, _tf_ssb_following_after_work);
|
||||
if (r)
|
||||
{
|
||||
_tf_ssb_following_after_work(&following->work, r);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
|
@ -1137,6 +1137,12 @@ static void _tf_ssb_rpc_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_chang
|
||||
}
|
||||
}
|
||||
|
||||
typedef struct _delete_blobs_work_t
|
||||
{
|
||||
uv_work_t work;
|
||||
tf_ssb_t* ssb;
|
||||
} delete_blobs_work_t;
|
||||
|
||||
static void _tf_ssb_rpc_checkpoint(tf_ssb_t* ssb)
|
||||
{
|
||||
int64_t checkpoint_start_ms = uv_hrtime();
|
||||
@ -1154,12 +1160,16 @@ static void _tf_ssb_rpc_checkpoint(tf_ssb_t* ssb)
|
||||
tf_ssb_release_db_writer(ssb, db);
|
||||
}
|
||||
|
||||
static void _tf_ssb_rpc_delete_blobs_work(tf_ssb_t* ssb, void* user_data)
|
||||
static void _tf_ssb_rpc_delete_blobs_work(uv_work_t* work)
|
||||
{
|
||||
delete_blobs_work_t* delete = work->data;
|
||||
tf_ssb_t* ssb = delete->ssb;
|
||||
tf_ssb_record_thread_busy(ssb, true);
|
||||
int64_t age = _get_global_setting_int64(ssb, "blob_expire_age_seconds", -1);
|
||||
if (age <= 0)
|
||||
{
|
||||
_tf_ssb_rpc_checkpoint(ssb);
|
||||
tf_ssb_record_thread_busy(ssb, false);
|
||||
return;
|
||||
}
|
||||
int64_t start_ns = uv_hrtime();
|
||||
@ -1201,16 +1211,28 @@ static void _tf_ssb_rpc_delete_blobs_work(tf_ssb_t* ssb, void* user_data)
|
||||
tf_printf("Deleted %d blobs in %d ms.\n", deleted, (int)duration_ms);
|
||||
_tf_ssb_rpc_checkpoint(ssb);
|
||||
_tf_ssb_rpc_start_delete_blobs(ssb, deleted ? (int)duration_ms : (15 * 60 * 1000));
|
||||
tf_ssb_record_thread_busy(ssb, false);
|
||||
}
|
||||
|
||||
static void _tf_ssb_rpc_delete_blobs_after_work(tf_ssb_t* ssb, int status, void* user_data)
|
||||
static void _tf_ssb_rpc_delete_blobs_after_work(uv_work_t* work, int status)
|
||||
{
|
||||
tf_ssb_unref(ssb);
|
||||
delete_blobs_work_t* delete = work->data;
|
||||
tf_ssb_unref(delete->ssb);
|
||||
tf_free(delete);
|
||||
}
|
||||
|
||||
static void _tf_ssb_rpc_start_delete_callback(tf_ssb_t* ssb, void* user_data)
|
||||
{
|
||||
tf_ssb_run_work(ssb, _tf_ssb_rpc_delete_blobs_work, _tf_ssb_rpc_delete_blobs_after_work, NULL);
|
||||
delete_blobs_work_t* work = tf_malloc(sizeof(delete_blobs_work_t));
|
||||
*work = (delete_blobs_work_t) { .work = { .data = work }, .ssb = ssb };
|
||||
tf_ssb_ref(ssb);
|
||||
int r = uv_queue_work(tf_ssb_get_loop(ssb), &work->work, _tf_ssb_rpc_delete_blobs_work, _tf_ssb_rpc_delete_blobs_after_work);
|
||||
if (r)
|
||||
{
|
||||
tf_printf("uv_queue_work: %s\n", uv_strerror(r));
|
||||
tf_free(work);
|
||||
tf_ssb_unref(ssb);
|
||||
}
|
||||
}
|
||||
|
||||
static void _tf_ssb_rpc_start_delete_blobs(tf_ssb_t* ssb, int delay_ms)
|
||||
|
@ -272,7 +272,7 @@ static void _tf_trace_begin_tagged(tf_trace_t* trace, const char* name, void* ta
|
||||
|
||||
char line[1024];
|
||||
int p = snprintf(line, sizeof(line), "{\"ph\": \"B\", \"pid\": %d, \"tid\": %" PRId64 ", \"ts\": %" PRId64 ", \"name\": \"", getpid(), (int64_t)self, _trace_ts());
|
||||
p += _tf_trace_escape_name(line + p, sizeof(line) - p - 4, name);
|
||||
p += _tf_trace_escape_name(line + p, sizeof(line) - p, name);
|
||||
p += snprintf(line + p, sizeof(line) - p, "\"},");
|
||||
p = tf_min(p, tf_countof(line));
|
||||
trace->callback(trace, line, p, trace->user_data);
|
||||
@ -299,7 +299,7 @@ static void _tf_trace_end_tagged(tf_trace_t* trace, void* tag)
|
||||
|
||||
char line[1024];
|
||||
int p = snprintf(line, sizeof(line), "{\"ph\": \"E\", \"pid\": %d, \"tid\": %" PRId64 ", \"ts\": %" PRId64 ", \"name\": \"", getpid(), (int64_t)pthread_self(), _trace_ts());
|
||||
p += _tf_trace_escape_name(line + p, sizeof(line) - p - 4, name);
|
||||
p += _tf_trace_escape_name(line + p, sizeof(line) - p, name);
|
||||
p += snprintf(line + p, sizeof(line) - p, "\"},");
|
||||
p = tf_min(p, tf_countof(line));
|
||||
trace->callback(trace, line, p, trace->user_data);
|
||||
|
Loading…
Reference in New Issue
Block a user