"use strict";

// Keyed by connection id; the entry for a connection is deleted when that
// connection is removed (see the 'connections' listener below).
var g_wants_requests = {};
var g_database = new Database('core');

// When true, new connections are replicated with createHistoryStream requests;
// when false, client connections start an ['ebt', 'replicate'] duplex instead.
const k_use_create_history_stream = false;

/**
 * Look up the highest stored sequence number for a feed.
 *
 * NOTE: the result is only meaningful if ssb.sqlStream invokes the row
 * callback before it returns, since the value is returned immediately after
 * the call.
 *
 * @param {string} author Feed id to look up.
 * @returns {number} The largest stored sequence, or 0 when none was reported.
 */
function get_latest_sequence_for_author(author) {
	let sequence = 0;
	ssb.sqlStream(
		'SELECT MAX(sequence) AS sequence FROM messages WHERE author = ?1',
		[author],
		function(row) {
			if (row.sequence) {
				sequence = row.sequence;
			}
		});
	return sequence;
}

/**
 * Store a message received from a peer.
 *
 * Accepts either a wrapped form (`message.message.value`) or a bare message
 * (`message.message`); anything that is not an object is ignored.
 *
 * @param {object} message Request/response object carrying a `message` field.
 */
function storeMessage(message) {
	const payload = message.message.value ? message.message.value : message.message;
	if (typeof payload === 'object') {
		ssb.storeMessage(payload);
	}
}

ssb.addEventListener('connections', function on_connections_changed(change, connection) {
	if (change === 'add') {
		if (k_use_create_history_stream) {
			// Ask the peer for its own feed first, starting from what we already have.
			const sequence = get_latest_sequence_for_author(connection.id);
			connection.send_json({'name': ['createHistoryStream'], 'type': 'source', 'args': [{'id': connection.id, 'seq': sequence, 'live': true, 'keys': false}]}, storeMessage);
			const identities = ssb.getAllIdentities();
			ssb.followingDeep(identities, 2).then(function(ids) {
				for (const id of ids) {
					// Our own identities are not requested from the peer.
					if (identities.includes(id)) {
						continue;
					}
					const latest = get_latest_sequence_for_author(id);
					connection.send_json({'name': ['createHistoryStream'], 'type': 'source', 'args': [{'id': id, 'seq': latest, 'live': true, 'keys': false}]}, storeMessage);
				}
			});
		} else if (connection.is_client) {
			connection.send_json({"name": ["ebt", "replicate"], "args": [{"version": 3, "format": "classic"}], "type": "duplex"}, ebtReplicateClient);
		}
	} else if (change === 'remove') {
		print('REMOVE', connection.id);
		delete g_wants_requests[connection.id];
	} else {
		print('CHANGE', change);
	}
});

/**
 * Send our clock (feed id -> encoded sequence) to the peer, in chunks of at
 * most 32 entries per message.
 *
 * Encoding used on the wire: -1 is sent as-is, any other sequence is sent as
 * `sequence << 1`. What was last sent is remembered on
 * `request.connection.sent_clock` (in encoded form) so that later calls only
 * send entries that are new or have advanced.
 *
 * NOTE(review): ssb.followingDeep is used synchronously here but as a promise
 * (`.then`) in the disabled createHistoryStream branch above; one of the two
 * usages is presumably stale - confirm against the ssb module.
 *
 * @param {object} request Duplex request; `request.connection` holds per-connection state.
 * @param {object} have Clock received from the peer; its keys are feed ids.
 */
function ebtReplicateSendClock(request, have) {
	const identities = ssb.getAllIdentities();
	const message = {};
	const last_sent = request.connection.sent_clock || {};
	const ids = ssb.followingDeep(identities, 2).concat([request.connection.id]);

	// First clock on this connection: advertise every feed we are interested in.
	if (!Object.keys(last_sent).length) {
		for (const id of ids) {
			message[id] = get_latest_sequence_for_author(id);
		}
	}

	// Feeds the peer mentioned: look up our sequence, -1 when we have nothing.
	for (const id of Object.keys(have)) {
		if (message[id] === undefined) {
			const sequence = get_latest_sequence_for_author(id);
			message[id] = sequence ? sequence : -1;
		}
	}

	let to_send = {};
	// Start at a random position so the same feeds are not always sent first.
	const offset = Math.floor(Math.random() * ids.length);
	for (let i = 0; i < ids.length; i++) {
		const id = ids[(i + offset) % ids.length];
		const sequence = message[id];
		// An id with no computed sequence encodes to 0 (undefined << 1), which
		// is what gets advertised for an id that has never been sent before.
		const encoded = sequence === -1 ? -1 : sequence << 1;
		// last_sent holds encoded values, so compare encoded against encoded;
		// comparing the raw sequence would suppress most updates.
		const advanced = sequence !== undefined && encoded > last_sent[id];
		if (last_sent[id] === undefined || advanced) {
			last_sent[id] = to_send[id] = encoded;
		}
		if (Object.keys(to_send).length >= 32) {
			request.send_json(to_send);
			to_send = {};
		}
	}
	request.connection.sent_clock = last_sent;
	if (Object.keys(to_send).length) {
		request.send_json(to_send);
	}
}

/**
 * Convert a `messages` table row into the message object sent to peers.
 *
 * The only difference between the two shapes is key order: `sequence` is
 * emitted before `author` when `row.sequence_before_author` is truthy, and
 * after it otherwise. Callers must therefore select `sequence_before_author`.
 *
 * @param {object} row Row with previous, author, sequence, timestamp, hash,
 *     content (a JSON string), signature and sequence_before_author.
 * @returns {object} Message object with `content` parsed from JSON.
 */
