// 147 lines
// 4.0 KiB
// JavaScript
// Raw Normal View History

// Incoming 'blobs.createWants' requests keyed by connection id. Entries are
// stored by the createWants RPC handler and deleted when the connections-changed
// handler sees a 'remove' for that connection.
var g_wants_requests = {};
/**
 * Returns the ids that `id` currently follows, using an incremental cache.
 *
 * The cache is stored in `db` under the key `<id>:following` as JSON of the
 * form `{users, sequence, version}`. Only 'contact' messages whose sequence is
 * greater than the cached one are replayed, and the cache is rewritten only
 * when its serialized form changes.
 *
 * @param db Store exposing async `get(key)` and `set(key, value)`.
 * @param id Author whose 'contact' messages are replayed.
 * @returns Promise resolving to an array of followed ids.
 */
async function following(db, id) {
	const k_version = 5;
	const key = id + ":following";
	const o = await db.get(key);
	let f = o ? JSON.parse(o) : o;
	if (!f || f.version !== k_version) {
		f = {users: [], sequence: 0, version: k_version};
	}
	// Heal caches that were written with a null sequence: binding NULL to
	// `sequence > ?2` matches no rows, so every later contact message would
	// be skipped.
	f.sequence = f.sequence ?? 0;
	const users = new Set(f.users);
	await ssb.sqlStream(
		"SELECT "+
		"  sequence, "+
		"  json_extract(content, '$.contact') AS contact, "+
		"  json_extract(content, '$.following') AS following "+
		"FROM messages "+
		"WHERE "+
		"  author = ?1 AND "+
		"  sequence > ?2 AND "+
		"  json_extract(content, '$.type') = 'contact' "+
		"UNION SELECT MAX(sequence) AS sequence, NULL, NULL FROM messages WHERE author = ?1 "+
		"ORDER BY sequence",
		[id, f.sequence],
		function(row) {
			// The UNION's MAX(sequence) row carries no contact; it exists only to
			// advance the cached sequence past non-contact messages.
			if (row.contact != null) {
				if (row.following) {
					users.add(row.contact);
				} else {
					users.delete(row.contact);
				}
			}
			// MAX(sequence) is NULL when the author has no messages at all; keep
			// the previous sequence instead of persisting a null.
			if (row.sequence != null) {
				f.sequence = row.sequence;
			}
		});
	f.users = Array.from(users);
	const j = JSON.stringify(f);
	if (o !== j) {
		await db.set(key, j);
	}
	return f.users;
}
/**
 * Collects the ids reachable from `seed_ids` by following contacts up to
 * `depth` hops, including the seeds themselves.
 *
 * @param db Store handed through to following().
 * @param seed_ids Array of ids to start from.
 * @param depth Number of hops to expand; at 0 or below `seed_ids` is returned
 *        unchanged.
 * @returns Promise resolving to an array of ids without duplicates: deeper
 *          results first, then the seeds of this level.
 */
async function followingDeep(db, seed_ids, depth) {
	if (depth <= 0) {
		return seed_ids;
	}
	const lists = await Promise.all(seed_ids.map(x => following(db, x)));
	// De-duplicate and sort the next frontier before recursing.
	const next_ids = [...new Set(lists.flat())].sort();
	const reached = await followingDeep(db, next_ids, depth - 1);
	// Follow graphs can loop back to a seed, so de-duplicate the combined list
	// (a Set keeps first-occurrence order). Array spread is used instead of
	// concat(...ids) so that large results are not passed as call arguments.
	return [...new Set([...reached, ...seed_ids])];
}
// Database instance constructed with the argument 'core'; test_following()
// passes it to followingDeep() as the cache store.
var g_database = new Database('core');
/**
 * Smoke test: logs the local identity and the ids reachable from it within
 * one hop. Any failure is caught and logged rather than thrown.
 */
