// NOTE: `ssb`, `Database`, `debug_print` and `debug_utf8Decode` are not defined
// in this file; they are expected to be provided by the host environment.

// Open incoming blobs.createWants requests, keyed by the requesting
// connection's id. Populated by the 'blobs.createWants' RPC handler and
// cleared when the connection is removed.
var g_wants_requests = {};

/**
 * Returns the list of ids that `id` currently follows.
 *
 * The result is computed incrementally: the previously computed list and the
 * last processed sequence number are cached in `db` under `<id>:following`,
 * and only 'contact' messages with a higher sequence are replayed.
 *
 * @param db Store exposing async `get(key)` / `set(key, value)` of strings.
 * @param id Author id whose contact messages are examined.
 * @returns A promise for an array of followed ids.
 */
async function following(db, id) {
	// Bump to invalidate every cached entry when the cached format changes.
	const k_version = 5;
	const key = id + ":following";
	const o = await db.get(key);
	let f = o ? JSON.parse(o) : o;
	if (!f || f.version !== k_version) {
		f = {users: [], sequence: 0, version: k_version};
	}
	const users = new Set(f.users);
	// A cached null sequence (written when the author had no messages yet)
	// would make `sequence > ?2` never match, so treat it as "from the start".
	let sequence = f.sequence ?? 0;
	await ssb.sqlStream(
		"SELECT "+
		"  sequence, "+
		"  json_extract(content, '$.contact') AS contact, "+
		"  json_extract(content, '$.following') AS following "+
		"FROM messages "+
		"WHERE "+
		"  author = ?1 AND "+
		"  sequence > ?2 AND "+
		"  json_extract(content, '$.type') = 'contact' "+
		"UNION SELECT MAX(sequence) AS sequence, NULL, NULL FROM messages WHERE author = ?1 "+
		"ORDER BY sequence",
		[id, sequence],
		function(row) {
			// The UNION row (and malformed contact messages) carry no contact;
			// there is nothing to add or remove for those.
			if (row.contact != null) {
				if (row.following) {
					users.add(row.contact);
				} else {
					users.delete(row.contact);
				}
			}
			// MAX(sequence) is NULL when the author has no messages at all;
			// never let that overwrite the last known sequence.
			if (row.sequence != null) {
				sequence = row.sequence;
			}
		});
	f.users = Array.from(users);
	f.sequence = sequence;
	const j = JSON.stringify(f);
	// Only write back when something actually changed.
	if (o !== j) {
		await db.set(key, j);
	}
	return f.users;
}

/**
 * Expands `seed_ids` by following contact links up to `depth` hops.
 *
 * @param db Store passed through to `following`.
 * @param seed_ids Array of ids to start from.
 * @param depth Number of hops to expand; <= 0 returns `seed_ids` unchanged.
 * @returns A promise for an array containing the ids reached followed by
 *          `seed_ids`. The array may contain duplicates.
 */
async function followingDeep(db, seed_ids, depth) {
	if (depth <= 0) {
		return seed_ids;
	}
	const lists = await Promise.all(seed_ids.map(x => following(db, x)));
	// De-duplicate and sort so the next level queries each id only once.
	const next_ids = [...new Set(lists.flat())].sort();
	const deeper = await followingDeep(db, next_ids, depth - 1);
	return [...deeper, ...seed_ids];
}

var g_database = new Database('core');

/**
 * Debug helper: prints the local identity and everything it follows one hop
 * out. Errors are logged rather than propagated.
 */
async function test_following() {
	try {
		const me = await ssb.whoami();
		debug_print("I AM", me);
		const result = await followingDeep(g_database, [me], 1);
		debug_print("following ", JSON.stringify(result));
	} catch (e) {
		debug_print("DOH", e, e.stack);
	}
}

test_following();

