diff --git a/src/ssb.c b/src/ssb.c index f5e8d309..e924d3ea 100644 --- a/src/ssb.c +++ b/src/ssb.c @@ -341,7 +341,7 @@ static bool _tf_ssb_connection_get_request_callback(tf_ssb_connection_t* connect return found; } -static void _tf_ssb_connection_add_request(tf_ssb_connection_t* connection, int32_t request_number, tf_ssb_rpc_callback_t* callback, void* user_data) +void tf_ssb_connection_add_request(tf_ssb_connection_t* connection, int32_t request_number, tf_ssb_rpc_callback_t* callback, void* user_data) { if (_tf_ssb_connection_get_request_callback(connection, request_number, NULL, NULL)) { return; @@ -371,7 +371,7 @@ void tf_ssb_connection_remove_request(tf_ssb_connection_t* connection, int32_t r void tf_ssb_connection_rpc_send(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, const uint8_t* message, size_t size, tf_ssb_rpc_callback_t* callback, void* user_data) { if (request_number > 0) { - _tf_ssb_connection_add_request(connection, request_number, callback, user_data); + tf_ssb_connection_add_request(connection, request_number, callback, user_data); } uint8_t* combined = malloc(9 + size); *combined = flags; @@ -1881,6 +1881,7 @@ bool tf_ssb_blob_store(tf_ssb_t* ssb, const uint8_t* blob, size_t size, char* ou char id[512]; snprintf(id, sizeof(id), "&%s.sha256", hash64); + printf("blob store %s\n", id); const char* query = "INSERT INTO blobs (id, content, created) VALUES ($1, $2, CAST(strftime('%s') AS INTEGER)) ON CONFLICT DO NOTHING"; if (sqlite3_prepare(ssb->db, query, -1, &statement, NULL) == SQLITE_OK) { diff --git a/src/ssb.h b/src/ssb.h index df233b6b..43a3f6fa 100644 --- a/src/ssb.h +++ b/src/ssb.h @@ -111,4 +111,5 @@ void tf_ssb_connection_rpc_send(tf_ssb_connection_t* connection, uint8_t flags, int32_t tf_ssb_connection_next_request_number(tf_ssb_connection_t* connection); bool tf_ssb_connection_get_id(tf_ssb_connection_t* connection, char* out_id, size_t out_id_size); +void tf_ssb_connection_add_request(tf_ssb_connection_t* connection, int32_t request_number, tf_ssb_rpc_callback_t* callback, void* user_data); void tf_ssb_connection_remove_request(tf_ssb_connection_t* connection, int32_t request_number); diff --git a/src/ssb.rpc.c b/src/ssb.rpc.c index f836d71e..411ed5ce 100644 --- a/src/ssb.rpc.c +++ b/src/ssb.rpc.c @@ -193,7 +193,10 @@ static void _tf_ssb_connection_on_rpc_blobs_createWants_response(tf_ssb_connecti JSValue jsonval = JS_JSONStringify(context, size_response, JS_NULL, JS_NULL); size_t len; const char* json = JS_ToCStringLen(context, &len, jsonval); - tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_json | k_ssb_rpc_flag_stream, request_number, (uint8_t*)json, len, NULL, NULL); + tf_ssb_rpc_t* rpc = tf_ssb_get_rpc(ssb); + if (rpc->wants) { + tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_json | k_ssb_rpc_flag_stream, -rpc->wants->request_number, (uint8_t*)json, len, NULL, NULL); + } JS_FreeCString(context, json); JS_FreeValue(context, jsonval); JS_FreeValue(context, size_response); @@ -640,12 +643,12 @@ tf_ssb_rpc_t* tf_ssb_rpc_create(tf_ssb_t* ssb) *rpc = (tf_ssb_rpc_t) { .wants_async.data = rpc, }; + tf_ssb_add_connections_changed_callback(ssb, _tf_ssb_rpc_connections_changed_callback, rpc); tf_ssb_add_connections_changed_callback(ssb, _tf_ssb_rpc_on_connections_changed, NULL); tf_ssb_register_rpc(ssb, (const char*[]) { "blobs", "has", NULL }, _tf_ssb_rpc_blob_has, NULL); tf_ssb_register_rpc(ssb, (const char*[]) { "blobs", "get", NULL }, _tf_ssb_rpc_blob_get, NULL); tf_ssb_register_rpc(ssb, (const char*[]) { "blobs", "createWants", NULL }, _tf_ssb_rpc_blobs_createWants, rpc); tf_ssb_register_rpc(ssb, (const char*[]) { "createHistoryStream", NULL }, _tf_ssb_rpc_createHistoryStream, NULL); - tf_ssb_add_connections_changed_callback(ssb, _tf_ssb_rpc_connections_changed_callback, rpc); uv_async_init(tf_ssb_get_loop(ssb), &rpc->wants_async, _tf_ssb_rpc_wants_async); uv_unref((uv_handle_t*)&rpc->wants_async); return rpc; diff --git a/src/ssb.tests.c b/src/ssb.tests.c index 71970f8e..3236eb52 100644 --- a/src/ssb.tests.c +++ b/src/ssb.tests.c @@ -116,6 +116,8 @@ static void _tf_ssb_test_ssb() tf_ssb_append_message(ssb0, message); JS_FreeValue(context, message); + assert(tf_ssb_blob_get(ssb0, blob_id, NULL, NULL)); + assert(!tf_ssb_blob_get(ssb1, blob_id, NULL, NULL)); tf_ssb_server_open(ssb0, 12347); uint8_t id0bin[k_id_bin_len]; @@ -132,9 +134,11 @@ static void _tf_ssb_test_ssb() uv_run(&loop, UV_RUN_ONCE); } + printf("waiting for blob\n"); while (!tf_ssb_blob_get(ssb1, blob_id, NULL, NULL)) { uv_run(&loop, UV_RUN_ONCE); } + printf("done\n"); tf_ssb_send_close(ssb1);