/**
 * Open blobs.createWants requests received from peers, keyed by connection id.
 * Entries are added by the blobs.createWants RPC handler below and deleted
 * when the 'connections' listener sees the connection removed.
 */
var g_wants_requests = {};

/** Key/value store used to cache each feed's computed follow list. */
var g_database = new Database('core');

/** When true, replicate with createHistoryStream instead of ebt.replicate. */
const k_use_create_history_stream = false;

/**
 * Returns the ids that a feed currently follows, updating a cache in `db`.
 *
 * The cache lives under the key `<id>:following` as JSON of the form
 * {users, sequence, version}.  Only contact messages with a sequence greater
 * than the cached one are read, so repeated calls are incremental.
 *
 * @param {object} db  Store exposing get(key) and set(key, value).
 * @param {string} id  Feed id whose contact messages are scanned.
 * @returns {string[]} Ids currently followed by `id`.
 */
function following(db, id) {
	const k_version = 5;
	const key = id + ":following";
	const stored = db.get(key);
	let cache = stored ? JSON.parse(stored) : stored;
	if (!cache || cache.version != k_version) {
		// Missing cache or a different layout version: rebuild from scratch.
		cache = {users: [], sequence: 0, version: k_version};
	}
	const users = new Set(cache.users);
	ssb.sqlStream(
		"SELECT "+
		"  sequence, "+
		"  json_extract(content, '$.contact') AS contact, "+
		"  json_extract(content, '$.following') AS following "+
		"FROM messages "+
		"WHERE "+
		"  author = ?1 AND "+
		"  sequence > ?2 AND "+
		"  json_extract(content, '$.type') = 'contact' "+
		"UNION SELECT MAX(sequence) AS sequence, NULL, NULL FROM messages WHERE author = ?1 "+
		"ORDER BY sequence",
		// `?? 0` recovers caches that were written with a null sequence.
		[id, cache.sequence ?? 0],
		function(row) {
			if (row.following) {
				users.add(row.contact);
			} else {
				users.delete(row.contact);
			}
			// The MAX(sequence) row is NULL when the author has no messages.
			// Caching that NULL would make `sequence > ?2` never match on the
			// next call and skip the author's first contact messages.
			if (row.sequence != null) {
				cache.sequence = row.sequence;
			}
		});
	cache.users = Array.from(users);
	const serialized = JSON.stringify(cache);
	if (stored != serialized) {
		db.set(key, serialized);
	}
	return cache.users;
}

/**
 * Returns the sorted, de-duplicated set of ids reachable from `seed_ids` by
 * following up to `depth` follow links, including the seeds themselves.
 *
 * @param {object} db          Store passed through to following().
 * @param {string[]} seed_ids  Ids to start from.
 * @param {number} depth       Number of hops to expand; <= 0 returns the seeds as given.
 * @returns {string[]}
 */
function followingDeep(db, seed_ids, depth) {
	if (depth <= 0) {
		return seed_ids;
	}
	const direct = seed_ids.flatMap(id => following(db, id));
	const reachable = followingDeep(db, [...new Set(direct)].sort(), depth - 1);
	return [...new Set([...reachable, ...seed_ids])].sort();
}

/**
 * Returns the highest stored sequence number for `author`, or 0 when the
 * messages table has none.
 *
 * @param {string} author
 * @returns {number}
 */
function get_latest_sequence_for_author(author) {
	let sequence = 0;
	ssb.sqlStream(
		'SELECT MAX(sequence) AS sequence FROM messages WHERE author = ?1',
		[author],
		function(row) {
			if (row.sequence) {
				sequence = row.sequence;
			}
		});
	return sequence;
}

/**
 * Stores a feed message delivered by an RPC callback.
 *
 * The payload is `message.message.value` when that is truthy, otherwise
 * `message.message`.  Anything that is not a non-null object is ignored.
 *
 * @param {object} message  RPC callback argument carrying a `message` property.
 */
function storeMessage(message) {
	const body = message.message;
	const payload = body && body.value ? body.value : body;
	// typeof null is 'object', so null has to be excluded explicitly.
	if (payload !== null && typeof payload == 'object') {
		ssb.storeMessage(payload);
	}
}

/**
 * Handles a blobs.createWants entry with a negative value: if the blob is
 * available locally, reports its byte length on the createWants request the
 * peer opened with us.
 */
function offerBlobToPeer(connection, id) {
	const blob = ssb.blobGet(id);
	if (!blob) {
		return;
	}
	// The peer may not have opened its own blobs.createWants request (yet), in
	// which case there is nowhere to send the size.
	const wants_request = g_wants_requests[connection.id];
	if (!wants_request) {
		return;
	}
	const out_message = {};
	out_message[id] = blob.byteLength;
	wants_request.send_json(out_message);
}

/**
 * Handles a blobs.createWants entry with a non-negative value: requests the
 * blob with blobs.get, accumulates the chunks and stores the result once
 * exactly `expected_bytes` bytes have arrived.
 */
