Cory McWilliams
235fc9b8f9
git-svn-id: https://www.unprompted.com/svn/projects/tildefriends/trunk@4100 ed5197a5-7fde-0310-b194-c3ffbd925b24
351 lines
10 KiB
JavaScript
351 lines
10 KiB
JavaScript
"use strict";
|
|
/** Incoming 'blobs.createWants' requests, keyed by the requesting connection's id. */
var g_wants_requests = {};

/** Handle to the 'core' database. Not referenced by the other code visible in this file. */
var g_database = new Database('core');

/** Incoming 'room.attendants' requests, keyed by the requesting connection's id. */
let g_attendants = {};

/** When true, new connections replicate with createHistoryStream instead of ebt.replicate. */
const k_use_create_history_stream = false;

/** Upper bound on the number of 'blobs.get' requests kept in flight per connection. */
const k_blobs_concurrent_target = 8;
|
|
|
|
/**
 * Looks up the highest stored sequence number for a feed.
 *
 * @param author Feed id bound to the query's author parameter.
 * @returns The MAX(sequence) value reported for the author, or 0 when the
 * query reports no truthy sequence.
 */
function get_latest_sequence_for_author(author) {
	const query = 'SELECT MAX(sequence) AS sequence FROM messages WHERE author = ?1';
	let latest = 0;
	ssb.sqlStream(query, [author], (row) => {
		// Keep the default when the aggregate comes back NULL/0.
		latest = row.sequence || latest;
	});
	// NOTE(review): this relies on sqlStream having invoked the row callback
	// before it returns -- confirm against the ssb binding.
	return latest;
}
|
|
|
|
/**
 * Stores a message received from a peer.
 *
 * The payload is `message.message.value` when that is truthy, otherwise
 * `message.message` itself. Only payloads whose typeof is 'object' are handed
 * to ssb.storeMessage; anything else is ignored.
 *
 * @param message RPC message wrapper; its `message` property carries the payload.
 */
function storeMessage(message) {
	const envelope = message.message;
	const payload = envelope.value || envelope;
	if (typeof payload === 'object') {
		ssb.storeMessage(payload);
	}
}
|
|
|
|
/**
 * Handles the response stream of a 'room.attendants' request.
 *
 * The first message must have type 'state' and carry the current attendant
 * ids; each unique id is registered through request.add_room_attendant.
 * Follow-up messages delivered through request.more must have type 'joined'
 * or 'left' and add or remove a single attendant.
 *
 * @param request Request object exposing `message`, `more`,
 * `add_room_attendant` and `remove_room_attendant`.
 * @throws Error when the initial message is not 'state', or when a follow-up
 * message has a type other than 'joined' / 'left'.
 */
function tunnel_attendants(request) {
	if (request.message.type !== 'state') {
		throw new Error('Unexpected type: ' + request.message.type);
	}

	// The Set de-duplicates the initial id list and mirrors later changes.
	const state = new Set(request.message.ids);
	for (const id of state) {
		request.add_room_attendant(id);
	}

	request.more(function attendants(message) {
		const update = message.message;
		if (update.type === 'joined') {
			request.add_room_attendant(update.id);
			state.add(update.id);
		} else if (update.type === 'left') {
			request.remove_room_attendant(update.id);
			state.delete(update.id);
		} else {
			// The type lives on the payload (message.message), not on the
			// wrapper; reading message.type always reported "undefined".
			throw new Error('Unexpected type: ' + update.type);
		}
	});
}
|
|
|
|
/**
 * Starts 'blobs.get' requests for queued blobs until
 * k_blobs_concurrent_target requests are in flight or the queue is empty.
 *
 * Queue entries map blob id -> expected byte count. Received chunks are
 * accumulated into a buffer of that size, and the blob is stored through
 * ssb.blobStore once exactly the expected number of bytes has arrived.
 *
 * A slot is released, and the queue re-examined, when the blob completes,
 * when the stream ends early, or when the peer sends more bytes than it
 * advertised (the oversized blob is dropped rather than stored).
 *
 * @param connection Connection carrying `active_blob_gets`, `blob_get_queue`
 * and `send_json`.
 */
