diff --git a/core/ssb.js b/core/ssb.js index 0eaaa3d3..576256fd 100644 --- a/core/ssb.js +++ b/core/ssb.js @@ -2,6 +2,8 @@ var g_wants_requests = {}; var g_database = new Database('core'); let g_attendants = {}; +let g_blob_wants_sent = 0; +let g_blob_last_requested = ''; const k_use_create_history_stream = false; const k_blobs_concurrent_target = 8; @@ -80,15 +82,25 @@ function send_blobs_create_wants(connection) { return; } Object.keys(message.message).forEach(function(id) { + g_blob_wants_sent--; + if (g_blob_wants_sent == 0) { + request_more_blobs(g_wants_requests[connection.id]); + } if (message.message[id] < 0) { + let blob; if (g_wants_requests[connection.id]) { - let blob = ssb.blobGet(id); + blob = ssb.blobGet(id); if (blob) { let out_message = {}; out_message[id] = blob.byteLength; g_wants_requests[connection.id].send_json(out_message); } } + if (!blob && message.message[id] == -1) { + let out_message = {}; + out_message[id] = -2; + g_wants_requests[connection.id].send_json(out_message); + } } else { let expected_bytes = message.message[id]; connection.blob_get_queue[id] = expected_bytes; @@ -143,33 +155,24 @@ function blob_want_discovered(request, id) { } var message = {}; message[id] = -1; + g_blob_wants_sent++; request.send_json(message); } -function sleep(ms) { - return new Promise(function(resolve, reject) { - setTimeout(x => resolve(), ms); - }); -} - -async function requestMoreBlobs(request) { - let last = ''; - while (true) { - await ssb.sqlStream( - 'SELECT id FROM blob_wants WHERE id > ? ORDER BY id LIMIT 32', - [last], - function(row) { - blob_want_discovered(request, row.id); - last = row.id; - }); - await sleep(1000); - } +function request_more_blobs(request) { + return ssb.sqlStream( + 'SELECT id FROM blob_wants WHERE id > ? ORDER BY id LIMIT 32', + [g_blob_last_requested], + function(row) { + blob_want_discovered(request, row.id); + g_blob_last_requested = row.id; + }); } ssb.addRpc(['blobs', 'createWants'], function(request) { g_wants_requests[request.connection.id] = request; ssb.addEventListener('blob_want_added', id => blob_want_discovered(request, id)); - requestMoreBlobs(request); + request_more_blobs(request); }); function notify_attendant_changed(id, type) {