diff --git a/core/ssb.js b/core/ssb.js index 576256fd..d4996dd5 100644 --- a/core/ssb.js +++ b/core/ssb.js @@ -2,10 +2,7 @@ var g_wants_requests = {}; var g_database = new Database('core'); let g_attendants = {}; -let g_blob_wants_sent = 0; -let g_blob_last_requested = ''; const k_use_create_history_stream = false; -const k_blobs_concurrent_target = 8; function get_latest_sequence_for_author(author) { var sequence = 0; @@ -48,72 +45,8 @@ function tunnel_attendants(request) { }); } -function get_more_blobs(connection) { - while (Object.keys(connection.active_blob_gets).length < k_blobs_concurrent_target) { - let next = Object.keys(connection.blob_get_queue).pop(); - let expected_bytes = connection.blob_get_queue[next]; - if (!next) { - break; - } - delete connection.blob_get_queue[next]; - connection.active_blob_gets[next] = true; - - let received_bytes = 0; - let buffer = new Uint8Array(expected_bytes); - connection.send_json({'name': ['blobs', 'get'], 'type': 'source', 'args': [next]}, function(message) { - if (message.flags & 0x4 /* end */) { - delete connection.active_blob_gets[next]; - } else { - buffer.set(new Uint8Array(message.message, 0, message.message.byteLength), received_bytes); - received_bytes += message.message.byteLength; - if (received_bytes == expected_bytes) { - ssb.blobStore(buffer); - delete connection.active_blob_gets[next]; - get_more_blobs(connection); - } - } - }); - } -} - -function send_blobs_create_wants(connection) { - connection.send_json({'name': ['blobs', 'createWants'], 'type': 'source', 'args': []}, function on_blob_create_wants(message) { - if (message.message?.name === 'Error') { - return; - } - Object.keys(message.message).forEach(function(id) { - g_blob_wants_sent--; - if (g_blob_wants_sent == 0) { - request_more_blobs(g_wants_requests[connection.id]); - } - if (message.message[id] < 0) { - let blob; - if (g_wants_requests[connection.id]) { - blob = ssb.blobGet(id); - if (blob) { - let out_message = {}; - out_message[id] = blob.byteLength; - g_wants_requests[connection.id].send_json(out_message); - } - } - if (!blob && message.message[id] == -1) { - let out_message = {}; - out_message[id] = -2; - g_wants_requests[connection.id].send_json(out_message); - } - } else { - let expected_bytes = message.message[id]; - connection.blob_get_queue[id] = expected_bytes; - get_more_blobs(connection); - } - }); - }); -} - ssb.addEventListener('connections', function on_connections_changed(change, connection) { if (change == 'add') { - connection.active_blob_gets = {}; - connection.blob_get_queue = {}; var sequence = get_latest_sequence_for_author(connection.id); if (k_use_create_history_stream) { connection.send_json({'name': ['createHistoryStream'], 'type': 'source', 'args': [{'id': connection.id, 'seq': sequence, 'live': true, 'keys': false}]}, storeMessage); @@ -137,7 +70,6 @@ ssb.addEventListener('connections', function on_connections_changed(change, conn } }); } - send_blobs_create_wants(connection); } } else if (change == 'remove') { print('REMOVE', connection.id); @@ -149,32 +81,6 @@ ssb.addEventListener('connections', function on_connections_changed(change, conn } }); -function blob_want_discovered(request, id) { - if (!request || !request.connection) { - return; - } - var message = {}; - message[id] = -1; - g_blob_wants_sent++; - request.send_json(message); -} - -function request_more_blobs(request) { - return ssb.sqlStream( - 'SELECT id FROM blob_wants WHERE id > ? ORDER BY id LIMIT 32', - [g_blob_last_requested], - function(row) { - blob_want_discovered(request, row.id); - g_blob_last_requested = row.id; - }); -} - -ssb.addRpc(['blobs', 'createWants'], function(request) { - g_wants_requests[request.connection.id] = request; - ssb.addEventListener('blob_want_added', id => blob_want_discovered(request, id)); - request_more_blobs(request); -}); - function notify_attendant_changed(id, type) { if (!id) { print(`notify_attendant_changed called with id=${id}`); diff --git a/src/ssb.c b/src/ssb.c index bc8d44cf..34cb093d 100644 --- a/src/ssb.c +++ b/src/ssb.c @@ -198,6 +198,8 @@ typedef struct _tf_ssb_connection_t tf_ssb_connection_t* tunnel_connection; int32_t tunnel_request_number; + tf_ssb_blob_wants_t blob_wants; + JSValue object; char name[32]; @@ -2926,3 +2928,8 @@ void tf_ssb_connection_remove_room_attendant(tf_ssb_connection_t* connection, co _tf_ssb_notify_broadcasts_changed(connection->ssb); } } + +tf_ssb_blob_wants_t* tf_ssb_connection_get_blob_wants_state(tf_ssb_connection_t* connection) +{ + return connection ? &connection->blob_wants : NULL; +} diff --git a/src/ssb.h b/src/ssb.h index 57b93c21..04303183 100644 --- a/src/ssb.h +++ b/src/ssb.h @@ -57,6 +57,13 @@ typedef struct _tf_ssb_stats_t } callbacks; } tf_ssb_stats_t; +typedef struct _tf_ssb_blob_wants_t +{ + int32_t request_number; + char last_id[k_blob_id_len]; + int wants_sent; +} tf_ssb_blob_wants_t; + tf_ssb_t* tf_ssb_create(uv_loop_t* loop, JSContext* context, sqlite3* db); void tf_ssb_destroy(tf_ssb_t* ssb); @@ -146,3 +153,5 @@ tf_ssb_connection_t* tf_ssb_connection_tunnel_create(tf_ssb_t* ssb, const char* JSClassID tf_ssb_get_connection_class_id(); void tf_ssb_get_stats(tf_ssb_t* ssb, tf_ssb_stats_t* out_stats); + +tf_ssb_blob_wants_t* tf_ssb_connection_get_blob_wants_state(tf_ssb_connection_t* connection); diff --git a/src/ssb.rpc.c b/src/ssb.rpc.c index 22b629a0..905d01af 100644 --- a/src/ssb.rpc.c +++ b/src/ssb.rpc.c @@ -105,6 +105,62 @@ static void _tf_ssb_rpc_blobs_has(tf_ssb_connection_t* connection, uint8_t flags NULL); } +static void _tf_ssb_rpc_blob_wants_added_callback(tf_ssb_t* ssb, const char* id, void* user_data) +{ + tf_ssb_connection_t* connection = user_data; + tf_ssb_blob_wants_t* blob_wants = tf_ssb_connection_get_blob_wants_state(connection); + JSContext* context = tf_ssb_get_context(ssb); + JSValue message = JS_NewObject(context); + JS_SetPropertyStr(context, message, id, JS_NewInt64(context, -1)); + JSValue json = JS_JSONStringify(context, message, JS_NULL, JS_NULL); + size_t size = 0; + const char* message_str = JS_ToCStringLen(context, &size, json); + tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_json | k_ssb_rpc_flag_stream, -blob_wants->request_number, (const uint8_t*)message_str, size, NULL, NULL, NULL); + JS_FreeCString(context, message_str); + JS_FreeValue(context, json); + JS_FreeValue(context, message); +} + +static void _tf_ssb_rpc_request_more_blobs(tf_ssb_connection_t* connection) +{ + JSContext* context = tf_ssb_connection_get_context(connection); + tf_ssb_blob_wants_t* blob_wants = tf_ssb_connection_get_blob_wants_state(connection); + tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); + sqlite3* db = tf_ssb_get_db(ssb); + sqlite3_stmt* statement; + if (sqlite3_prepare(db, "SELECT id FROM blob_wants WHERE id > ? ORDER BY id LIMIT 32", -1, &statement, NULL) == SQLITE_OK) + { + if (sqlite3_bind_text(statement, 1, blob_wants->last_id, -1, NULL) == SQLITE_OK) + { + while (sqlite3_step(statement) == SQLITE_ROW) + { + const char* blob = (const char*)sqlite3_column_text(statement, 0); + JSValue message = JS_NewObject(context); + JS_SetPropertyStr(context, message, blob, JS_NewInt32(context, -1)); + JSValue json = JS_JSONStringify(context, message, JS_NULL, JS_NULL); + size_t size = 0; + const char* message_str = JS_ToCStringLen(context, &size, json); + tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_json | k_ssb_rpc_flag_stream, -blob_wants->request_number, (const uint8_t*)message_str, size, NULL, NULL, NULL); + JS_FreeCString(context, message_str); + JS_FreeValue(context, json); + JS_FreeValue(context, message); + snprintf(blob_wants->last_id, sizeof(blob_wants->last_id), "%s", blob); + blob_wants->wants_sent++; + } + } + sqlite3_finalize(statement); + } +} + +static void _tf_ssb_rpc_blobs_createWants(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) +{ + tf_ssb_blob_wants_t* blob_wants = tf_ssb_connection_get_blob_wants_state(connection); + tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); + tf_ssb_add_blob_want_added_callback(ssb, _tf_ssb_rpc_blob_wants_added_callback, NULL, connection); + blob_wants->request_number = request_number; + _tf_ssb_rpc_request_more_blobs(connection); +} + typedef struct tunnel_t { tf_ssb_connection_t* connection; @@ -270,11 +326,229 @@ static void _tf_ssb_rpc_tunnel_is_room(tf_ssb_connection_t* connection, uint8_t JS_FreeValue(context, response); } +typedef struct _blobs_get_t +{ + char id[k_blob_id_len]; + size_t received; + size_t expected_size; + char buffer[]; +} blobs_get_t; + +static void _tf_ssb_rpc_connection_blobs_get_callback(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) +{ + tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); + JSContext* context = tf_ssb_get_context(ssb); + blobs_get_t* get = user_data; + if ((flags & k_ssb_rpc_mask_type) == k_ssb_rpc_flag_binary && size > 0 && get->received + size <= get->expected_size) + { + memcpy(get->buffer + get->received, message, size); + get->received += size; + } + else if ((flags & k_ssb_rpc_mask_type) == k_ssb_rpc_flag_json) + { + bool stored = false; + if (JS_ToBool(context, args)) + { + char id[256]; + stored = tf_ssb_db_blob_store(ssb, (uint8_t*)get->buffer, get->expected_size, id, sizeof(id), NULL); + } + tf_ssb_connection_rpc_send( + connection, + k_ssb_rpc_flag_json | k_ssb_rpc_flag_stream | k_ssb_rpc_flag_end_error, + -request_number, + (const uint8_t*)(stored ? "true" : "false"), + strlen(stored ? "true" : "false"), + NULL, + NULL, + NULL); + } +} + +static void _tf_ssb_rpc_connection_blobs_get_cleanup(tf_ssb_t* ssb, void* user_data) +{ + tf_free(user_data); +} + +static void _tf_ssb_rpc_connection_blobs_get(tf_ssb_connection_t* connection, const char* blob_id, size_t size) +{ + blobs_get_t* get = tf_malloc(sizeof(blobs_get_t) + size); + *get = (blobs_get_t) { .expected_size = size }; + snprintf(get->id, sizeof(get->id), "%s", blob_id); + memset(get->buffer, 0, size); + + tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); + JSContext* context = tf_ssb_get_context(ssb); + JSValue message = JS_NewObject(context); + JSValue name = JS_NewArray(context); + JS_SetPropertyUint32(context, name, 0, JS_NewString(context, "blobs")); + JS_SetPropertyUint32(context, name, 1, JS_NewString(context, "get")); + JS_SetPropertyStr(context, message, "name", name); + JS_SetPropertyStr(context, message, "type", JS_NewString(context, "source")); + JSValue args = JS_NewArray(context); + JS_SetPropertyUint32(context, args, 0, JS_NewString(context, blob_id)); + JS_SetPropertyStr(context, message, "args", args); + JSValue message_val = JS_JSONStringify(context, message, JS_NULL, JS_NULL); + size_t message_size; + const char* message_str = JS_ToCStringLen(context, &message_size, message_val); + + tf_ssb_connection_rpc_send( + connection, + k_ssb_rpc_flag_json | k_ssb_rpc_flag_stream, + tf_ssb_connection_next_request_number(connection), + (const uint8_t*)message_str, + message_size, + _tf_ssb_rpc_connection_blobs_get_callback, + _tf_ssb_rpc_connection_blobs_get_cleanup, + get); + + JS_FreeCString(context, message_str); + JS_FreeValue(context, message); +} + +static void _tf_ssb_rpc_connection_blobs_createWants_callback(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) +{ + tf_ssb_blob_wants_t* blob_wants = tf_ssb_connection_get_blob_wants_state(connection); + if (!blob_wants) + { + return; + } + + tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); + JSContext* context = tf_ssb_connection_get_context(connection); + + JSValue name = JS_GetPropertyStr(context, args, "name"); + if (!JS_IsUndefined(name)) + { + /* { name: "Error" } */ + JS_FreeValue(context, name); + return; + } + + JSPropertyEnum* ptab; + uint32_t plen; + JS_GetOwnPropertyNames(context, &ptab, &plen, args, JS_GPN_STRING_MASK); + for (uint32_t i = 0; i < plen; ++i) + { + JSValue key = JS_AtomToString(context, ptab[i].atom); + JSPropertyDescriptor desc; + JSValue key_value = JS_NULL; + if (JS_GetOwnProperty(context, &desc, args, ptab[i].atom) == 1) + { + key_value = desc.value; + JS_FreeValue(context, desc.setter); + JS_FreeValue(context, desc.getter); + } + const char* blob_id = JS_ToCString(context, key); + int64_t size = 0; + JS_ToInt64(context, &size, key_value); + if (--blob_wants->wants_sent == 0) + { + _tf_ssb_rpc_request_more_blobs(connection); + } + if (size < 0) + { + size_t blob_size = 0; + if (tf_ssb_db_blob_get(ssb, blob_id, NULL, &blob_size)) + { + JSValue message = JS_NewObject(context); + JS_SetPropertyStr(context, message, blob_id, JS_NewInt64(context, blob_size)); + JSValue message_val = JS_JSONStringify(context, message, JS_NULL, JS_NULL); + size_t message_size; + const char* message_str = JS_ToCStringLen(context, &message_size, message_val); + + tf_ssb_connection_rpc_send( + connection, + k_ssb_rpc_flag_json | k_ssb_rpc_flag_stream, + -blob_wants->request_number, + (const uint8_t*)message_str, + message_size, + NULL, + NULL, + NULL); + + JS_FreeCString(context, message_str); + JS_FreeValue(context, message); + } + else if (size == -1LL) + { + JSValue message = JS_NewObject(context); + JS_SetPropertyStr(context, message, blob_id, JS_NewInt64(context, -2)); + JSValue message_val = JS_JSONStringify(context, message, JS_NULL, JS_NULL); + size_t message_size; + const char* message_str = JS_ToCStringLen(context, &message_size, message_val); + + tf_ssb_connection_rpc_send( + connection, + k_ssb_rpc_flag_json | k_ssb_rpc_flag_stream, + -blob_wants->request_number, + (const uint8_t*)message_str, + message_size, + NULL, + NULL, + NULL); + + JS_FreeCString(context, message_str); + JS_FreeValue(context, message); + } + } + else + { + _tf_ssb_rpc_connection_blobs_get(connection, blob_id, size); + } + JS_FreeCString(context, blob_id); + JS_FreeValue(context, key); + JS_FreeValue(context, key_value); + } + for (uint32_t i = 0; i < plen; ++i) + { + JS_FreeAtom(context, ptab[i].atom); + } + js_free(context, ptab); +} + +static void _tf_ssb_rpc_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_change_t change, tf_ssb_connection_t* connection, void* user_data) +{ + if (change == k_tf_ssb_change_connect) + { + JSContext* context = tf_ssb_get_context(ssb); + JSValue message = JS_NewObject(context); + JSValue name = JS_NewArray(context); + JS_SetPropertyUint32(context, name, 0, JS_NewString(context, "blobs")); + JS_SetPropertyUint32(context, name, 1, JS_NewString(context, "createWants")); + JS_SetPropertyStr(context, message, "name", name); + JS_SetPropertyStr(context, message, "type", JS_NewString(context, "source")); + JS_SetPropertyStr(context, message, "args", JS_NewArray(context)); + JSValue message_val = JS_JSONStringify(context, message, JS_NULL, JS_NULL); + size_t size; + const char* message_str = JS_ToCStringLen(context, &size, message_val); + + tf_ssb_connection_rpc_send( + connection, + k_ssb_rpc_flag_json | k_ssb_rpc_flag_stream, + tf_ssb_connection_next_request_number(connection), + (const uint8_t*)message_str, + size, + _tf_ssb_rpc_connection_blobs_createWants_callback, + NULL, + NULL); + + JS_FreeCString(context, message_str); + JS_FreeValue(context, message_val); + JS_FreeValue(context, message); + } + else if (change == k_tf_ssb_change_remove) + { + tf_ssb_remove_blob_want_added_callback(ssb, _tf_ssb_rpc_blob_wants_added_callback, connection); + } +} + void tf_ssb_rpc_register(tf_ssb_t* ssb) { + tf_ssb_add_connections_changed_callback(ssb, _tf_ssb_rpc_connections_changed_callback, NULL, NULL); tf_ssb_add_rpc_callback(ssb, (const char*[]) { "gossip", "ping", NULL }, _tf_ssb_rpc_gossip_ping, NULL, NULL); tf_ssb_add_rpc_callback(ssb, (const char*[]) { "blobs", "get", NULL }, _tf_ssb_rpc_blobs_get, NULL, NULL); tf_ssb_add_rpc_callback(ssb, (const char*[]) { "blobs", "has", NULL }, _tf_ssb_rpc_blobs_has, NULL, NULL); + tf_ssb_add_rpc_callback(ssb, (const char*[]) { "blobs", "createWants", NULL }, _tf_ssb_rpc_blobs_createWants, NULL, NULL); tf_ssb_add_rpc_callback(ssb, (const char*[]) { "tunnel", "connect", NULL }, _tf_ssb_rpc_tunnel_connect, NULL, NULL); tf_ssb_add_rpc_callback(ssb, (const char*[]) { "tunnel", "isRoom", NULL }, _tf_ssb_rpc_tunnel_is_room, NULL, NULL); }