forked from cory/tildefriends
Cory McWilliams
03a2367532
git-svn-id: https://www.unprompted.com/svn/projects/tildefriends/trunk@3680 ed5197a5-7fde-0310-b194-c3ffbd925b24
179 lines
5.0 KiB
JavaScript
179 lines
5.0 KiB
JavaScript
var g_wants_requests = {};
|
|
|
|
async function following(db, id) {
|
|
var o = await db.get(id + ":following");
|
|
const k_version = 5;
|
|
var f = o ? JSON.parse(o) : o;
|
|
if (!f || f.version != k_version) {
|
|
f = {users: [], sequence: 0, version: k_version};
|
|
}
|
|
f.users = new Set(f.users);
|
|
await ssb.sqlStream(
|
|
"SELECT "+
|
|
" sequence, "+
|
|
" json_extract(content, '$.contact') AS contact, "+
|
|
" json_extract(content, '$.following') AS following "+
|
|
"FROM messages "+
|
|
"WHERE "+
|
|
" author = ?1 AND "+
|
|
" sequence > ?2 AND "+
|
|
" json_extract(content, '$.type') = 'contact' "+
|
|
"UNION SELECT MAX(sequence) AS sequence, NULL, NULL FROM messages WHERE author = ?1 "+
|
|
"ORDER BY sequence",
|
|
[id, f.sequence],
|
|
function(row) {
|
|
if (row.following) {
|
|
f.users.add(row.contact);
|
|
} else {
|
|
f.users.delete(row.contact);
|
|
}
|
|
f.sequence = row.sequence;
|
|
});
|
|
f.users = Array.from(f.users);
|
|
var j = JSON.stringify(f);
|
|
if (o != j) {
|
|
await db.set(id + ":following", j);
|
|
}
|
|
return f.users;
|
|
}
|
|
|
|
async function followingDeep(db, seed_ids, depth) {
|
|
if (depth <= 0) {
|
|
return seed_ids;
|
|
}
|
|
var f = await Promise.all(seed_ids.map(x => following(db, x)));
|
|
var ids = [].concat(...f);
|
|
var x = await followingDeep(db, [...new Set(ids)].sort(), depth - 1);
|
|
x = [].concat(...x, ...seed_ids);
|
|
return x;
|
|
}
|
|
|
|
var g_database = new Database('core');
|
|
|
|
async function test_following() {
|
|
try {
|
|
debug_print("I AM", await ssb.whoami());
|
|
var result = await followingDeep(g_database, [await ssb.whoami()], 1);
|
|
debug_print("following ", JSON.stringify(result));
|
|
} catch (e) {
|
|
debug_print("DOH", e, e.stack);
|
|
}
|
|
}
|
|
|
|
test_following();
|
|
|
|
function get_latest_sequence_for_author(author) {
|
|
var sequence = 0;
|
|
ssb.sqlStream(
|
|
'SELECT MAX(sequence) AS sequence FROM messages WHERE author = ?1',
|
|
[author],
|
|
function(row) {
|
|
if (row.sequence) {
|
|
sequence = row.sequence + 1;
|
|
}
|
|
});
|
|
return sequence;
|
|
}
|
|
|
|
ssb.registerConnectionsChanged(function(change, connection) {
|
|
if (change == 'add') {
|
|
var sequence = get_latest_sequence_for_author(connection.id);
|
|
connection.send_json({'name': ['createHistoryStream'], 'type': 'source', 'args': [{'id': connection.id, 'seq': sequence}]}, function(message) {
|
|
ssb.storeMessage(message.message.value);
|
|
});
|
|
connection.send_json({'name': ['blobs', 'createWants'], 'type': 'source', 'args': []}, function(message) {
|
|
Object.keys(message.message).forEach(function(id) {
|
|
if (message.message[id] < 0) {
|
|
var blob = ssb.blobGet(id);
|
|
if (blob) {
|
|
var out_message = {};
|
|
out_message[id] = blob.byteLength;
|
|
g_wants_requests[connection.id].send_json(out_message);
|
|
}
|
|
} else {
|
|
debug_print("blobs.get", id);
|
|
var received_bytes = 0;
|
|
var expected_bytes = message.message[id];
|
|
var buffer = new Uint8Array(expected_bytes);
|
|
connection.send_json({'name': ['blobs', 'get'], 'type': 'source', 'args': [id]}, function(message) {
|
|
buffer.set(new Uint8Array(message.message, 0, message.message.byteLength), received_bytes);
|
|
received_bytes += message.message.byteLength;
|
|
if (received_bytes == expected_bytes) {
|
|
ssb.blobStore(buffer);
|
|
}
|
|
});
|
|
}
|
|
});
|
|
});
|
|
followingDeep(g_database, [ssb.whoami()], 2).then(function(ids) {
|
|
for (let id of ids) {
|
|
var sequence = get_latest_sequence_for_author(id);
|
|
connection.send_json({'name': ['createHistoryStream'], 'type': 'source', 'args': [{'id': id, 'seq': sequence}]}, function(message) {
|
|
ssb.storeMessage(message.message.value);
|
|
});
|
|
}
|
|
});
|
|
} else if (change == 'remove') {
|
|
debug_print('REMOVE', connection.id);
|
|
delete g_wants_requests[connection.id];
|
|
} else {
|
|
debug_print('CHANGE', change);
|
|
}
|
|
});
|
|
|
|
ssb.registerRpc(['blobs', 'createWants'], function(request) {
|
|
g_wants_requests[request.connection.id] = request;
|
|
function blob_want_discovered(id) {
|
|
var message = {};
|
|
message[id] = -1;
|
|
request.send_json(message);
|
|
}
|
|
ssb.registerBlobWantAdded(blob_want_discovered);
|
|
ssb.sqlStream(
|
|
'SELECT id FROM blob_wants',
|
|
[],
|
|
row => blob_want_discovered(row.id));
|
|
});
|
|
|
|
ssb.registerRpc(['blobs', 'has'], function(request) {
|
|
var found = false;
|
|
ssb.sqlStream(
|
|
'SELECT 1 FROM blobs where id = ?1',
|
|
[request.args[0]],
|
|
function(row) {
|
|
found = true;
|
|
});
|
|
request.send_json(found);
|
|
});
|
|
|
|
ssb.registerRpc(['blobs', 'get'], function(request) {
|
|
for (let id of request.args) {
|
|
var blob = ssb.blobGet(id);
|
|
request.send_binary(blob);
|
|
}
|
|
});
|
|
|
|
ssb.registerRpc(['createHistoryStream'], function(request) {
|
|
var id = request.args[0].id;
|
|
var seq = request.args[0].seq;
|
|
ssb.sqlStream(
|
|
'SELECT previous, id, sequence, timestamp, hash, content, signature FROM messages WHERE author = ?1 AND sequence >= ?2 ORDER BY sequence',
|
|
[id, seq ?? 0],
|
|
function(row) {
|
|
var message = {
|
|
key: row.id + '.' + row.hash,
|
|
value: {
|
|
previous: row.previous,
|
|
author: id,
|
|
sequence: row.sequence,
|
|
timestamp: row.timestamp,
|
|
hash: row.hash,
|
|
content: JSON.parse(row.content),
|
|
signature: row.signature,
|
|
},
|
|
timestamp: row.timestamp,
|
|
};
|
|
request.send_json(message);
|
|
});
|
|
});
|