diff --git a/core/ssb.js b/core/ssb.js index 05c67cc6..9a4bc312 100644 --- a/core/ssb.js +++ b/core/ssb.js @@ -46,6 +46,34 @@ function tunnel_attendants(request) { }); } +function get_more_blobs(connection) { + while (Object.keys(connection.active_blob_gets).length < k_blobs_concurrent_target) { + let next = Object.keys(connection.blob_get_queue).pop(); + let expected_bytes = connection.blob_get_queue[next]; + if (!next) { + break; + } + delete connection.blob_get_queue[next]; + connection.active_blob_gets[next] = true; + + let received_bytes = 0; + let buffer = new Uint8Array(expected_bytes); + connection.send_json({'name': ['blobs', 'get'], 'type': 'source', 'args': [next]}, function(message) { + if (message.flags & 0x4 /* end */) { + delete connection.active_blob_gets[next]; + } else { + buffer.set(new Uint8Array(message.message, 0, message.message.byteLength), received_bytes); + received_bytes += message.message.byteLength; + if (received_bytes == expected_bytes) { + ssb.blobStore(buffer); + delete connection.active_blob_gets[next]; + get_more_blobs(connection); + } + } + }); + } +} + function send_blobs_create_wants(connection) { connection.send_json({'name': ['blobs', 'createWants'], 'type': 'source', 'args': []}, function on_blob_create_wants(message) { if (message.message?.name === 'Error') { @@ -54,32 +82,17 @@ function send_blobs_create_wants(connection) { Object.keys(message.message).forEach(function(id) { if (message.message[id] < 0) { if (g_wants_requests[connection.id]) { - delete connection.active_blob_wants[id]; - var blob = ssb.blobGet(id); + let blob = ssb.blobGet(id); if (blob) { - var out_message = {}; + let out_message = {}; out_message[id] = blob.byteLength; g_wants_requests[connection.id].send_json(out_message); } } } else { - var received_bytes = 0; - var expected_bytes = message.message[id]; - var buffer = new Uint8Array(expected_bytes); - connection.send_json({'name': ['blobs', 'get'], 'type': 'source', 'args': [id]}, function(message) { - if (message.flags & 0x4 /* end */) { - delete connection.active_blob_wants[id]; - } else { - buffer.set(new Uint8Array(message.message, 0, message.message.byteLength), received_bytes); - received_bytes += message.message.byteLength; - if (received_bytes == expected_bytes) { - ssb.blobStore(buffer); - } - } - }); - if (g_wants_requests[connection.id] && Object.keys(connection.active_blob_wants).length < k_blobs_concurrent_target) { - requestMoreBlobs(g_wants_requests[connection.id]); - } + let expected_bytes = message.message[id]; + connection.blob_get_queue[id] = expected_bytes; + get_more_blobs(connection); } }); }); @@ -87,7 +100,8 @@ function send_blobs_create_wants(connection) { ssb.addEventListener('connections', function on_connections_changed(change, connection) { if (change == 'add') { - connection.active_blob_wants = {}; + connection.active_blob_gets = {}; + connection.blob_get_queue = {}; var sequence = get_latest_sequence_for_author(connection.id); if (k_use_create_history_stream) { connection.send_json({'name': ['createHistoryStream'], 'type': 'source', 'args': [{'id': connection.id, 'seq': sequence, 'live': true, 'keys': false}]}, storeMessage); @@ -124,20 +138,32 @@ ssb.addEventListener('connections', function on_connections_changed(change, conn }); function blob_want_discovered(request, id) { - if (!request || !request.connection || Object.keys(request.connection.active_blob_wants).length > k_blobs_concurrent_target) { + if (!request || !request.connection) { return; } var message = {}; message[id] = -1; request.send_json(message); - request.connection.active_blob_wants[id] = true; } -function requestMoreBlobs(request) { - ssb.sqlStream( - 'SELECT id FROM blob_wants LIMIT ' + k_blobs_concurrent_target, - [], - row => blob_want_discovered(request, row.id)); +function sleep(ms) { + return new Promise(function(resolve, reject) { + setTimeout(x => resolve(), ms); + }); +} + +async function requestMoreBlobs(request) { + let last = ''; + while (true) { + await ssb.sqlStream( + 'SELECT id FROM blob_wants WHERE id >= ? ORDER BY id LIMIT 32', + [last], + function(row) { + blob_want_discovered(request, row.id); + last = row.id; + }); + await sleep(1000); + } } ssb.addRpc(['blobs', 'createWants'], function(request) { diff --git a/src/ssb.c b/src/ssb.c index 269ab7ae..27644720 100644 --- a/src/ssb.c +++ b/src/ssb.c @@ -1182,7 +1182,6 @@ static void _tf_ssb_connection_rpc_recv(tf_ssb_connection_t* connection, uint8_t { if (_tf_ssb_name_equals(context, val, it->name)) { - printf("called it\n"); it->callback(connection, flags, request_number, val, NULL, 0, it->user_data); found = true; break;