diff --git a/core/ssb.js b/core/ssb.js index fb2be7e2..516f4488 100644 --- a/core/ssb.js +++ b/core/ssb.js @@ -1,15 +1,16 @@ var g_wants_requests = {}; var g_database = new Database('core'); +const k_use_create_history_stream = false; -async function following(db, id) { - var o = await db.get(id + ":following"); +function following(db, id) { + var o = db.get(id + ":following"); const k_version = 5; var f = o ? JSON.parse(o) : o; if (!f || f.version != k_version) { f = {users: [], sequence: 0, version: k_version}; } f.users = new Set(f.users); - await ssb.sqlStream( + ssb.sqlStream( "SELECT "+ " sequence, "+ " json_extract(content, '$.contact') AS contact, "+ @@ -33,18 +34,18 @@ async function following(db, id) { f.users = Array.from(f.users); var j = JSON.stringify(f); if (o != j) { - await db.set(id + ":following", j); + db.set(id + ":following", j); } return f.users; } -async function followingDeep(db, seed_ids, depth) { +function followingDeep(db, seed_ids, depth) { if (depth <= 0) { return seed_ids; } - var f = await Promise.all(seed_ids.map(x => following(db, x))); + var f = seed_ids.map(x => following(db, x)); var ids = [].concat(...f); - var x = await followingDeep(db, [...new Set(ids)].sort(), depth - 1); + var x = followingDeep(db, [...new Set(ids)].sort(), depth - 1); x = [...new Set([].concat(...x, ...seed_ids))].sort(); return x; } @@ -56,7 +57,7 @@ function get_latest_sequence_for_author(author) { [author], function(row) { if (row.sequence) { - sequence = row.sequence + 1; + sequence = row.sequence; } }); return sequence; @@ -72,7 +73,23 @@ function storeMessage(message) { ssb.addEventListener('connections', function(change, connection) { if (change == 'add') { var sequence = get_latest_sequence_for_author(connection.id); - connection.send_json({'name': ['createHistoryStream'], 'type': 'source', 'args': [{'id': connection.id, 'seq': sequence, 'live': true, 'keys': false}]}, storeMessage); + if (k_use_create_history_stream) { + connection.send_json({'name': ['createHistoryStream'], 'type': 'source', 'args': [{'id': connection.id, 'seq': sequence, 'live': true, 'keys': false}]}, storeMessage); + var me = ssb.whoami(); + followingDeep(g_database, [me], 2).then(function(ids) { + for (let id of ids) { + if (id == me) { + continue; + } + var sequence = get_latest_sequence_for_author(id); + connection.send_json({'name': ['createHistoryStream'], 'type': 'source', 'args': [{'id': id, 'seq': sequence, 'live': true, 'keys': false}]}, storeMessage); + } + }); + } else { + if (connection.is_client) { + connection.send_json({"name": ["ebt", "replicate"], "args": [{"version": 3, "format": "classic"}], "type": "duplex"}, ebtReplicateClient); + } + } connection.send_json({'name': ['blobs', 'createWants'], 'type': 'source', 'args': []}, function(message) { Object.keys(message.message).forEach(function(id) { if (message.message[id] < 0) { @@ -83,7 +100,6 @@ ssb.addEventListener('connections', function(change, connection) { g_wants_requests[connection.id].send_json(out_message); } } else { - print("blobs.get", id); var received_bytes = 0; var expected_bytes = message.message[id]; var buffer = new Uint8Array(expected_bytes); @@ -97,16 +113,6 @@ ssb.addEventListener('connections', function(change, connection) { } }); }); - var me = ssb.whoami(); - followingDeep(g_database, [me], 2).then(function(ids) { - for (let id of ids) { - if (id == me) { - continue; - } - var sequence = get_latest_sequence_for_author(id); - connection.send_json({'name': ['createHistoryStream'], 'type': 'source', 'args': [{'id': id, 'seq': sequence, 'live': true, 'keys': false}]}, storeMessage); - } - }); } else if (change == 'remove') { print('REMOVE', connection.id); delete g_wants_requests[connection.id]; @@ -149,10 +155,97 @@ ssb.addRpc(['blobs', 'get'], function(request) { ssb.addRpc(['gossip', 'ping'], function(request) { request.more(function(message) { - message.send_json(message.message); + message.send_json(Date.now()); }); }); +ssb.addRpc(['tunnel', 'isRoom'], function(request) { + request.send_json(false); +}); + +function ebtReplicateSendClock(request, have) { + var me = ssb.whoami(); + var message = {}; + var ids = followingDeep(g_database, [me], 2).concat([request.connection.id]).concat(Object.keys(have)); + for (let id of ids) { + message[id] = get_latest_sequence_for_author(id); + } + + var last_sent = request.connection.sent_clock || {}; + var to_send = {} + for (let id of ids) { + if (last_sent[id] === undefined || message[id] > last_sent[id]) { + last_sent[id] = to_send[id] = message[id] === -1 ? -1 : message[id] << 1; + } + } + request.connection.sent_clock = last_sent; + + if (Object.keys(to_send).length) { + request.send_json(to_send); + } +} + +function formatMessage(row) { + var message = { + previous: row.previous, + author: row.author, + sequence: row.sequence, + timestamp: row.timestamp, + hash: row.hash, + content: JSON.parse(row.content), + signature: row.signature, + }; + return message; +} + +function ebtReplicateRegisterMessageCallback(request) { + var me = ssb.whoami(); + ssb.addEventListener('message', function(message_id) { + ssb.sqlStream( + 'SELECT previous, author, id, sequence, timestamp, hash, content, signature FROM messages WHERE id = ?1 AND author = ?2', + [message_id, me], + function (row) { + request.send_json(formatMessage(row)); + }); + }); +} + +function ebtReplicateCommon(request) { + var me = ssb.whoami(); + if (request.message.author) { + storeMessage(request); + } else { + ebtReplicateSendClock(request, request.message); + + for (let id of Object.keys(request.message)) { + if (request.message[id] >= 0 && (request.message[id] & 1) == 0) { + ssb.sqlStream( + 'SELECT previous, author, id, sequence, timestamp, hash, content, signature FROM messages WHERE author = ?1 AND sequence >= ?2 ORDER BY sequence', + [id, request.message[id] >> 1], + function (row) { + request.send_json(formatMessage(row)); + }); + } + } + } +} + +function ebtReplicateClient(request) { + if (!request.connection.message_registered) { + ebtReplicateRegisterMessageCallback(request); + request.connection.message_registered = true; + } + ebtReplicateCommon(request); +} + +function ebtReplicateServer(request) { + ebtReplicateRegisterMessageCallback(request); + ebtReplicateSendClock(request, {}); + request.more(ebtReplicateCommon); +} + +ssb.addRpc(['ebt', 'replicate'], ebtReplicateServer); + ssb.addRpc(['createHistoryStream'], function(request) { var id = request.args[0].id; var seq = request.args[0].seq; @@ -191,12 +284,10 @@ ssb.addRpc(['createHistoryStream'], function(request) { sendMessage); ssb.addEventListener('message', function(message_id) { ssb.sqlStream( - 'SELECT previous, author, id, sequence, timestamp, hash, content, signature FROM messages WHERE id = ?1', - [message_id], + 'SELECT previous, author, id, sequence, timestamp, hash, content, signature FROM messages WHERE id = ?1 AND author = ?2', + [message_id, id], function (row) { - if (row.author == id) { - sendMessage(row); - } + sendMessage(row); }); }); }); diff --git a/src/ssb.c b/src/ssb.c index 8453f7d5..0666ad38 100644 --- a/src/ssb.c +++ b/src/ssb.c @@ -408,6 +408,7 @@ void tf_ssb_connection_add_request(tf_ssb_connection_t* connection, int32_t requ { if (_tf_ssb_connection_get_request_callback(connection, request_number, NULL, NULL)) { + /* TODO: This leaks the callback. */ return; } tf_ssb_request_t* request = malloc(sizeof(tf_ssb_request_t)); @@ -661,6 +662,7 @@ static void _tf_ssb_connection_verify_identity(tf_ssb_connection_t* connection, JSContext* context = connection->ssb->context; JS_SetPropertyStr(context, connection->object, "id", JS_NewString(context, fullid)); + JS_SetPropertyStr(context, connection->object, "is_client", JS_TRUE); connection->state = k_tf_ssb_state_verified; _tf_ssb_notify_connections_changed(connection->ssb, k_tf_ssb_change_connect, connection); @@ -866,6 +868,7 @@ static void _tf_ssb_connection_verify_client_identity(tf_ssb_connection_t* conne JSContext* context = connection->ssb->context; JS_SetPropertyStr(context, connection->object, "id", JS_NewString(context, fullid)); + JS_SetPropertyStr(context, connection->object, "is_client", JS_FALSE); connection->state = k_tf_ssb_state_server_verified; _tf_ssb_notify_connections_changed(connection->ssb, k_tf_ssb_change_connect, connection); @@ -967,7 +970,7 @@ static void _tf_ssb_connection_rpc_recv(tf_ssb_connection_t* connection, uint8_t } else { - const char* k_unsupported = "{\"message\": \"unsupported message\", \"name\": \"Error\", \"stack\": \"none\"}"; + const char* k_unsupported = "{\"message\": \"method: is not in list of allowed methods\", \"name\": \"Error\", \"stack\": \"none\"}"; tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_json | k_ssb_rpc_flag_end_error | (flags & k_ssb_rpc_flag_stream), -request_number, (const uint8_t*)k_unsupported, strlen(k_unsupported), NULL, NULL); } @@ -1322,7 +1325,6 @@ static void _tf_ssb_connection_on_connect(uv_connect_t* connect, int status) connect->data = NULL; if (status == 0) { - printf("on connect\n"); connection->state = k_tf_ssb_state_connected; uv_read_start(connect->handle, _tf_ssb_connection_on_tcp_alloc, _tf_ssb_connection_on_tcp_recv); _tf_ssb_connection_client_send_hello(connect->handle); @@ -1812,6 +1814,7 @@ tf_ssb_connection_t* tf_ssb_connection_create(tf_ssb_t* ssb, const char* host, c if (tf_ssb_id_bin_to_str(public_key_str, sizeof(public_key_str), public_key)) { JS_SetPropertyStr(context, connection->object, "id", JS_NewString(context, public_key_str)); + JS_SetPropertyStr(context, connection->object, "is_client", JS_TRUE); } JS_SetOpaque(connection->object, connection); diff --git a/src/ssb.js.c b/src/ssb.js.c index 1099b467..4788d53c 100644 --- a/src/ssb.js.c +++ b/src/ssb.js.c @@ -352,13 +352,18 @@ static JSValue _tf_ssb_rpc_send_json(JSContext* context, JSValueConst this_val, JS_ToInt32(context, &request_number, request_val); JS_FreeValue(context, request_val); + JSValue flags_val = JS_GetPropertyStr(context, this_val, "flags"); + int32_t flags_number; + JS_ToInt32(context, &flags_number, flags_val); + JS_FreeValue(context, flags_val); + JSValue message_val = JS_JSONStringify(context, argv[0], JS_NULL, JS_NULL); size_t size; const char* message = JS_ToCStringLen(context, &size, message_val); tf_ssb_connection_rpc_send( connection, - k_ssb_rpc_flag_json | k_ssb_rpc_flag_stream, + k_ssb_rpc_flag_json | (flags_number & ~k_ssb_rpc_mask_type), -request_number, (const uint8_t*)message, size,