function get_more_blobs(connection) {
	while (Object.keys(connection.active_blob_gets).length < k_blobs_concurrent_target) {
		const next = Object.keys(connection.blob_get_queue).pop();
		if (!next) {
			break;
		}
		const expected_bytes = connection.blob_get_queue[next];
		delete connection.blob_get_queue[next];
		connection.active_blob_gets[next] = true;

		let received_bytes = 0;
		let finished = false;
		const buffer = new Uint8Array(expected_bytes);

		// Frees this request's slot exactly once and pulls in more work.
		const release = function() {
			finished = true;
			delete connection.active_blob_gets[next];
			get_more_blobs(connection);
		};

		connection.send_json({'name': ['blobs', 'get'], 'type': 'source', 'args': [next]}, function(message) {
			if (finished) {
				// Late frames (e.g. the end marker after a complete blob)
				// must not touch a slot that may have been reused.
				return;
			}
			if (message.flags & 0x4 /* end */) {
				// Ended before the blob completed: still free the slot and
				// keep the queue moving instead of stalling.
				release();
				return;
			}
			const chunk = new Uint8Array(message.message, 0, message.message.byteLength);
			if (received_bytes + chunk.byteLength > expected_bytes) {
				// More data than advertised would overflow the buffer.
				release();
				return;
			}
			buffer.set(chunk, received_bytes);
			received_bytes += chunk.byteLength;
			if (received_bytes == expected_bytes) {
				ssb.blobStore(buffer);
				release();
			}
		});
	}
}
|
|
|
|
/**
 * Opens a 'blobs.createWants' source on the connection and processes the
 * id -> number maps the peer streams back.
 *
 * For an entry with a negative value: if this peer has its own createWants
 * request registered in g_wants_requests and ssb.blobGet returns the blob,
 * the blob's byte length is reported back on that request.
 * For any other entry: the value is recorded as the expected byte count in
 * connection.blob_get_queue and get_more_blobs is invoked.
 *
 * Error responses and payloads that are not objects (for example a missing
 * payload) are ignored.
 *
 * @param connection Connection exposing `id`, `send_json` and `blob_get_queue`.
 */
function send_blobs_create_wants(connection) {
	connection.send_json({'name': ['blobs', 'createWants'], 'type': 'source', 'args': []}, function on_blob_create_wants(message) {
		const wants = message.message;
		// Object.keys() below would throw on null/undefined, so bail out on
		// anything that is not a plain id map.
		if (!wants || typeof wants !== 'object' || wants.name === 'Error') {
			return;
		}
		for (const id of Object.keys(wants)) {
			if (wants[id] < 0) {
				const wants_request = g_wants_requests[connection.id];
				if (wants_request) {
					const blob = ssb.blobGet(id);
					if (blob) {
						const out_message = {};
						out_message[id] = blob.byteLength;
						wants_request.send_json(out_message);
					}
				}
			} else {
				connection.blob_get_queue[id] = wants[id];
				get_more_blobs(connection);
			}
		}
	});
}
|
|
|
|
/**
 * Reacts to connection changes.
 *
 * 'add': initialises the connection's blob bookkeeping, then either requests
 * history streams (when k_use_create_history_stream is set) or, for client
 * connections, starts ebt.replicate and probes tunnel.isRoom; in the latter
 * mode a blobs.createWants request is sent for every connection.
 * 'remove': announces the connection as 'left' and drops its entries from
 * g_attendants and g_wants_requests.
 * Any other change is only logged.
 */
ssb.addEventListener('connections', function on_connections_changed(change, connection) {
	if (change === 'add') {
		connection.active_blob_gets = {};
		connection.blob_get_queue = {};
		if (k_use_create_history_stream) {
			// Only this branch needs the latest sequence, so the query is not
			// run when history streams are disabled.
			const sequence = get_latest_sequence_for_author(connection.id);
			connection.send_json({'name': ['createHistoryStream'], 'type': 'source', 'args': [{'id': connection.id, 'seq': sequence, 'live': true, 'keys': false}]}, storeMessage);
			const identities = ssb.getAllIdentities();
			// NOTE(review): ebtReplicateSendClock uses the followingDeep result
			// as a plain array (.concat) while this code expected a promise
			// (.then). Promise.resolve accepts either form -- confirm which one
			// the ssb binding actually returns.
			Promise.resolve(ssb.followingDeep(identities, 2)).then(function(ids) {
				for (const id of ids) {
					if (identities.indexOf(id) !== -1) {
						// Our own identities are not requested from the peer.
						continue;
					}
					const latest = get_latest_sequence_for_author(id);
					connection.send_json({'name': ['createHistoryStream'], 'type': 'source', 'args': [{'id': id, 'seq': latest, 'live': true, 'keys': false}]}, storeMessage);
				}
			});
		} else {
			if (connection.is_client) {
				connection.send_json({"name": ["ebt", "replicate"], "args": [{"version": 3, "format": "classic"}], "type": "duplex"}, ebtReplicateClient);

				connection.send_json_async({'name': ['tunnel', 'isRoom'], 'args': []}, function tunnel_is_room(request) {
					// A truthy reply means the peer is a room: subscribe to its attendants.
					if (request.message) {
						request.connection.send_json({'name': ['room', 'attendants'], 'args': [], 'type': 'source'}, tunnel_attendants);
					}
				});
			}
			send_blobs_create_wants(connection);
		}
	} else if (change === 'remove') {
		print('REMOVE', connection.id);
		notify_attendant_changed(connection.id, 'left');
		delete g_attendants[connection.id];
		delete g_wants_requests[connection.id];
	} else {
		print('CHANGE', change);
	}
});
|
|
|
|
/**
 * Announces a wanted blob on a createWants request by sending `{[id]: -1}`.
 *
 * Does nothing when the request is missing or has no connection.
 *
 * @param request Request exposing `connection` and `send_json`.
 * @param id Blob id used as the key of the outgoing message.
 */
