diff --git a/src/ssb.rpc.c b/src/ssb.rpc.c index f7345870..48f7e865 100644 --- a/src/ssb.rpc.c +++ b/src/ssb.rpc.c @@ -148,6 +148,28 @@ static void _tf_ssb_rpc_blobs_get(tf_ssb_connection_t* connection, uint8_t flags JS_FreeValue(context, ids); } +typedef struct _blobs_has_work_t +{ + int64_t request_number; + char id[k_id_base64_len]; + bool found; +} blobs_has_work_t; + +static void _tf_ssb_rpc_blobs_has_work(tf_ssb_connection_t* connection, void* user_data) +{ + blobs_has_work_t* work = user_data; + tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); + work->found = tf_ssb_db_blob_has(ssb, work->id); +} + +static void _tf_ssb_rpc_blobs_has_after_work(tf_ssb_connection_t* connection, int status, void* user_data) +{ + blobs_has_work_t* work = user_data; + tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_json | k_ssb_rpc_flag_end_error | k_ssb_rpc_flag_stream, -work->request_number, NULL, + (const uint8_t*)(work->found ? "true" : "false"), strlen(work->found ? "true" : "false"), NULL, NULL, NULL); + tf_free(work); +} + static void _tf_ssb_rpc_blobs_has(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) { tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); @@ -160,11 +182,17 @@ static void _tf_ssb_rpc_blobs_has(tf_ssb_connection_t* connection, uint8_t flags JSValue ids = JS_GetPropertyStr(context, args, "args"); JSValue id = JS_GetPropertyUint32(context, ids, 0); const char* id_str = JS_ToCString(context, id); - bool has = tf_ssb_db_blob_has(ssb, id_str); + + blobs_has_work_t* work = tf_malloc(sizeof(blobs_has_work_t)); + *work = (blobs_has_work_t) { + .request_number = request_number, + }; + snprintf(work->id, sizeof(work->id), "%s", id_str); + tf_ssb_connection_run_work(connection, _tf_ssb_rpc_blobs_has_work, _tf_ssb_rpc_blobs_has_after_work, work); + JS_FreeCString(context, id_str); JS_FreeValue(context, id); JS_FreeValue(context, ids); - tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_json, -request_number, NULL, (const uint8_t*)(has ? "true" : "false"), strlen(has ? "true" : "false"), NULL, NULL, NULL); } static void _tf_ssb_rpc_blob_wants_added_callback(tf_ssb_t* ssb, const char* id, void* user_data) diff --git a/src/ssb.tests.c b/src/ssb.tests.c index f0b36bbe..4dc801e5 100644 --- a/src/ssb.tests.c +++ b/src/ssb.tests.c @@ -250,22 +250,51 @@ void tf_ssb_test_ssb(const tf_test_options_t* options) tf_printf("Waiting for connection.\n"); while (test.connection_count0 != 1 || test.connection_count1 != 1) { + tf_ssb_set_main_thread(ssb0, true); + tf_ssb_set_main_thread(ssb1, true); uv_run(&loop, UV_RUN_ONCE); + tf_ssb_set_main_thread(ssb0, false); + tf_ssb_set_main_thread(ssb1, false); } tf_ssb_server_close(ssb0); tf_printf("Waiting for messages.\n"); while (_ssb_test_count_messages(ssb1) < 3) { + tf_ssb_set_main_thread(ssb0, true); + tf_ssb_set_main_thread(ssb1, true); uv_run(&loop, UV_RUN_ONCE); + tf_ssb_set_main_thread(ssb0, false); + tf_ssb_set_main_thread(ssb1, false); } tf_printf("Waiting for blob.\n"); while (!tf_ssb_db_blob_get(ssb1, blob_id, NULL, NULL)) { + tf_ssb_set_main_thread(ssb0, true); + tf_ssb_set_main_thread(ssb1, true); uv_run(&loop, UV_RUN_ONCE); + tf_ssb_set_main_thread(ssb0, false); + tf_ssb_set_main_thread(ssb1, false); } + JSContext* context = tf_ssb_get_context(ssb1); + JSValue message = JS_NewObject(context); + JSValue name = JS_NewArray(context); + JS_SetPropertyUint32(context, name, 0, JS_NewString(context, "blobs")); + JS_SetPropertyUint32(context, name, 1, JS_NewString(context, "has")); + JS_SetPropertyStr(context, message, "name", name); + JSValue args = JS_NewArray(context); + JS_SetPropertyUint32(context, args, 0, JS_NewString(context, blob_id)); + JS_SetPropertyStr(context, message, "args", args); + JS_SetPropertyStr(context, message, "type", JS_NewString(context, "async")); + + tf_ssb_connection_t* connections[4] = { 0 }; + tf_ssb_get_connections(ssb1, connections, 4); + int64_t request_number = tf_ssb_connection_next_request_number(connections[0]); + tf_ssb_connection_rpc_send_json(connections[0], k_ssb_rpc_flag_stream | k_ssb_rpc_flag_new_request, request_number, "blobs.has", message, NULL, NULL, NULL); + JS_FreeValue(context, message); + uint8_t* b1; size_t s1 = 0; b = tf_ssb_db_blob_get(ssb1, blob_id, &b1, &s1); @@ -293,14 +322,22 @@ void tf_ssb_test_ssb(const tf_test_options_t* options) while (count0 == 0) { + tf_ssb_set_main_thread(ssb0, true); + tf_ssb_set_main_thread(ssb1, true); uv_run(&loop, UV_RUN_ONCE); + tf_ssb_set_main_thread(ssb0, false); + tf_ssb_set_main_thread(ssb1, false); } tf_ssb_remove_message_added_callback(ssb0, _message_added, &count0); tf_printf("Waiting for message from other.\n"); while (count1 == 0) { + tf_ssb_set_main_thread(ssb0, true); + tf_ssb_set_main_thread(ssb1, true); uv_run(&loop, UV_RUN_ONCE); + tf_ssb_set_main_thread(ssb0, false); + tf_ssb_set_main_thread(ssb1, false); } tf_ssb_remove_message_added_callback(ssb1, _message_added, &count1); tf_printf("done\n"); @@ -311,7 +348,11 @@ void tf_ssb_test_ssb(const tf_test_options_t* options) uv_close((uv_handle_t*)&idle1, NULL); tf_printf("final run\n"); + tf_ssb_set_main_thread(ssb0, true); + tf_ssb_set_main_thread(ssb1, true); uv_run(&loop, UV_RUN_DEFAULT); + tf_ssb_set_main_thread(ssb0, false); + tf_ssb_set_main_thread(ssb1, false); tf_printf("done\n"); tf_printf("destroy 0\n");