async function test_following() {
	try {
		const me = await ssb.whoami();
		debug_print("I AM", me);
		const seed = await ssb.whoami();
		const reachable = await followingDeep(g_database, [seed], 1);
		debug_print("following ", JSON.stringify(reachable));
	} catch (error) {
		debug_print("DOH", error, error.stack);
	}
}
// Run the smoke test once when this script is evaluated. The returned promise
// is not awaited; test_following() catches and logs its own errors.
test_following();
// Handle peer connections being added or removed.
//
// On 'add', two source requests are sent to the peer:
//   - createHistoryStream for the peer's own id from seq 0; every reply is
//     passed to ssb.storeMessage().
//   - blobs.createWants; every reply is an object mapping blob id to a number.
// On 'remove', the peer's stored createWants request is forgotten.
ssb.registerConnectionsChanged(function(change, connection) {
	if (change === 'add') {
		connection.send_json(
			{'name': ['createHistoryStream'], 'type': 'source', 'args': [{'id': connection.id, 'seq': 0}]},
			function(message) {
				ssb.storeMessage(message.message);
			});
		connection.send_json(
			{'name': ['blobs', 'createWants'], 'type': 'source', 'args': []},
			function(message) {
				Object.keys(message.message).forEach(function(id) {
					if (message.message[id] < 0) {
						// Presumably a negative value means the peer wants this blob
						// (our own wants are announced as -1) — confirm against the
						// blobs protocol. The answer goes out on the createWants
						// request the peer opened towards us, which may not exist
						// yet; without it there is nowhere to reply.
						const wants_request = g_wants_requests[connection.id];
						if (!wants_request) {
							debug_print('no createWants request from', connection.id);
							return;
						}
						// NOTE(review): other ssb.* calls in this file are awaited;
						// confirm blobGet returns the blob synchronously.
						const blob = ssb.blobGet(id);
						if (blob) {
							const out_message = {};
							out_message[id] = blob.byteLength;
							wants_request.send_json(out_message);
						}
					} else {
						// Non-negative value: request the blob from the peer and store it.
						debug_print("blobs.get", id);
						connection.send_json(
							{'name': ['blobs', 'get'], 'type': 'source', 'args': [{'id': id}]},
							function(blob_message) {
								debug_print(id, '=>', debug_utf8Decode(blob_message.message));
								ssb.blobStore(blob_message.message);
							});
					}
				});
			});
	} else if (change === 'remove') {
		debug_print('REMOVE', connection.id);
		delete g_wants_requests[connection.id];
	} else {
		debug_print('CHANGE', change);
	}
});
// RPC blobs.createWants: remember the peer's request by connection id and use
// it to announce every blob id we want as {id: -1} — the ids already in the
// blob_wants table, plus any reported later through registerBlobWantAdded.
// NOTE(review): the want-added callback is never unregistered in this block;
// confirm whether it outlives the connection.
ssb.registerRpc(['blobs', 'createWants'], function(request) {
	const announce_want = function(id) {
		debug_print('discovered', id);
		const want = {};
		want[id] = -1;
		request.send_json(want);
	};

	g_wants_requests[request.connection.id] = request;
	ssb.registerBlobWantAdded(announce_want);
	ssb.sqlStream('SELECT id FROM blob_wants', [], function(row) {
		announce_want(row.id);
	});
});
// RPC blobs.has: reply with true when a row for the requested blob id exists
// in the blobs table, false otherwise.
ssb.registerRpc(['blobs', 'has'], async function(request) {
	let found = false;
	try {
		// sqlStream is awaited elsewhere in this file; without waiting here the
		// reply would be sent before any row callback could run, so it would
		// always be false.
		await ssb.sqlStream(
			'SELECT 1 FROM blobs where id = ?1',
			[request.args[0]],
			function() {
				found = true;
			});
	} catch (error) {
		// Still answer the peer if the lookup fails.
		debug_print('blobs.has', error);
	}
	request.send_json(found);
});
// RPC blobs.get: look up the blob named by the first argument's `id` and send
// it back as binary.
// NOTE(review): other ssb.* calls in this file are awaited; confirm blobGet
// returns the blob synchronously.
ssb.registerRpc(['blobs', 'get'], function(request) {
	const wanted = request.args[0];
	const data = ssb.blobGet(wanted.id);
	request.send_binary(data);
});
// RPC createHistoryStream: send the stored messages of the requested author,
// in sequence order, starting at `seq` (0 when seq is null or undefined). Each
// row is rebuilt into a message object with its content parsed from JSON.
ssb.registerRpc(['createHistoryStream'], function(request) {
	const options = request.args[0];
	const author = options.id;
	const first_sequence = options.seq ?? 0;

	const send_row = function(row) {
		const message = {
			'previous': row.previous,
			'author': author,
			'sequence': row.sequence,
			'timestamp': row.timestamp,
			'hash': row.hash,
			'content': JSON.parse(row.content),
			'signature': row.signature,
		};
		debug_print('sending1', JSON.stringify(message));
		request.send_json(message);
	};

	ssb.sqlStream(
		'SELECT previous, sequence, timestamp, hash, content, signature FROM messages WHERE author = ?1 AND sequence >= ?2 ORDER BY sequence',
		[author, first_sequence],
		send_row);
});