"use strict"; var g_wants_requests = {}; var g_database = new Database('core'); let g_attendants = {}; const k_use_create_history_stream = false; const k_blobs_concurrent_target = 8; function get_latest_sequence_for_author(author) { var sequence = 0; ssb.sqlStream( 'SELECT MAX(sequence) AS sequence FROM messages WHERE author = ?1', [author], function(row) { if (row.sequence) { sequence = row.sequence; } }); return sequence; } function storeMessage(message) { var payload = message.message.value ? message.message.value : message.message; if (typeof(payload) == 'object') { ssb.storeMessage(payload); } } function tunnel_attendants(request) { if (request.message.type !== 'state') { throw Error('Unexpected type: ' + request.message.type); } let state = new Set(request.message.ids); for (let id of state) { request.add_room_attendant(id); } request.more(function attendants(message) { if (message.message.type === 'joined') { request.add_room_attendant(message.message.id); state.add(message.message.id); } else if (message.message.type === 'left') { request.remove_room_attendant(message.message.id); state.delete(message.message.id); } else { throw Error('Unexpected type: ' + message.type); } }); } function get_more_blobs(connection) { while (Object.keys(connection.active_blob_gets).length < k_blobs_concurrent_target) { let next = Object.keys(connection.blob_get_queue).pop(); let expected_bytes = connection.blob_get_queue[next]; if (!next) { break; } delete connection.blob_get_queue[next]; connection.active_blob_gets[next] = true; let received_bytes = 0; let buffer = new Uint8Array(expected_bytes); connection.send_json({'name': ['blobs', 'get'], 'type': 'source', 'args': [next]}, function(message) { if (message.flags & 0x4 /* end */) { delete connection.active_blob_gets[next]; } else { buffer.set(new Uint8Array(message.message, 0, message.message.byteLength), received_bytes); received_bytes += message.message.byteLength; if (received_bytes == expected_bytes) { ssb.blobStore(buffer); delete connection.active_blob_gets[next]; get_more_blobs(connection); } } }); } } function send_blobs_create_wants(connection) { connection.send_json({'name': ['blobs', 'createWants'], 'type': 'source', 'args': []}, function on_blob_create_wants(message) { if (message.message?.name === 'Error') { return; } Object.keys(message.message).forEach(function(id) { if (message.message[id] < 0) { if (g_wants_requests[connection.id]) { let blob = ssb.blobGet(id); if (blob) { let out_message = {}; out_message[id] = blob.byteLength; g_wants_requests[connection.id].send_json(out_message); } } } else { let expected_bytes = message.message[id]; connection.blob_get_queue[id] = expected_bytes; get_more_blobs(connection); } }); }); } ssb.addEventListener('connections', function on_connections_changed(change, connection) { if (change == 'add') { connection.active_blob_gets = {}; connection.blob_get_queue = {}; var sequence = get_latest_sequence_for_author(connection.id); if (k_use_create_history_stream) { connection.send_json({'name': ['createHistoryStream'], 'type': 'source', 'args': [{'id': connection.id, 'seq': sequence, 'live': true, 'keys': false}]}, storeMessage); var identities = ssb.getAllIdentities(); ssb.followingDeep(identities, 2).then(function(ids) { for (let id of ids) { if (identities.indexOf(id) != -1) { continue; } var sequence = get_latest_sequence_for_author(id); connection.send_json({'name': ['createHistoryStream'], 'type': 'source', 'args': [{'id': id, 'seq': sequence, 'live': true, 'keys': false}]}, storeMessage); } }); } else { if (connection.is_client) { connection.send_json({"name": ["ebt", "replicate"], "args": [{"version": 3, "format": "classic"}], "type": "duplex"}, ebtReplicateClient); connection.send_json_async({'name': ['tunnel', 'isRoom'], 'args': []}, function tunnel_is_room(request) { if (request.message) { request.connection.send_json({'name': ['room', 'attendants'], 'args': [], 'type': 'source'}, tunnel_attendants); } }); } send_blobs_create_wants(connection); } } else if (change == 'remove') { print('REMOVE', connection.id); notify_attendant_changed(connection.id, 'left'); delete g_attendants[connection.id]; delete g_wants_requests[connection.id]; } else { print('CHANGE', change); } }); function blob_want_discovered(request, id) { if (!request || !request.connection) { return; } var message = {}; message[id] = -1; request.send_json(message); } function sleep(ms) { return new Promise(function(resolve, reject) { setTimeout(x => resolve(), ms); }); } async function requestMoreBlobs(request) { let last = ''; while (true) { await ssb.sqlStream( 'SELECT id FROM blob_wants WHERE id > ? ORDER BY id LIMIT 32', [last], function(row) { blob_want_discovered(request, row.id); last = row.id; }); await sleep(1000); } } ssb.addRpc(['blobs', 'createWants'], function(request) { g_wants_requests[request.connection.id] = request; ssb.addEventListener('blob_want_added', id => blob_want_discovered(request, id)); requestMoreBlobs(request); }); function notify_attendant_changed(id, type) { if (!id) { print(`notify_attendant_changed called with id=${id}`); return; } for (let r of Object.values(g_attendants)) { try { r.send_json({ type: type, id: id, }); } catch (e) { print(`Removing ${id} from g_attendants in ${type}.`, e); delete g_attendants[id]; } } } ssb.addRpc(['room', 'attendants'], function(request) { let ids = Object.keys(g_attendants).sort(); request.send_json({ type: 'state', ids: ids, }); notify_attendant_changed(request.connection.id, 'joined'); g_attendants[request.connection.id] = request; }); function ebtReplicateSendClock(request, have) { var identities = ssb.getAllIdentities(); var message = {}; var last_sent = request.connection.sent_clock || {}; var ids = ssb.followingDeep(identities, 2).concat([request.connection.id]); if (!Object.keys(last_sent).length) { for (let id of ids) { message[id] = get_latest_sequence_for_author(id); } } for (let id of Object.keys(have)) { if (message[id] === undefined) { var sequence = get_latest_sequence_for_author(id); message[id] = sequence ? sequence : -1; } } var to_send = {} var offset = Math.floor(Math.random() * ids.length); for (var i = 0; i < ids.length; i++) { var id = ids[(i + offset) % ids.length]; if (last_sent[id] === undefined || message[id] > last_sent[id]) { last_sent[id] = to_send[id] = message[id] === -1 ? -1 : message[id] << 1; } if (Object.keys(to_send).length >= 32) { request.send_json(to_send); to_send = {}; } } request.connection.sent_clock = last_sent; if (Object.keys(to_send).length) { request.send_json(to_send); } } function formatMessage(row) { if (row.sequence_before_author) { return { previous: row.previous, sequence: row.sequence, author: row.author, timestamp: row.timestamp, hash: row.hash, content: JSON.parse(row.content), signature: row.signature, }; } else { return { previous: row.previous, author: row.author, sequence: row.sequence, timestamp: row.timestamp, hash: row.hash, content: JSON.parse(row.content), signature: row.signature, }; } } function ebtReplicateRegisterMessageCallback(request) { ssb.addEventListener('message', function(message_id) { if (request.connection.send_clock) { ssb.sqlStream( 'SELECT previous, author, id, sequence, timestamp, hash, content, signature FROM messages WHERE id = ?1', [message_id], function (row) { if (request.connection.send_clock[row.author] < row.sequence) { request.send_json(formatMessage(row)); } }); } }); } function ebtReplicateCommon(request) { if (request.message.author) { storeMessage(request); } else { ebtReplicateSendClock(request, request.message); if (!request.connection.send_clock) { request.connection.send_clock = {}; } for (let id of Object.keys(request.message)) { if (request.message[id] >= 0 && (request.message[id] & 1) == 0) { request.connection.send_clock[id] = request.message[id] >> 1; ssb.sqlStream( 'SELECT previous, author, id, sequence, timestamp, hash, content, signature FROM messages WHERE author = ?1 AND sequence >= ?2 ORDER BY sequence', [id, request.message[id] >> 1], function (row) { request.send_json(formatMessage(row)); request.connection.send_clock[id] = row.sequence; }); } else { delete request.connection.send_clock[id]; } } } } function ebtReplicateClient(request) { if (request.message?.name !== 'Error') { if (!request.connection.message_registered) { ebtReplicateRegisterMessageCallback(request); request.connection.message_registered = true; } ebtReplicateCommon(request); } } function ebtReplicateServer(request) { ebtReplicateRegisterMessageCallback(request); ebtReplicateSendClock(request, {}); request.more(ebtReplicateCommon); } ssb.addRpc(['ebt', 'replicate'], ebtReplicateServer); ssb.addRpc(['createHistoryStream'], function(request) { var id = request.args[0].id; var seq = request.args[0].seq; var keys = request.args[0].keys || request.args[0].keys === undefined; function sendMessage(row) { if (keys) { var message = { key: row.id, value: formatMessage(row), timestamp: row.timestamp, }; } else { var message = formatMessage(row); } request.send_json(message); } ssb.sqlStream( 'SELECT previous, author, id, sequence, timestamp, hash, content, signature, sequence_before_author FROM messages WHERE author = ?1 AND sequence >= ?2 ORDER BY sequence', [id, seq ?? 0], sendMessage); ssb.addEventListener('message', function(message_id) { ssb.sqlStream( 'SELECT previous, author, id, sequence, timestamp, hash, content, signature, sequence_before_author FROM messages WHERE id = ?1 AND author = ?2', [message_id, id], function (row) { sendMessage(row); }); }); });