function fetchBlobFromPeer(connection, id, expected_bytes) {
	const buffer = new Uint8Array(expected_bytes);
	let received_bytes = 0;
	connection.send_json({'name': ['blobs', 'get'], 'type': 'source', 'args': [id]}, function(message) {
		const chunk = new Uint8Array(message.message, 0, message.message.byteLength);
		if (received_bytes + chunk.byteLength > expected_bytes) {
			// Writing past the advertised size would throw a RangeError.
			print('blobs.get', id, 'exceeded the advertised size', expected_bytes);
			return;
		}
		buffer.set(chunk, received_bytes);
		received_bytes += chunk.byteLength;
		if (received_bytes == expected_bytes) {
			ssb.blobStore(buffer);
		}
	});
}

/*
 * Connection lifecycle: on 'add', start replication and ask the peer for its
 * blob wants; on 'remove', drop the peer's createWants request.
 */
ssb.addEventListener('connections', function(change, connection) {
	if (change == 'add') {
		if (k_use_create_history_stream) {
			const me = ssb.whoami();
			const requestHistory = function(id) {
				const sequence = get_latest_sequence_for_author(id);
				connection.send_json({'name': ['createHistoryStream'], 'type': 'source', 'args': [{'id': id, 'seq': sequence, 'live': true, 'keys': false}]}, storeMessage);
			};
			requestHistory(connection.id);
			// followingDeep() returns a plain array, not a promise, so it is
			// iterated directly.
			for (const id of followingDeep(g_database, [me], 2)) {
				if (id == me) {
					continue;
				}
				requestHistory(id);
			}
		} else if (connection.is_client) {
			connection.send_json({"name": ["ebt", "replicate"], "args": [{"version": 3, "format": "classic"}], "type": "duplex"}, ebtReplicateClient);
		}
		connection.send_json({'name': ['blobs', 'createWants'], 'type': 'source', 'args': []}, function(message) {
			for (const id of Object.keys(message.message)) {
				const value = message.message[id];
				if (value < 0) {
					offerBlobToPeer(connection, id);
				} else {
					fetchBlobFromPeer(connection, id, value);
				}
			}
		});
	} else if (change == 'remove') {
		print('REMOVE', connection.id);
		delete g_wants_requests[connection.id];
	} else {
		print('CHANGE', change);
	}
});

/*
 * blobs.createWants: remember the request for this connection and send one
 * {id: -1} message per wanted blob, both for the rows currently in blob_wants
 * and for every later 'blob_want_added' event.
 * NOTE(review): the 'blob_want_added' listener is never removed, so it keeps
 * calling send_json on this request after the connection goes away - confirm
 * whether the host offers a way to unregister it.
 */
ssb.addRpc(['blobs', 'createWants'], function(request) {
	g_wants_requests[request.connection.id] = request;
	function blob_want_discovered(id) {
		const message = {};
		message[id] = -1;
		request.send_json(message);
	}
	ssb.addEventListener('blob_want_added', blob_want_discovered);
	ssb.sqlStream(
		'SELECT id FROM blob_wants',
		[],
		row => blob_want_discovered(row.id));
});

/* blobs.has: replies true when the blobs table has a row for the first argument. */
ssb.addRpc(['blobs', 'has'], function(request) {
	let found = false;
	ssb.sqlStream(
		'SELECT 1 FROM blobs where id = ?1',
		[request.args[0]],
		function(row) {
			found = true;
		});
	request.send_json(found);
});

/*
 * blobs.get: sends the content of every requested blob id.
 * NOTE(review): the result of blobGet() is sent even when the blob is missing;
 * confirm how send_binary treats a missing value.
 */
ssb.addRpc(['blobs', 'get'], function(request) {
	for (const id of request.args) {
		const blob = ssb.blobGet(id);
		request.send_binary(blob);
	}
});

/* gossip.ping: answers every incoming message with the current time in milliseconds. */
ssb.addRpc(['gossip', 'ping'], function(request) {
	request.more(function(message) {
		message.send_json(Date.now());
	});
});

/* tunnel.isRoom: always replies false. */
ssb.addRpc(['tunnel', 'isRoom'], function(request) {
	request.send_json(false);
});

/**
 * Sends the entries of our clock that are new or have advanced since the last
 * clock sent on this connection.
 *
 * The clock covers everything within two follow hops of us, the peer itself
 * and every id the peer mentioned in `have`.  Each value is the latest stored
 * sequence shifted left by one bit (-1 is passed through unshifted).  The
 * values last sent are kept on `request.connection.sent_clock`.
 *
 * @param {object} request  RPC request; uses `connection` and `send_json`.
 * @param {object} have     Clock received from the peer; only its keys are used.
 */