// On every new connection, request the peer's history stream and its blob
// wants; on removal, forget the peer's createWants request.
ssb.registerConnectionsChanged(function(change, connection) {
	if (change === 'add') {
		connection.send_json(
			{'name': ['createHistoryStream'], 'type': 'source', 'args': [{'id': connection.id, 'seq': 0}]},
			function(message) {
				ssb.storeMessage(message.message);
			});
		connection.send_json(
			{'name': ['blobs', 'createWants'], 'type': 'source', 'args': []},
			function(message) {
				Object.keys(message.message).forEach(function(id) {
					if (message.message[id] < 0) {
						// Negative value: the peer wants this blob. If we have it,
						// advertise its size on the peer's own createWants request.
						// NOTE(review): blobGet is used synchronously here — confirm
						// it does not return a promise.
						const blob = ssb.blobGet(id);
						if (blob) {
							const wants_request = g_wants_requests[connection.id];
							if (!wants_request) {
								// The peer has not opened blobs.createWants towards
								// us (yet), so there is nowhere to send the reply.
								debug_print('no createWants request for', connection.id);
								return;
							}
							const out_message = {};
							out_message[id] = blob.byteLength;
							wants_request.send_json(out_message);
						}
					} else {
						// Non-negative value: the peer has this blob; fetch it.
						debug_print("blobs.get", id);
						connection.send_json(
							{'name': ['blobs', 'get'], 'type': 'source', 'args': [{'id': id}]},
							function(message) {
								debug_print(id, '=>', debug_utf8Decode(message.message));
								ssb.blobStore(message.message);
							});
					}
				});
			});
	} else if (change === 'remove') {
		debug_print('REMOVE', connection.id);
		delete g_wants_requests[connection.id];
	} else {
		debug_print('CHANGE', change);
	}
});

// blobs.createWants: remember the request so replies can be sent on it later,
// and stream every wanted blob id (current and future) with a value of -1.
ssb.registerRpc(['blobs', 'createWants'], function(request) {
	g_wants_requests[request.connection.id] = request;
	function blob_want_discovered(id) {
		debug_print('discovered', id);
		const message = {};
		message[id] = -1;
		request.send_json(message);
	}
	// NOTE(review): this callback is never unregistered when the connection
	// goes away — confirm whether the host cleans it up.
	ssb.registerBlobWantAdded(blob_want_discovered);
	ssb.sqlStream(
		'SELECT id FROM blob_wants',
		[],
		row => blob_want_discovered(row.id));
});

// blobs.has: reply true if a blob with the requested id is stored locally.
ssb.registerRpc(['blobs', 'has'], function(request) {
	let found = false;
	// sqlStream is awaited elsewhere in this file, so only reply once it has
	// settled; Promise.resolve keeps this correct if it is synchronous too.
	Promise.resolve(ssb.sqlStream(
		'SELECT 1 FROM blobs where id = ?1',
		[request.args[0]],
		function(row) {
			found = true;
		})).then(function() {
		request.send_json(found);
	}).catch(function(e) {
		debug_print('blobs.has failed', e, e?.stack);
	});
});

// blobs.get: send the requested blob's bytes.
ssb.registerRpc(['blobs', 'get'], function(request) {
	const blob = ssb.blobGet(request.args[0].id);
	request.send_binary(blob);
});

// createHistoryStream: send every stored message by `id` starting at `seq`
// (defaulting to 0), in sequence order.
ssb.registerRpc(['createHistoryStream'], function(request) {
	const id = request.args[0].id;
	const seq = request.args[0].seq;
	ssb.sqlStream(
		'SELECT previous, sequence, timestamp, hash, content, signature FROM messages WHERE author = ?1 AND sequence >= ?2 ORDER BY sequence',
		[id, seq ?? 0],
		function(row) {
			const message = {
				'previous': row.previous,
				'author': id,
				'sequence': row.sequence,
				'timestamp': row.timestamp,
				'hash': row.hash,
				'content': JSON.parse(row.content),
				'signature': row.signature,
			};
			debug_print('sending1', JSON.stringify(message));
			request.send_json(message);
		});
});