// Incoming blobs.createWants requests, keyed by the id of the connection that
// opened them. Entries are added by the createWants RPC handler and deleted
// when the 'connections' listener sees a 'remove' change.
var g_wants_requests = {};

// Store used (via get/set) to cache each feed's computed follow list.
var g_database = new Database('core');

/**
 * Computes the list of ids that feed `id` currently follows.
 *
 * The result is cached in `db` under the key `<id>:following`, together with
 * the highest sequence number already examined, so that later calls only scan
 * messages newer than that.
 *
 * @param db Store exposing get(key) and set(key, value); both are awaited.
 * @param id Feed id whose 'contact' messages are scanned.
 * @returns {Promise<Array>} Ids followed by `id`.
 */
async function following(db, id) {
	const key = id + ':following';
	const cached = await db.get(key);
	// Cache entries carrying any other version are discarded and rebuilt.
	const k_version = 5;

	let state = cached ? JSON.parse(cached) : cached;
	if (!state || state.version !== k_version) {
		state = {users: [], sequence: 0, version: k_version};
	}
	state.users = new Set(state.users);

	await ssb.sqlStream(
		"SELECT "+
		" sequence, "+
		" json_extract(content, '$.contact') AS contact, "+
		" json_extract(content, '$.following') AS following "+
		"FROM messages "+
		"WHERE "+
		" author = ?1 AND "+
		" sequence > ?2 AND "+
		" json_extract(content, '$.type') = 'contact' "+
		// Marker row: moves the cursor to the feed's newest message even when
		// that message is not a contact message.
		"UNION SELECT MAX(sequence) AS sequence, NULL, NULL FROM messages WHERE author = ?1 "+
		"ORDER BY sequence",
		// A cursor cached as null (written by an earlier run against an empty
		// feed) must be treated as 0, otherwise `sequence > NULL` matches
		// nothing and contact messages are skipped.
		[id, state.sequence ?? 0],
		function(row) {
			// The marker row has no contact; it must not touch the follow set.
			if (row.contact != null) {
				if (row.following) {
					state.users.add(row.contact);
				} else {
					state.users.delete(row.contact);
				}
			}
			// MAX(sequence) is NULL for a feed with no messages; keep the
			// previous cursor in that case instead of storing null.
			if (row.sequence != null) {
				state.sequence = row.sequence;
			}
		});

	state.users = Array.from(state.users);
	const serialized = JSON.stringify(state);
	// Only write back when something actually changed.
	if (cached !== serialized) {
		await db.set(key, serialized);
	}
	return state.users;
}

/**
 * Expands a set of feed ids by repeatedly adding the feeds they follow.
 *
 * @param db Store passed through to following().
 * @param seed_ids Ids to start from.
 * @param depth Number of follow hops to expand; <= 0 returns `seed_ids` as is.
 * @returns {Promise<Array>} Sorted, de-duplicated ids: the seeds plus everything
 *   reached within `depth` hops (when depth > 0).
 */
async function followingDeep(db, seed_ids, depth) {
	if (depth <= 0) {
		return seed_ids;
	}
	const followed_lists = await Promise.all(seed_ids.map(seed => following(db, seed)));
	const next_ids = [...new Set(followed_lists.flat())].sort();
	const reached = await followingDeep(db, next_ids, depth - 1);
	return [...new Set([...reached, ...seed_ids])].sort();
}

/**
 * Returns the next sequence number wanted for `author`: one past the highest
 * stored sequence, or 0 when the query reports no sequence.
 *
 * NOTE(review): the result is read right after calling ssb.sqlStream without
 * awaiting it, so this only works if sqlStream runs its row callback before
 * returning. following() awaits sqlStream -- confirm which is correct.
 *
 * @param author Feed id to look up.
 * @returns {number} Sequence to request from.
 */
function get_latest_sequence_for_author(author) {
	let sequence = 0;
	ssb.sqlStream(
		'SELECT MAX(sequence) AS sequence FROM messages WHERE author = ?1',
		[author],
		function(row) {
			if (row.sequence) {
				sequence = row.sequence + 1;
			}
		});
	return sequence;
}

/**
 * Sends a live createHistoryStream request for feed `id` over `connection`,
 * starting after the newest message already stored, and stores every message
 * that comes back.
 *
 * @param connection Connection object exposing send_json(message, callback).
 * @param id Feed id to request.
 */
function request_history_stream(connection, id) {
	const sequence = get_latest_sequence_for_author(id);
	connection.send_json(
		{'name': ['createHistoryStream'], 'type': 'source', 'args': [{'id': id, 'seq': sequence, 'live': true, 'keys': false}]},
		function(message) {
			// Replies may arrive wrapped as {value: ...} or as the bare message.
			ssb.storeMessage(message.message.value || message.message);
		});
}

