Make blobs.has do its work off the main thread so it doesn't violate that assert, and make the test cover such things a bit better.

This commit is contained in:
Cory McWilliams 2024-08-21 22:55:40 -04:00
parent bfb3d8b8a2
commit 63918f0680
2 changed files with 71 additions and 2 deletions

View File

@ -148,6 +148,28 @@ static void _tf_ssb_rpc_blobs_get(tf_ssb_connection_t* connection, uint8_t flags
JS_FreeValue(context, ids); JS_FreeValue(context, ids);
} }
typedef struct _blobs_has_work_t
{
int64_t request_number;
char id[k_id_base64_len];
bool found;
} blobs_has_work_t;
static void _tf_ssb_rpc_blobs_has_work(tf_ssb_connection_t* connection, void* user_data)
{
blobs_has_work_t* work = user_data;
tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
work->found = tf_ssb_db_blob_has(ssb, work->id);
}
static void _tf_ssb_rpc_blobs_has_after_work(tf_ssb_connection_t* connection, int status, void* user_data)
{
blobs_has_work_t* work = user_data;
tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_json | k_ssb_rpc_flag_end_error | k_ssb_rpc_flag_stream, -work->request_number, NULL,
(const uint8_t*)(work->found ? "true" : "false"), strlen(work->found ? "true" : "false"), NULL, NULL, NULL);
tf_free(work);
}
static void _tf_ssb_rpc_blobs_has(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) static void _tf_ssb_rpc_blobs_has(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{ {
tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
@ -160,11 +182,17 @@ static void _tf_ssb_rpc_blobs_has(tf_ssb_connection_t* connection, uint8_t flags
JSValue ids = JS_GetPropertyStr(context, args, "args"); JSValue ids = JS_GetPropertyStr(context, args, "args");
JSValue id = JS_GetPropertyUint32(context, ids, 0); JSValue id = JS_GetPropertyUint32(context, ids, 0);
const char* id_str = JS_ToCString(context, id); const char* id_str = JS_ToCString(context, id);
bool has = tf_ssb_db_blob_has(ssb, id_str);
blobs_has_work_t* work = tf_malloc(sizeof(blobs_has_work_t));
*work = (blobs_has_work_t) {
.request_number = request_number,
};
snprintf(work->id, sizeof(work->id), "%s", id_str);
tf_ssb_connection_run_work(connection, _tf_ssb_rpc_blobs_has_work, _tf_ssb_rpc_blobs_has_after_work, work);
JS_FreeCString(context, id_str); JS_FreeCString(context, id_str);
JS_FreeValue(context, id); JS_FreeValue(context, id);
JS_FreeValue(context, ids); JS_FreeValue(context, ids);
tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_json, -request_number, NULL, (const uint8_t*)(has ? "true" : "false"), strlen(has ? "true" : "false"), NULL, NULL, NULL);
} }
static void _tf_ssb_rpc_blob_wants_added_callback(tf_ssb_t* ssb, const char* id, void* user_data) static void _tf_ssb_rpc_blob_wants_added_callback(tf_ssb_t* ssb, const char* id, void* user_data)

View File

@ -250,22 +250,51 @@ void tf_ssb_test_ssb(const tf_test_options_t* options)
tf_printf("Waiting for connection.\n"); tf_printf("Waiting for connection.\n");
while (test.connection_count0 != 1 || test.connection_count1 != 1) while (test.connection_count0 != 1 || test.connection_count1 != 1)
{ {
tf_ssb_set_main_thread(ssb0, true);
tf_ssb_set_main_thread(ssb1, true);
uv_run(&loop, UV_RUN_ONCE); uv_run(&loop, UV_RUN_ONCE);
tf_ssb_set_main_thread(ssb0, false);
tf_ssb_set_main_thread(ssb1, false);
} }
tf_ssb_server_close(ssb0); tf_ssb_server_close(ssb0);
tf_printf("Waiting for messages.\n"); tf_printf("Waiting for messages.\n");
while (_ssb_test_count_messages(ssb1) < 3) while (_ssb_test_count_messages(ssb1) < 3)
{ {
tf_ssb_set_main_thread(ssb0, true);
tf_ssb_set_main_thread(ssb1, true);
uv_run(&loop, UV_RUN_ONCE); uv_run(&loop, UV_RUN_ONCE);
tf_ssb_set_main_thread(ssb0, false);
tf_ssb_set_main_thread(ssb1, false);
} }
tf_printf("Waiting for blob.\n"); tf_printf("Waiting for blob.\n");
while (!tf_ssb_db_blob_get(ssb1, blob_id, NULL, NULL)) while (!tf_ssb_db_blob_get(ssb1, blob_id, NULL, NULL))
{ {
tf_ssb_set_main_thread(ssb0, true);
tf_ssb_set_main_thread(ssb1, true);
uv_run(&loop, UV_RUN_ONCE); uv_run(&loop, UV_RUN_ONCE);
tf_ssb_set_main_thread(ssb0, false);
tf_ssb_set_main_thread(ssb1, false);
} }
JSContext* context = tf_ssb_get_context(ssb1);
JSValue message = JS_NewObject(context);
JSValue name = JS_NewArray(context);
JS_SetPropertyUint32(context, name, 0, JS_NewString(context, "blobs"));
JS_SetPropertyUint32(context, name, 1, JS_NewString(context, "has"));
JS_SetPropertyStr(context, message, "name", name);
JSValue args = JS_NewArray(context);
JS_SetPropertyUint32(context, args, 0, JS_NewString(context, blob_id));
JS_SetPropertyStr(context, message, "args", args);
JS_SetPropertyStr(context, message, "type", JS_NewString(context, "async"));
tf_ssb_connection_t* connections[4] = { 0 };
tf_ssb_get_connections(ssb1, connections, 4);
int64_t request_number = tf_ssb_connection_next_request_number(connections[0]);
tf_ssb_connection_rpc_send_json(connections[0], k_ssb_rpc_flag_stream | k_ssb_rpc_flag_new_request, request_number, "blobs.has", message, NULL, NULL, NULL);
JS_FreeValue(context, message);
uint8_t* b1; uint8_t* b1;
size_t s1 = 0; size_t s1 = 0;
b = tf_ssb_db_blob_get(ssb1, blob_id, &b1, &s1); b = tf_ssb_db_blob_get(ssb1, blob_id, &b1, &s1);
@ -293,14 +322,22 @@ void tf_ssb_test_ssb(const tf_test_options_t* options)
while (count0 == 0) while (count0 == 0)
{ {
tf_ssb_set_main_thread(ssb0, true);
tf_ssb_set_main_thread(ssb1, true);
uv_run(&loop, UV_RUN_ONCE); uv_run(&loop, UV_RUN_ONCE);
tf_ssb_set_main_thread(ssb0, false);
tf_ssb_set_main_thread(ssb1, false);
} }
tf_ssb_remove_message_added_callback(ssb0, _message_added, &count0); tf_ssb_remove_message_added_callback(ssb0, _message_added, &count0);
tf_printf("Waiting for message from other.\n"); tf_printf("Waiting for message from other.\n");
while (count1 == 0) while (count1 == 0)
{ {
tf_ssb_set_main_thread(ssb0, true);
tf_ssb_set_main_thread(ssb1, true);
uv_run(&loop, UV_RUN_ONCE); uv_run(&loop, UV_RUN_ONCE);
tf_ssb_set_main_thread(ssb0, false);
tf_ssb_set_main_thread(ssb1, false);
} }
tf_ssb_remove_message_added_callback(ssb1, _message_added, &count1); tf_ssb_remove_message_added_callback(ssb1, _message_added, &count1);
tf_printf("done\n"); tf_printf("done\n");
@ -311,7 +348,11 @@ void tf_ssb_test_ssb(const tf_test_options_t* options)
uv_close((uv_handle_t*)&idle1, NULL); uv_close((uv_handle_t*)&idle1, NULL);
tf_printf("final run\n"); tf_printf("final run\n");
tf_ssb_set_main_thread(ssb0, true);
tf_ssb_set_main_thread(ssb1, true);
uv_run(&loop, UV_RUN_DEFAULT); uv_run(&loop, UV_RUN_DEFAULT);
tf_ssb_set_main_thread(ssb0, false);
tf_ssb_set_main_thread(ssb1, false);
tf_printf("done\n"); tf_printf("done\n");
tf_printf("destroy 0\n"); tf_printf("destroy 0\n");