function blob_want_discovered(request, id) {
	if (request?.connection) {
		const want = {};
		want[id] = -1;
		request.send_json(want);
	}
}
|
|
|
|
/**
 * Returns a promise that resolves (with no value) after a setTimeout delay.
 *
 * @param ms Delay passed to setTimeout.
 * @returns Promise resolved from the timer callback.
 */
function sleep(ms) {
	return new Promise((resolve) => {
		setTimeout(() => resolve(), ms);
	});
}
|
|
|
|
/**
 * Periodically walks the blob_wants table in id order, 32 rows per query,
 * and announces each row through blob_want_discovered. Waits one second
 * between queries and resumes after the last id seen.
 *
 * The loop stops once the request no longer has a connection; previously it
 * kept polling forever even though blob_want_discovered ignores such
 * requests.
 * NOTE(review): assumes `request.connection` becomes falsy when the peer
 * goes away, as the guard in blob_want_discovered suggests -- confirm.
 *
 * @param request createWants request that receives the announcements.
 */
async function requestMoreBlobs(request) {
	let last = '';
	while (request?.connection) {
		await ssb.sqlStream(
			'SELECT id FROM blob_wants WHERE id > ? ORDER BY id LIMIT 32',
			[last],
			function(row) {
				blob_want_discovered(request, row.id);
				last = row.id;
			});
		await sleep(1000);
	}
}
|
|
|
|
/**
 * RPC 'blobs.createWants': remembers the request under its connection id,
 * forwards every 'blob_want_added' event to it, and starts the periodic
 * blob_wants scan.
 */
ssb.addRpc(['blobs', 'createWants'], function(request) {
	const connection_id = request.connection.id;
	g_wants_requests[connection_id] = request;

	// NOTE(review): this listener is registered per request and is never
	// removed anywhere in this file.
	ssb.addEventListener('blob_want_added', function(id) {
		blob_want_discovered(request, id);
	});

	// Deliberately not awaited: the scan runs in the background.
	requestMoreBlobs(request);
});
|
|
|
|
/**
 * Sends `{type, id}` to every request registered in g_attendants.
 *
 * A recipient whose send_json throws is logged and removed from
 * g_attendants. The removal targets the failing recipient's own key; it
 * used to delete the entry of the id being announced, which left the broken
 * recipient registered.
 *
 * @param id Id of the attendant the notification is about; a falsy id is
 * logged and ignored.
 * @param type Notification type placed in the message (callers in this file
 * pass 'joined' or 'left').
 */
function notify_attendant_changed(id, type) {
	if (!id) {
		print(`notify_attendant_changed called with id=${id}`);
		return;
	}
	// Object.entries() takes a snapshot, so deleting while looping is safe.
	for (const [attendant_id, attendant] of Object.entries(g_attendants)) {
		try {
			attendant.send_json({
				type: type,
				id: id,
			});
		} catch (e) {
			print(`Removing ${attendant_id} from g_attendants in ${type}.`, e);
			delete g_attendants[attendant_id];
		}
	}
}
|
|
|
|
/**
 * RPC 'room.attendants': replies with the sorted ids currently registered in
 * g_attendants, announces the caller as 'joined' to those attendants, and
 * only then registers the caller itself.
 */