function ebtReplicateSendClock(request, have) {
	const me = ssb.whoami();
	// A Set keeps first-occurrence order while avoiding repeated queries.
	const ids = new Set([
		...followingDeep(g_database, [me], 2),
		request.connection.id,
		...Object.keys(have),
	]);
	const last_sent = request.connection.sent_clock || {};
	const to_send = {};
	for (const id of ids) {
		const sequence = get_latest_sequence_for_author(id);
		const encoded = sequence === -1 ? -1 : sequence << 1;
		// last_sent holds encoded values, so the comparison must use the
		// encoded value too; comparing the raw sequence suppressed updates
		// until the sequence had doubled.
		if (last_sent[id] === undefined || encoded > last_sent[id]) {
			last_sent[id] = to_send[id] = encoded;
		}
	}
	request.connection.sent_clock = last_sent;
	if (Object.keys(to_send).length) {
		request.send_json(to_send);
	}
}

/**
 * Converts a row of the messages table into the feed message object sent to
 * peers.  `content` is parsed from its stored JSON text.
 *
 * @param {object} row
 * @returns {object}
 */
function formatMessage(row) {
	return {
		previous: row.previous,
		author: row.author,
		sequence: row.sequence,
		timestamp: row.timestamp,
		hash: row.hash,
		content: JSON.parse(row.content),
		signature: row.signature,
	};
}

/**
 * Forwards every newly announced message authored by us on `request`.
 * NOTE(review): the 'message' listener is never removed - confirm whether the
 * host offers a way to unregister it when the connection closes.
 *
 * @param {object} request  RPC request used to send the messages.
 */
function ebtReplicateRegisterMessageCallback(request) {
	const me = ssb.whoami();
	ssb.addEventListener('message', function(message_id) {
		ssb.sqlStream(
			'SELECT previous, author, id, sequence, timestamp, hash, content, signature FROM messages WHERE id = ?1 AND author = ?2',
			[message_id, me],
			function (row) {
				request.send_json(formatMessage(row));
			});
	});
}

/**
 * Handles one incoming ebt.replicate message.
 *
 * A message with a truthy `author` is stored as a feed message.  Anything else
 * is treated as a clock: our own clock is sent back, then for every entry that
 * is non-negative with its low bit clear, stored messages of that feed from
 * sequence `value >> 1` onwards are sent.
 * NOTE(review): `sequence >= ?2` also resends the message the peer reported as
 * its latest - confirm whether `>` was intended.
 *
 * @param {object} request  RPC request carrying the received `message`.
 */
function ebtReplicateCommon(request) {
	if (request.message.author) {
		storeMessage(request);
		return;
	}
	ebtReplicateSendClock(request, request.message);
	for (const id of Object.keys(request.message)) {
		const value = request.message[id];
		if (value >= 0 && (value & 1) == 0) {
			ssb.sqlStream(
				'SELECT previous, author, id, sequence, timestamp, hash, content, signature FROM messages WHERE author = ?1 AND sequence >= ?2 ORDER BY sequence',
				[id, value >> 1],
				function (row) {
					request.send_json(formatMessage(row));
				});
		}
	}
}

/**
 * Callback for the ebt.replicate request we open as a client.  Registers the
 * live message forwarder once per connection, then handles the message.
 */
function ebtReplicateClient(request) {
	if (!request.connection.message_registered) {
		ebtReplicateRegisterMessageCallback(request);
		request.connection.message_registered = true;
	}
	ebtReplicateCommon(request);
}

/**
 * Handler for an ebt.replicate request opened by a peer: registers the live
 * message forwarder, sends our clock and processes every further message.
 */
function ebtReplicateServer(request) {
	ebtReplicateRegisterMessageCallback(request);
	ebtReplicateSendClock(request, {});
	request.more(ebtReplicateCommon);
}

ssb.addRpc(['ebt', 'replicate'], ebtReplicateServer);

/*
 * createHistoryStream: sends the stored messages of args[0].id starting at
 * args[0].seq (0 when absent), then every later message of that author as it
 * is announced.  With `keys` true or omitted each message is wrapped as
 * {key, value, timestamp}; otherwise the bare message is sent.
 * NOTE(review): live updates are sent regardless of any `live` option and the
 * listener is never removed - confirm this is intended.
 */
ssb.addRpc(['createHistoryStream'], function(request) {
	const options = request.args[0] || {};
	const id = options.id;
	const seq = options.seq;
	const keys = options.keys || options.keys === undefined;
	function sendMessage(row) {
		const value = formatMessage(row);
		request.send_json(keys ? {key: row.id, value: value, timestamp: row.timestamp} : value);
	}
	ssb.sqlStream(
		'SELECT previous, author, id, sequence, timestamp, hash, content, signature FROM messages WHERE author = ?1 AND sequence >= ?2 ORDER BY sequence',
		[id, seq ?? 0],
		sendMessage);
	ssb.addEventListener('message', function(message_id) {
		ssb.sqlStream(
			'SELECT previous, author, id, sequence, timestamp, hash, content, signature FROM messages WHERE id = ?1 AND author = ?2',
			[message_id, id],
			sendMessage);
	});
});