// On every new connection: request the peer's own feed, exchange blob wants,
// and request the feeds within two follow hops of the local identity.
ssb.addEventListener('connections', function(change, connection) {
	if (change === 'add') {
		request_history_stream(connection, connection.id);

		connection.send_json({'name': ['blobs', 'createWants'], 'type': 'source', 'args': []}, function(message) {
			for (const [id, size] of Object.entries(message.message)) {
				if (size < 0) {
					// Negative value: handled as the peer asking for this blob.
					// Answer with its size if we hold it, on the peer's own
					// createWants request. That request may not exist (yet),
					// in which case there is nowhere to answer.
					const wants_request = g_wants_requests[connection.id];
					if (!wants_request) {
						continue;
					}
					const blob = ssb.blobGet(id);
					if (blob) {
						wants_request.send_json({[id]: blob.byteLength});
					}
				} else {
					// Non-negative value: the peer reports the blob's size, so
					// fetch it and store it once all bytes have arrived.
					print("blobs.get", id);
					const expected_bytes = size;
					const buffer = new Uint8Array(expected_bytes);
					let received_bytes = 0;
					connection.send_json({'name': ['blobs', 'get'], 'type': 'source', 'args': [id]}, function(chunk) {
						buffer.set(new Uint8Array(chunk.message, 0, chunk.message.byteLength), received_bytes);
						received_bytes += chunk.message.byteLength;
						if (received_bytes === expected_bytes) {
							ssb.blobStore(buffer);
						}
					});
				}
			}
		});

		followingDeep(g_database, [ssb.whoami()], 2).then(function(ids) {
			for (const id of ids) {
				request_history_stream(connection, id);
			}
		}).catch(function(error) {
			// Without this handler a failed lookup would be an unhandled rejection.
			print('followingDeep failed', error?.stack ?? error);
		});
	} else if (change === 'remove') {
		print('REMOVE', connection.id);
		delete g_wants_requests[connection.id];
	} else {
		print('CHANGE', change);
	}
});

// blobs.createWants: remember the request for its connection and announce
// every wanted blob id with a value of -1, both current and future ones.
ssb.addRpc(['blobs', 'createWants'], function(request) {
	g_wants_requests[request.connection.id] = request;
	function blob_want_discovered(id) {
		request.send_json({[id]: -1});
	}
	// NOTE(review): this listener is never removed, so it outlives the
	// connection and one more accumulates per request -- confirm whether the
	// host offers a way to unregister it.
	ssb.addEventListener('blob_want_added', blob_want_discovered);
	ssb.sqlStream(
		'SELECT id FROM blob_wants',
		[],
		row => blob_want_discovered(row.id));
});

// blobs.has: reply true when a blob with the requested id is stored.
// NOTE(review): like get_latest_sequence_for_author, this reads `found`
// without awaiting sqlStream -- confirm the callback runs synchronously.
ssb.addRpc(['blobs', 'has'], function(request) {
	let found = false;
	ssb.sqlStream(
		'SELECT 1 FROM blobs where id = ?1',
		[request.args[0]],
		function(row) {
			found = true;
		});
	request.send_json(found);
});

// blobs.get: send the binary contents of every requested blob id.
ssb.addRpc(['blobs', 'get'], function(request) {
	for (const id of request.args) {
		request.send_binary(ssb.blobGet(id));
	}
});

// createHistoryStream: send the stored messages of feed `id` from sequence
// `seq` onwards, then keep sending that feed's messages as they are stored.
// NOTE(review): the request's `live` flag is not consulted, and the 'message'
// listener is never removed -- confirm the intended stream lifetime.
ssb.addRpc(['createHistoryStream'], function(request) {
	const {id, seq} = request.args[0];
	// `keys` defaults to true when the caller does not specify it.
	const keys = request.args[0].keys || request.args[0].keys === undefined;

	function sendMessage(row) {
		const value = {
			previous: row.previous,
			author: row.author,
			sequence: row.sequence,
			timestamp: row.timestamp,
			hash: row.hash,
			content: JSON.parse(row.content),
			signature: row.signature,
		};
		// With keys, the message is wrapped together with its id.
		request.send_json(keys ? {key: row.id, value: value, timestamp: row.timestamp} : value);
	}

	ssb.sqlStream(
		'SELECT previous, author, id, sequence, timestamp, hash, content, signature FROM messages WHERE author = ?1 AND sequence >= ?2 ORDER BY sequence',
		[id, seq ?? 0],
		sendMessage);

	ssb.addEventListener('message', function(message_id) {
		// Restrict live updates to the requested feed; without the author
		// filter every open stream would repeat every new message.
		ssb.sqlStream(
			'SELECT previous, author, id, sequence, timestamp, hash, content, signature FROM messages WHERE id = ?1 AND author = ?2',
			[message_id, id],
			sendMessage);
	});
});