ssb.addRpc(['room', 'attendants'], function(request) {
	const current_ids = Object.keys(g_attendants).sort();
	request.send_json({type: 'state', ids: current_ids});

	const connection_id = request.connection.id;
	notify_attendant_changed(connection_id, 'joined');
	// Registered after the broadcast, so the caller is not sent its own 'joined'.
	g_attendants[connection_id] = request;
});
|
|
|
|
/**
 * Sends this side's clock entries on an ebt.replicate request.
 *
 * Candidate feeds are the result of ssb.followingDeep(identities, 2) plus
 * the peer's own id. On the first call for a connection (nothing recorded in
 * connection.sent_clock) the latest sequence of every candidate is looked
 * up; on every call the feeds named in `have` are looked up as well, with -1
 * used when no messages are stored for them.
 *
 * Values go out as `sequence << 1` (or -1), in batches of at most 32 feeds
 * per message, starting at a random offset into the candidate list. An entry
 * is sent only if nothing was sent for that feed before or its encoded value
 * is greater than the value previously sent; the values sent are remembered
 * in connection.sent_clock.
 *
 * @param request ebt.replicate request exposing `connection` and `send_json`.
 * @param have Object whose keys are the feed ids the peer mentioned.
 */
function ebtReplicateSendClock(request, have) {
	const identities = ssb.getAllIdentities();
	const message = {};
	const last_sent = request.connection.sent_clock || {};
	const ids = ssb.followingDeep(identities, 2).concat([request.connection.id]);
	if (!Object.keys(last_sent).length) {
		for (const id of ids) {
			message[id] = get_latest_sequence_for_author(id);
		}
	}
	for (const id of Object.keys(have)) {
		if (message[id] === undefined) {
			const sequence = get_latest_sequence_for_author(id);
			message[id] = sequence ? sequence : -1;
		}
	}

	let to_send = {};
	const offset = Math.floor(Math.random() * ids.length);
	for (let i = 0; i < ids.length; i++) {
		const id = ids[(i + offset) % ids.length];
		const sequence = message[id];
		const encoded = sequence === -1 ? -1 : sequence << 1;
		// last_sent holds encoded values, so the comparison must use the
		// encoded form too; comparing the raw sequence against it suppressed
		// updates until the sequence had more than doubled.
		if (last_sent[id] === undefined || (sequence !== undefined && encoded > last_sent[id])) {
			last_sent[id] = to_send[id] = encoded;
		}
		if (Object.keys(to_send).length >= 32) {
			request.send_json(to_send);
			to_send = {};
		}
	}
	request.connection.sent_clock = last_sent;

	if (Object.keys(to_send).length) {
		request.send_json(to_send);
	}
}
|
|
|
|
/**
 * Builds the message object for a `messages` table row.
 *
 * The property order depends on row.sequence_before_author: when truthy,
 * `sequence` precedes `author`; otherwise `author` precedes `sequence`. All
 * other properties keep the same position. `content` is parsed from its
 * JSON text.
 *
 * @param row Row with previous, author, sequence, timestamp, hash, content,
 * signature and (optionally) sequence_before_author.
 * @returns Object with keys previous, (sequence/author in the order above),
 * timestamp, hash, content, signature.
 */
function formatMessage(row) {
	const [second, third] = row.sequence_before_author
		? ['sequence', 'author']
		: ['author', 'sequence'];
	return {
		previous: row.previous,
		[second]: row[second],
		[third]: row[third],
		timestamp: row.timestamp,
		hash: row.hash,
		content: JSON.parse(row.content),
		signature: row.signature,
	};
}
|
|
|
|
/**
 * Subscribes to 'message' events and forwards newly stored messages on an
 * ebt.replicate request.
 *
 * A message is forwarded only when the connection has a send_clock and the
 * clock entry for the message's author is lower than the message's sequence.
 *
 * The query selects sequence_before_author, as the createHistoryStream
 * queries do, because formatMessage uses that column to pick the property
 * order; without it every message was emitted author-first.
 *
 * @param request ebt.replicate request exposing `connection` and `send_json`.
 */