function formatMessage(row) {
	if (row.sequence_before_author) {
		return {
			previous: row.previous,
			sequence: row.sequence,
			author: row.author,
			timestamp: row.timestamp,
			hash: row.hash,
			content: JSON.parse(row.content),
			signature: row.signature,
		};
	}
	return {
		previous: row.previous,
		author: row.author,
		sequence: row.sequence,
		timestamp: row.timestamp,
		hash: row.hash,
		content: JSON.parse(row.content),
		signature: row.signature,
	};
}

/**
 * Forward newly stored messages to the peer when they are newer than the
 * sequence recorded for their author in `request.connection.send_clock`.
 *
 * NOTE(review): the listener is never removed in this file.
 *
 * @param {object} request Duplex request used to send messages to the peer.
 */
function ebtReplicateRegisterMessageCallback(request) {
	ssb.addEventListener('message', function(message_id) {
		if (!request.connection.send_clock) {
			return;
		}
		ssb.sqlStream(
			// sequence_before_author is required by formatMessage to reproduce
			// the stored key order.
			'SELECT previous, author, id, sequence, timestamp, hash, content, signature, sequence_before_author FROM messages WHERE id = ?1',
			[message_id],
			function(row) {
				if (request.connection.send_clock[row.author] < row.sequence) {
					request.send_json(formatMessage(row));
				}
			});
	});
}

/**
 * Handle one incoming item on an EBT duplex stream.
 *
 * An item with an `author` field is treated as a feed message and stored.
 * Anything else is treated as a clock: we reply with our own clock, then for
 * each entry that is non-negative with the low bit clear we stream our stored
 * messages for that feed starting at `value >> 1`; other entries remove the
 * feed from `request.connection.send_clock`.
 *
 * @param {object} request Duplex request with `message` and `connection`.
 */
function ebtReplicateCommon(request) {
	if (request.message.author) {
		storeMessage(request);
		return;
	}

	ebtReplicateSendClock(request, request.message);
	if (!request.connection.send_clock) {
		request.connection.send_clock = {};
	}
	for (const id of Object.keys(request.message)) {
		const note = request.message[id];
		if (note >= 0 && (note & 1) === 0) {
			const sequence = note >> 1;
			request.connection.send_clock[id] = sequence;
			ssb.sqlStream(
				// sequence_before_author is required by formatMessage to
				// reproduce the stored key order.
				'SELECT previous, author, id, sequence, timestamp, hash, content, signature, sequence_before_author FROM messages WHERE author = ?1 AND sequence >= ?2 ORDER BY sequence',
				[id, sequence],
				function(row) {
					request.send_json(formatMessage(row));
					request.connection.send_clock[id] = row.sequence;
				});
		} else {
			delete request.connection.send_clock[id];
		}
	}
}

/**
 * Callback for the ['ebt', 'replicate'] duplex we opened as a client.
 * Items whose `name` is 'Error' are ignored; the live-message listener is
 * registered once per connection.
 *
 * @param {object} request Duplex request with `message` and `connection`.
 */
function ebtReplicateClient(request) {
	if (request.message?.name === 'Error') {
		return;
	}
	if (!request.connection.message_registered) {
		ebtReplicateRegisterMessageCallback(request);
		request.connection.message_registered = true;
	}
	ebtReplicateCommon(request);
}

/**
 * Handler for an incoming ['ebt', 'replicate'] request: registers the
 * live-message listener, sends our initial clock, and routes every further
 * item to ebtReplicateCommon.
 *
 * @param {object} request Incoming duplex request.
 */
function ebtReplicateServer(request) {
	ebtReplicateRegisterMessageCallback(request);
	ebtReplicateSendClock(request, {});
	request.more(ebtReplicateCommon);
}

ssb.addRpc(['ebt', 'replicate'], ebtReplicateServer);

// createHistoryStream: send the stored messages of one feed starting at
// `seq` (default 0), then every newly stored message by that feed.
// `keys` defaults to true and wraps each message as {key, value, timestamp}.
ssb.addRpc(['createHistoryStream'], function(request) {
	const options = request?.args?.[0];
	if (!options) {
		// Bail out instead of dereferencing the missing arguments below.
		print('missing request.args in createHistoryStream');
		return;
	}
	const id = options.id;
	const seq = options.seq;
	const keys = options.keys || options.keys === undefined;

	function sendMessage(row) {
		const message = keys
			? {
				key: row.id,
				value: formatMessage(row),
				timestamp: row.timestamp,
			}
			: formatMessage(row);
		request.send_json(message);
	}

	ssb.sqlStream(
		'SELECT previous, author, id, sequence, timestamp, hash, content, signature, sequence_before_author FROM messages WHERE author = ?1 AND sequence >= ?2 ORDER BY sequence',
		[id, seq ?? 0],
		sendMessage);

	// NOTE(review): registered regardless of any `live` option and never
	// removed in this file.
	ssb.addEventListener('message', function(message_id) {
		ssb.sqlStream(
			'SELECT previous, author, id, sequence, timestamp, hash, content, signature, sequence_before_author FROM messages WHERE id = ?1 AND author = ?2',
			[message_id, id],
			function(row) {
				sendMessage(row);
			});
	});
});