Some ebt.replicate success.

git-svn-id: https://www.unprompted.com/svn/projects/tildefriends/trunk@3703 ed5197a5-7fde-0310-b194-c3ffbd925b24
This commit is contained in:
2021-12-27 19:52:42 +00:00
parent efcada8e25
commit 3cddc524d1
3 changed files with 128 additions and 29 deletions

View File

@ -1,15 +1,16 @@
var g_wants_requests = {};
var g_database = new Database('core');
const k_use_create_history_stream = false;
async function following(db, id) {
var o = await db.get(id + ":following");
function following(db, id) {
var o = db.get(id + ":following");
const k_version = 5;
var f = o ? JSON.parse(o) : o;
if (!f || f.version != k_version) {
f = {users: [], sequence: 0, version: k_version};
}
f.users = new Set(f.users);
await ssb.sqlStream(
ssb.sqlStream(
"SELECT "+
" sequence, "+
" json_extract(content, '$.contact') AS contact, "+
@ -33,18 +34,18 @@ async function following(db, id) {
f.users = Array.from(f.users);
var j = JSON.stringify(f);
if (o != j) {
await db.set(id + ":following", j);
db.set(id + ":following", j);
}
return f.users;
}
async function followingDeep(db, seed_ids, depth) {
function followingDeep(db, seed_ids, depth) {
if (depth <= 0) {
return seed_ids;
}
var f = await Promise.all(seed_ids.map(x => following(db, x)));
var f = seed_ids.map(x => following(db, x));
var ids = [].concat(...f);
var x = await followingDeep(db, [...new Set(ids)].sort(), depth - 1);
var x = followingDeep(db, [...new Set(ids)].sort(), depth - 1);
x = [...new Set([].concat(...x, ...seed_ids))].sort();
return x;
}
@ -56,7 +57,7 @@ function get_latest_sequence_for_author(author) {
[author],
function(row) {
if (row.sequence) {
sequence = row.sequence + 1;
sequence = row.sequence;
}
});
return sequence;
@ -72,7 +73,23 @@ function storeMessage(message) {
ssb.addEventListener('connections', function(change, connection) {
if (change == 'add') {
var sequence = get_latest_sequence_for_author(connection.id);
connection.send_json({'name': ['createHistoryStream'], 'type': 'source', 'args': [{'id': connection.id, 'seq': sequence, 'live': true, 'keys': false}]}, storeMessage);
if (k_use_create_history_stream) {
connection.send_json({'name': ['createHistoryStream'], 'type': 'source', 'args': [{'id': connection.id, 'seq': sequence, 'live': true, 'keys': false}]}, storeMessage);
var me = ssb.whoami();
followingDeep(g_database, [me], 2).then(function(ids) {
for (let id of ids) {
if (id == me) {
continue;
}
var sequence = get_latest_sequence_for_author(id);
connection.send_json({'name': ['createHistoryStream'], 'type': 'source', 'args': [{'id': id, 'seq': sequence, 'live': true, 'keys': false}]}, storeMessage);
}
});
} else {
if (connection.is_client) {
connection.send_json({"name": ["ebt", "replicate"], "args": [{"version": 3, "format": "classic"}], "type": "duplex"}, ebtReplicateClient);
}
}
connection.send_json({'name': ['blobs', 'createWants'], 'type': 'source', 'args': []}, function(message) {
Object.keys(message.message).forEach(function(id) {
if (message.message[id] < 0) {
@ -83,7 +100,6 @@ ssb.addEventListener('connections', function(change, connection) {
g_wants_requests[connection.id].send_json(out_message);
}
} else {
print("blobs.get", id);
var received_bytes = 0;
var expected_bytes = message.message[id];
var buffer = new Uint8Array(expected_bytes);
@ -97,16 +113,6 @@ ssb.addEventListener('connections', function(change, connection) {
}
});
});
var me = ssb.whoami();
followingDeep(g_database, [me], 2).then(function(ids) {
for (let id of ids) {
if (id == me) {
continue;
}
var sequence = get_latest_sequence_for_author(id);
connection.send_json({'name': ['createHistoryStream'], 'type': 'source', 'args': [{'id': id, 'seq': sequence, 'live': true, 'keys': false}]}, storeMessage);
}
});
} else if (change == 'remove') {
print('REMOVE', connection.id);
delete g_wants_requests[connection.id];
@ -149,10 +155,97 @@ ssb.addRpc(['blobs', 'get'], function(request) {
ssb.addRpc(['gossip', 'ping'], function(request) {
request.more(function(message) {
message.send_json(message.message);
message.send_json(Date.now());
});
});
ssb.addRpc(['tunnel', 'isRoom'], function(request) {
request.send_json(false);
});
function ebtReplicateSendClock(request, have) {
var me = ssb.whoami();
var message = {};
var ids = followingDeep(g_database, [me], 2).concat([request.connection.id]).concat(Object.keys(have));
for (let id of ids) {
message[id] = get_latest_sequence_for_author(id);
}
var last_sent = request.connection.sent_clock || {};
var to_send = {}
for (let id of ids) {
if (last_sent[id] === undefined || message[id] > last_sent[id]) {
last_sent[id] = to_send[id] = message[id] === -1 ? -1 : message[id] << 1;
}
}
request.connection.sent_clock = last_sent;
if (Object.keys(to_send).length) {
request.send_json(to_send);
}
}
function formatMessage(row) {
var message = {
previous: row.previous,
author: row.author,
sequence: row.sequence,
timestamp: row.timestamp,
hash: row.hash,
content: JSON.parse(row.content),
signature: row.signature,
};
return message;
}
function ebtReplicateRegisterMessageCallback(request) {
var me = ssb.whoami();
ssb.addEventListener('message', function(message_id) {
ssb.sqlStream(
'SELECT previous, author, id, sequence, timestamp, hash, content, signature FROM messages WHERE id = ?1 AND author = ?2',
[message_id, me],
function (row) {
request.send_json(formatMessage(row));
});
});
}
function ebtReplicateCommon(request) {
var me = ssb.whoami();
if (request.message.author) {
storeMessage(request);
} else {
ebtReplicateSendClock(request, request.message);
for (let id of Object.keys(request.message)) {
if (request.message[id] >= 0 && (request.message[id] & 1) == 0) {
ssb.sqlStream(
'SELECT previous, author, id, sequence, timestamp, hash, content, signature FROM messages WHERE author = ?1 AND sequence >= ?2 ORDER BY sequence',
[id, request.message[id] >> 1],
function (row) {
request.send_json(formatMessage(row));
});
}
}
}
}
function ebtReplicateClient(request) {
if (!request.connection.message_registered) {
ebtReplicateRegisterMessageCallback(request);
request.connection.message_registered = true;
}
ebtReplicateCommon(request);
}
function ebtReplicateServer(request) {
ebtReplicateRegisterMessageCallback(request);
ebtReplicateSendClock(request, {});
request.more(ebtReplicateCommon);
}
ssb.addRpc(['ebt', 'replicate'], ebtReplicateServer);
ssb.addRpc(['createHistoryStream'], function(request) {
var id = request.args[0].id;
var seq = request.args[0].seq;
@ -191,12 +284,10 @@ ssb.addRpc(['createHistoryStream'], function(request) {
sendMessage);
ssb.addEventListener('message', function(message_id) {
ssb.sqlStream(
'SELECT previous, author, id, sequence, timestamp, hash, content, signature FROM messages WHERE id = ?1',
[message_id],
'SELECT previous, author, id, sequence, timestamp, hash, content, signature FROM messages WHERE id = ?1 AND author = ?2',
[message_id, id],
function (row) {
if (row.author == id) {
sendMessage(row);
}
sendMessage(row);
});
});
});