I think this fixes some blob replication bugs. Going to test more.

git-svn-id: https://www.unprompted.com/svn/projects/tildefriends/trunk@4099 ed5197a5-7fde-0310-b194-c3ffbd925b24
This commit is contained in:
Cory McWilliams 2023-01-02 00:33:11 +00:00
parent 5342ddb2bd
commit f257cccded
2 changed files with 54 additions and 29 deletions

View File

@ -46,6 +46,34 @@ function tunnel_attendants(request) {
}); });
} }
function get_more_blobs(connection) {
while (Object.keys(connection.active_blob_gets).length < k_blobs_concurrent_target) {
let next = Object.keys(connection.blob_get_queue).pop();
let expected_bytes = connection.blob_get_queue[next];
if (!next) {
break;
}
delete connection.blob_get_queue[next];
connection.active_blob_gets[next] = true;
let received_bytes = 0;
let buffer = new Uint8Array(expected_bytes);
connection.send_json({'name': ['blobs', 'get'], 'type': 'source', 'args': [next]}, function(message) {
if (message.flags & 0x4 /* end */) {
delete connection.active_blob_gets[next];
} else {
buffer.set(new Uint8Array(message.message, 0, message.message.byteLength), received_bytes);
received_bytes += message.message.byteLength;
if (received_bytes == expected_bytes) {
ssb.blobStore(buffer);
delete connection.active_blob_gets[next];
get_more_blobs(connection);
}
}
});
}
}
function send_blobs_create_wants(connection) { function send_blobs_create_wants(connection) {
connection.send_json({'name': ['blobs', 'createWants'], 'type': 'source', 'args': []}, function on_blob_create_wants(message) { connection.send_json({'name': ['blobs', 'createWants'], 'type': 'source', 'args': []}, function on_blob_create_wants(message) {
if (message.message?.name === 'Error') { if (message.message?.name === 'Error') {
@ -54,32 +82,17 @@ function send_blobs_create_wants(connection) {
Object.keys(message.message).forEach(function(id) { Object.keys(message.message).forEach(function(id) {
if (message.message[id] < 0) { if (message.message[id] < 0) {
if (g_wants_requests[connection.id]) { if (g_wants_requests[connection.id]) {
delete connection.active_blob_wants[id]; let blob = ssb.blobGet(id);
var blob = ssb.blobGet(id);
if (blob) { if (blob) {
var out_message = {}; let out_message = {};
out_message[id] = blob.byteLength; out_message[id] = blob.byteLength;
g_wants_requests[connection.id].send_json(out_message); g_wants_requests[connection.id].send_json(out_message);
} }
} }
} else { } else {
var received_bytes = 0; let expected_bytes = message.message[id];
var expected_bytes = message.message[id]; connection.blob_get_queue[id] = expected_bytes;
var buffer = new Uint8Array(expected_bytes); get_more_blobs(connection);
connection.send_json({'name': ['blobs', 'get'], 'type': 'source', 'args': [id]}, function(message) {
if (message.flags & 0x4 /* end */) {
delete connection.active_blob_wants[id];
} else {
buffer.set(new Uint8Array(message.message, 0, message.message.byteLength), received_bytes);
received_bytes += message.message.byteLength;
if (received_bytes == expected_bytes) {
ssb.blobStore(buffer);
}
}
});
if (g_wants_requests[connection.id] && Object.keys(connection.active_blob_wants).length < k_blobs_concurrent_target) {
requestMoreBlobs(g_wants_requests[connection.id]);
}
} }
}); });
}); });
@ -87,7 +100,8 @@ function send_blobs_create_wants(connection) {
ssb.addEventListener('connections', function on_connections_changed(change, connection) { ssb.addEventListener('connections', function on_connections_changed(change, connection) {
if (change == 'add') { if (change == 'add') {
connection.active_blob_wants = {}; connection.active_blob_gets = {};
connection.blob_get_queue = {};
var sequence = get_latest_sequence_for_author(connection.id); var sequence = get_latest_sequence_for_author(connection.id);
if (k_use_create_history_stream) { if (k_use_create_history_stream) {
connection.send_json({'name': ['createHistoryStream'], 'type': 'source', 'args': [{'id': connection.id, 'seq': sequence, 'live': true, 'keys': false}]}, storeMessage); connection.send_json({'name': ['createHistoryStream'], 'type': 'source', 'args': [{'id': connection.id, 'seq': sequence, 'live': true, 'keys': false}]}, storeMessage);
@ -124,20 +138,32 @@ ssb.addEventListener('connections', function on_connections_changed(change, conn
}); });
function blob_want_discovered(request, id) { function blob_want_discovered(request, id) {
if (!request || !request.connection || Object.keys(request.connection.active_blob_wants).length > k_blobs_concurrent_target) { if (!request || !request.connection) {
return; return;
} }
var message = {}; var message = {};
message[id] = -1; message[id] = -1;
request.send_json(message); request.send_json(message);
request.connection.active_blob_wants[id] = true;
} }
function requestMoreBlobs(request) { function sleep(ms) {
ssb.sqlStream( return new Promise(function(resolve, reject) {
'SELECT id FROM blob_wants LIMIT ' + k_blobs_concurrent_target, setTimeout(x => resolve(), ms);
[], });
row => blob_want_discovered(request, row.id)); }
async function requestMoreBlobs(request) {
let last = '';
while (true) {
await ssb.sqlStream(
'SELECT id FROM blob_wants WHERE id >= ? ORDER BY id LIMIT 32',
[last],
function(row) {
blob_want_discovered(request, row.id);
last = row.id;
});
await sleep(1000);
}
} }
ssb.addRpc(['blobs', 'createWants'], function(request) { ssb.addRpc(['blobs', 'createWants'], function(request) {

View File

@ -1182,7 +1182,6 @@ static void _tf_ssb_connection_rpc_recv(tf_ssb_connection_t* connection, uint8_t
{ {
if (_tf_ssb_name_equals(context, val, it->name)) if (_tf_ssb_name_equals(context, val, it->name))
{ {
printf("called it\n");
it->callback(connection, flags, request_number, val, NULL, 0, it->user_data); it->callback(connection, flags, request_number, val, NULL, 0, it->user_data);
found = true; found = true;
break; break;