function ebtReplicateRegisterMessageCallback(request) {
	ssb.addEventListener('message', function(message_id) {
		if (!request.connection.send_clock) {
			// The peer has not sent a clock yet, so nothing was requested.
			return;
		}
		ssb.sqlStream(
			'SELECT previous, author, id, sequence, timestamp, hash, content, signature, sequence_before_author FROM messages WHERE id = ?1',
			[message_id],
			function(row) {
				if (request.connection.send_clock[row.author] < row.sequence) {
					request.send_json(formatMessage(row));
				}
			});
	});
}
|
|
|
|
/**
 * Handles one incoming message on an ebt.replicate stream.
 *
 * A message with an `author` property is treated as a feed message and
 * stored. Anything else is treated as a clock (feed id -> number): our own
 * clock is sent in reply, and for every entry that is non-negative with bit
 * 0 clear, the stored messages of that feed with sequence >= (value >> 1)
 * are sent in sequence order while connection.send_clock tracks the last
 * sequence sent. Other entries remove the feed from connection.send_clock.
 *
 * The query selects sequence_before_author, as the createHistoryStream
 * queries do, because formatMessage uses that column to pick the property
 * order; without it every message was emitted author-first.
 *
 * @param request ebt.replicate request exposing `message`, `connection` and
 * `send_json`.
 */
function ebtReplicateCommon(request) {
	if (request.message.author) {
		storeMessage(request);
		return;
	}

	ebtReplicateSendClock(request, request.message);

	if (!request.connection.send_clock) {
		request.connection.send_clock = {};
	}
	for (const id of Object.keys(request.message)) {
		const note = request.message[id];
		if (note >= 0 && (note & 1) == 0) {
			const sequence = note >> 1;
			request.connection.send_clock[id] = sequence;
			ssb.sqlStream(
				'SELECT previous, author, id, sequence, timestamp, hash, content, signature, sequence_before_author FROM messages WHERE author = ?1 AND sequence >= ?2 ORDER BY sequence',
				[id, sequence],
				function(row) {
					request.send_json(formatMessage(row));
					request.connection.send_clock[id] = row.sequence;
				});
		} else {
			delete request.connection.send_clock[id];
		}
	}
}
|
|
|
|
/**
 * Callback for the ebt.replicate request this side opened as a client.
 *
 * Error responses are ignored. On the first other message for a connection
 * the live message callback is registered (tracked through
 * connection.message_registered); every such message is then processed by
 * ebtReplicateCommon.
 *
 * @param request ebt.replicate request exposing `message` and `connection`.
 */
function ebtReplicateClient(request) {
	if (request.message?.name === 'Error') {
		return;
	}
	const connection = request.connection;
	if (!connection.message_registered) {
		ebtReplicateRegisterMessageCallback(request);
		connection.message_registered = true;
	}
	ebtReplicateCommon(request);
}
|
|
|
|
/**
 * Handler for an incoming ebt.replicate request: registers the live message
 * callback, sends our clock (with an empty `have` set), and routes every
 * later message on the stream to ebtReplicateCommon.
 *
 * @param request Incoming ebt.replicate request exposing `more`.
 */
function ebtReplicateServer(request) {
	ebtReplicateRegisterMessageCallback(request);
	ebtReplicateSendClock(request, {});
	request.more(ebtReplicateCommon);
}

// Expose the handler as the 'ebt.replicate' RPC.
ssb.addRpc(['ebt', 'replicate'], ebtReplicateServer);
|
|
|
|
/**
 * RPC 'createHistoryStream': sends the stored messages of args[0].id with
 * sequence >= args[0].seq (0 when seq is null/undefined) in sequence order,
 * then keeps sending that feed's messages as 'message' events arrive.
 *
 * When args[0].keys is truthy or undefined each message is wrapped as
 * {key, value, timestamp}; otherwise the bare formatted message is sent.
 *
 * NOTE(review): the 'message' listener is registered per request and is
 * never removed anywhere in this file.
 */
ssb.addRpc(['createHistoryStream'], function(request) {
	const options = request.args[0];
	const id = options.id;
	const seq = options.seq;
	const keys = options.keys || options.keys === undefined;

	const sendMessage = (row) => {
		const message = keys
			? {
				key: row.id,
				value: formatMessage(row),
				timestamp: row.timestamp,
			}
			: formatMessage(row);
		request.send_json(message);
	};

	// Backlog: everything already stored for the feed from `seq` onwards.
	ssb.sqlStream(
		'SELECT previous, author, id, sequence, timestamp, hash, content, signature, sequence_before_author FROM messages WHERE author = ?1 AND sequence >= ?2 ORDER BY sequence',
		[id, seq ?? 0],
		sendMessage);

	// Live tail: the author filter drops messages from other feeds.
	ssb.addEventListener('message', (message_id) => {
		ssb.sqlStream(
			'SELECT previous, author, id, sequence, timestamp, hash, content, signature, sequence_before_author FROM messages WHERE id = ?1 AND author = ?2',
			[message_id, id],
			(row) => {
				sendMessage(row);
			});
	});
});
|