From 69253432b800edc97cef13b144d8a0d578e688f9 Mon Sep 17 00:00:00 2001 From: Cory McWilliams Date: Sun, 8 Jan 2023 20:01:35 +0000 Subject: [PATCH] ssb.js is now entirely in C. Usual disclaimers about it not being amazingly well tested. git-svn-id: https://www.unprompted.com/svn/projects/tildefriends/trunk@4111 ed5197a5-7fde-0310-b194-c3ffbd925b24 --- core/core.js | 1 - core/ssb.js | 164 ------------------------------- src/ssb.c | 135 +++++++++++++------------ src/ssb.db.c | 28 ++++++ src/ssb.db.h | 1 + src/ssb.h | 9 ++ src/ssb.js.c | 213 +--------------------------------------- src/ssb.rpc.c | 267 ++++++++++++++++++++++++++++++++++++++++++++++---- src/task.c | 1 + 9 files changed, 359 insertions(+), 460 deletions(-) delete mode 100644 core/ssb.js diff --git a/core/core.js b/core/core.js index c1f7a04d..28d893d6 100644 --- a/core/core.js +++ b/core/core.js @@ -342,7 +342,6 @@ async function getProcessBlob(blobId, key, options) { }); } }; - delete imports.ssb.addRpc; if (process.credentials && process.credentials.session && diff --git a/core/ssb.js b/core/ssb.js deleted file mode 100644 index da49060a..00000000 --- a/core/ssb.js +++ /dev/null @@ -1,164 +0,0 @@ -"use strict"; -var g_database = new Database('core'); -const k_use_create_history_stream = false; - -function get_latest_sequence_for_author(author) { - var sequence = 0; - ssb.sqlStream( - 'SELECT MAX(sequence) AS sequence FROM messages WHERE author = ?1', - [author], - function(row) { - if (row.sequence) { - sequence = row.sequence; - } - }); - return sequence; -} - -function storeMessage(message) { - var payload = message.message.value ? message.message.value : message.message; - if (typeof(payload) == 'object') { - ssb.storeMessage(payload); - } -} - -ssb.addEventListener('connections', function on_connections_changed(change, connection) { - if (change == 'add') { - var sequence = get_latest_sequence_for_author(connection.id); - if (k_use_create_history_stream) { - connection.send_json({'name': ['createHistoryStream'], 'type': 'source', 'args': [{'id': connection.id, 'seq': sequence, 'live': true, 'keys': false}]}, storeMessage); - var identities = ssb.getAllIdentities(); - let ids = ssb.followingDeep(identities, 2); - for (let id of ids) { - if (identities.indexOf(id) != -1) { - continue; - } - var sequence = get_latest_sequence_for_author(id); - connection.send_json({'name': ['createHistoryStream'], 'type': 'source', 'args': [{'id': id, 'seq': sequence, 'live': true, 'keys': false}]}, storeMessage); - } - } else { - if (connection.is_client) { - connection.send_json({"name": ["ebt", "replicate"], "args": [{"version": 3, "format": "classic"}], "type": "duplex"}, ebtReplicateClient); - } - } - } -}); - -function ebtReplicateSendClock(request, have) { - var identities = ssb.getAllIdentities(); - var message = {}; - var last_sent = request.connection.sent_clock || {}; - var ids = ssb.followingDeep(identities, 2).concat([request.connection.id]); - if (!Object.keys(last_sent).length) { - for (let id of ids) { - message[id] = get_latest_sequence_for_author(id); - } - } - for (let id of Object.keys(have)) { - if (message[id] === undefined) { - var sequence = get_latest_sequence_for_author(id); - message[id] = sequence ? sequence : -1; - } - } - - var to_send = {} - var offset = Math.floor(Math.random() * ids.length); - for (var i = 0; i < ids.length; i++) { - var id = ids[(i + offset) % ids.length]; - if (last_sent[id] === undefined || message[id] > last_sent[id]) { - last_sent[id] = to_send[id] = message[id] === -1 ? -1 : message[id] << 1; - } - if (Object.keys(to_send).length >= 32) { - request.send_json(to_send); - to_send = {}; - } - } - request.connection.sent_clock = last_sent; - - if (Object.keys(to_send).length) { - request.send_json(to_send); - } -} - -function formatMessage(row) { - if (row.sequence_before_author) { - return { - previous: row.previous, - sequence: row.sequence, - author: row.author, - timestamp: row.timestamp, - hash: row.hash, - content: JSON.parse(row.content), - signature: row.signature, - }; - } else { - return { - previous: row.previous, - author: row.author, - sequence: row.sequence, - timestamp: row.timestamp, - hash: row.hash, - content: JSON.parse(row.content), - signature: row.signature, - }; - } -} - -function ebtReplicateRegisterMessageCallback(request) { - ssb.addEventListener('message', function(message_id) { - if (request.connection.send_clock) { - ssb.sqlStream( - 'SELECT previous, author, id, sequence, timestamp, hash, content, signature FROM messages WHERE id = ?1', - [message_id], - function (row) { - if (request.connection.send_clock[row.author] < row.sequence) { - request.send_json(formatMessage(row)); - } - }); - } - }); -} - -function ebtReplicateCommon(request) { - if (request.message.author) { - storeMessage(request); - } else { - ebtReplicateSendClock(request, request.message); - - if (!request.connection.send_clock) { - request.connection.send_clock = {}; - } - for (let id of Object.keys(request.message)) { - if (request.message[id] >= 0 && (request.message[id] & 1) == 0) { - request.connection.send_clock[id] = request.message[id] >> 1; - ssb.sqlStream( - 'SELECT previous, author, id, sequence, timestamp, hash, content, signature FROM messages WHERE author = ?1 AND sequence >= ?2 ORDER BY sequence', - [id, request.message[id] >> 1], - function (row) { - request.send_json(formatMessage(row)); - request.connection.send_clock[id] = row.sequence; - }); - } else { - delete request.connection.send_clock[id]; - } - } - } -} - -function ebtReplicateClient(request) { - if (request.message?.name !== 'Error') { - if (!request.connection.message_registered) { - ebtReplicateRegisterMessageCallback(request); - request.connection.message_registered = true; - } - ebtReplicateCommon(request); - } -} - -function ebtReplicateServer(request) { - ebtReplicateRegisterMessageCallback(request); - ebtReplicateSendClock(request, {}); - request.more(ebtReplicateCommon); -} - -ssb.addRpc(['ebt', 'replicate'], ebtReplicateServer); diff --git a/src/ssb.c b/src/ssb.c index 34ab3a08..843b14e9 100644 --- a/src/ssb.c +++ b/src/ssb.c @@ -206,6 +206,9 @@ typedef struct _tf_ssb_connection_t int32_t tunnel_request_number; tf_ssb_blob_wants_t blob_wants; + bool sent_clock; + int32_t ebt_request_number; + JSValue ebt_send_clock; JSValue object; @@ -521,6 +524,8 @@ void tf_ssb_connection_add_new_message_request(tf_ssb_connection_t* connection, int index = tf_util_insert_index(author, connection->message_requests, connection->message_requests_count, sizeof(tf_ssb_connection_message_request_t), _message_request_compare); if (index < connection->message_requests_count && strcmp(author, connection->message_requests[index].author) == 0) { + connection->message_requests[index].request_number = request_number; + connection->message_requests[index].keys = keys; return; } connection->message_requests = tf_resize_vec(connection->message_requests, sizeof(tf_ssb_connection_message_request_t) * (connection->message_requests_count + 1)); @@ -537,6 +542,16 @@ void tf_ssb_connection_add_new_message_request(tf_ssb_connection_t* connection, connection->message_requests_count++; } +void tf_ssb_connection_remove_new_message_request(tf_ssb_connection_t* connection, const char* author) +{ + int index = tf_util_insert_index(author, connection->message_requests, connection->message_requests_count, sizeof(tf_ssb_connection_message_request_t), _message_request_compare); + if (index < connection->message_requests_count && strcmp(author, connection->message_requests[index].author) == 0) + { + memmove(connection->message_requests + index, connection->message_requests + index + 1, sizeof(tf_ssb_connection_message_request_t) * (connection->message_requests_count - index)); + connection->message_requests_count--; + } +} + static void _tf_ssb_connection_remove_request(tf_ssb_connection_t* connection, int32_t request_number) { tf_ssb_request_t* request = bsearch(&request_number, connection->requests, connection->requests_count, sizeof(tf_ssb_request_t), _request_compare); @@ -1593,6 +1608,8 @@ void _tf_ssb_connection_destroy(tf_ssb_connection_t* connection, const char* rea !connection->tcp.data && !connection->connect.data) { + JS_FreeValue(ssb->context, connection->ebt_send_clock); + connection->ebt_send_clock = JS_UNDEFINED; tf_free(connection->message_requests); connection->message_requests = NULL; connection->message_requests_count = 0; @@ -2079,64 +2096,6 @@ static void _tf_ssb_connection_finalizer(JSRuntime* runtime, JSValue value) } } -static void _tf_ssb_connection_send_json_response(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) -{ - if (!user_data) - { - return; - } - - void _tf_ssb_on_rpc(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data); - _tf_ssb_on_rpc(connection, flags, request_number, args, message, size, user_data); -} - -static void _tf_ssb_connection_cleanup_value(tf_ssb_t* ssb, void* user_data) -{ - if (user_data) - { - JSValue callback = JS_MKPTR(JS_TAG_OBJECT, user_data); - JS_FreeValue(tf_ssb_get_context(ssb), callback); - } -} - -static JSValue _tf_ssb_connection_send_json_internal(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv, int flags) -{ - tf_ssb_connection_t* connection = JS_GetOpaque(this_val, _connection_class_id); - if (!connection) - { - return JS_UNDEFINED; - } - - uint32_t request_number = tf_ssb_connection_next_request_number(connection); - - JSValue message_val = JS_JSONStringify(context, argv[0], JS_NULL, JS_NULL); - size_t size; - const char* message = JS_ToCStringLen(context, &size, message_val); - JS_FreeValue(context, message_val); - - tf_ssb_connection_rpc_send( - connection, - flags, - request_number, - (const uint8_t*)message, - size, - _tf_ssb_connection_send_json_response, - _tf_ssb_connection_cleanup_value, - JS_IsFunction(context, argv[1]) ? JS_VALUE_GET_PTR(JS_DupValue(context, argv[1])) : NULL); - JS_FreeCString(context, message); - return JS_NewInt32(context, request_number); -} - -static JSValue _tf_ssb_connection_send_json(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) -{ - return _tf_ssb_connection_send_json_internal(context, this_val, argc, argv, k_ssb_rpc_flag_json | k_ssb_rpc_flag_stream); -} - -static JSValue _tf_ssb_connection_send_json_async(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) -{ - return _tf_ssb_connection_send_json_internal(context, this_val, argc, argv, k_ssb_rpc_flag_json); -} - static void _tf_ssb_connection_process_message_async(uv_async_t* async) { tf_ssb_connection_t* connection = async->data; @@ -2181,8 +2140,6 @@ tf_ssb_connection_t* tf_ssb_connection_create(tf_ssb_t* ssb, const char* host, c connection->object = JS_NewObjectClass(ssb->context, _connection_class_id); JS_SetOpaque(connection->object, connection); - JS_SetPropertyStr(context, connection->object, "send_json", JS_NewCFunction(context, _tf_ssb_connection_send_json, "send_json", 2)); - JS_SetPropertyStr(context, connection->object, "send_json_async", JS_NewCFunction(context, _tf_ssb_connection_send_json_async, "send_json_async", 2)); char public_key_str[k_id_base64_len] = { 0 }; if (tf_ssb_id_bin_to_str(public_key_str, sizeof(public_key_str), public_key)) { @@ -2240,8 +2197,6 @@ tf_ssb_connection_t* tf_ssb_connection_tunnel_create(tf_ssb_t* ssb, const char* tunnel->object = JS_NewObjectClass(ssb->context, _connection_class_id); JS_SetOpaque(tunnel->object, tunnel); - JS_SetPropertyStr(context, tunnel->object, "send_json", JS_NewCFunction(context, _tf_ssb_connection_send_json, "send_json", 2)); - JS_SetPropertyStr(context, tunnel->object, "send_json_async", JS_NewCFunction(context, _tf_ssb_connection_send_json_async, "send_json_async", 2)); JS_SetPropertyStr(context, tunnel->object, "id", JS_NewString(context, target_id)); JS_SetPropertyStr(context, tunnel->object, "is_client", JS_TRUE); @@ -2340,8 +2295,6 @@ static void _tf_ssb_on_connection(uv_stream_t* stream, int status) uv_async_init(ssb->loop, &connection->async, _tf_ssb_connection_process_message_async); connection->object = JS_NewObjectClass(ssb->context, _connection_class_id); - JS_SetPropertyStr(ssb->context, connection->object, "send_json", JS_NewCFunction(ssb->context, _tf_ssb_connection_send_json, "send_json", 2)); - JS_SetPropertyStr(ssb->context, connection->object, "send_json_async", JS_NewCFunction(ssb->context, _tf_ssb_connection_send_json_async, "send_json_async", 2)); JS_SetOpaque(connection->object, connection); if (uv_tcp_init(ssb->loop, &connection->tcp) != 0) @@ -3118,3 +3071,57 @@ tf_ssb_blob_wants_t* tf_ssb_connection_get_blob_wants_state(tf_ssb_connection_t* { return connection ? &connection->blob_wants : NULL; } + +bool tf_ssb_verify_strip_and_store_message(tf_ssb_t* ssb, JSValue value) +{ + JSContext* context = tf_ssb_get_context(ssb); + char signature[crypto_sign_BYTES + 128]; + char id[crypto_hash_sha256_BYTES * 2 + 1]; + bool sequence_before_author = false; + if (tf_ssb_verify_and_strip_signature(context, value, id, sizeof(id), signature, sizeof(signature), &sequence_before_author)) + { + if (tf_ssb_db_store_message(ssb, context, id, value, signature, sequence_before_author)) + { + tf_ssb_notify_message_added(ssb, id); + return true; + } + } + else + { + printf("failed to verify message\n"); + } + return false; +} + +bool tf_ssb_connection_get_sent_clock(tf_ssb_connection_t* connection) +{ + return connection->sent_clock; +} + +void tf_ssb_connection_set_sent_clock(tf_ssb_connection_t* connection, bool sent_clock) +{ + connection->sent_clock = sent_clock; +} + +int32_t tf_ssb_connection_get_ebt_request_number(tf_ssb_connection_t* connection) +{ + return connection->ebt_request_number; +} + +void tf_ssb_connection_set_ebt_request_number(tf_ssb_connection_t* connection, int32_t request_number) +{ + connection->ebt_request_number = request_number; +} + +JSValue tf_ssb_connection_get_ebt_send_clock(tf_ssb_connection_t* connection) +{ + JSContext* context = connection->ssb->context; + return JS_DupValue(context, connection->ebt_send_clock); +} + +void tf_ssb_connection_set_ebt_send_clock(tf_ssb_connection_t* connection, JSValue send_clock) +{ + JSContext* context = connection->ssb->context; + JS_FreeValue(context, connection->ebt_send_clock); + connection->ebt_send_clock = JS_DupValue(context, send_clock); +} diff --git a/src/ssb.db.c b/src/ssb.db.c index 57993491..3212ee09 100644 --- a/src/ssb.db.c +++ b/src/ssb.db.c @@ -1102,6 +1102,34 @@ const char** tf_ssb_db_following_deep(tf_ssb_t* ssb, const char** ids, int count return (const char**)result; } +typedef struct _identities_t +{ + const char** ids; + int count; +} identities_t; + +static void _add_identity(const char* identity, void* user_data) +{ + identities_t* identities = user_data; + char full_id[k_id_base64_len]; + snprintf(full_id, sizeof(full_id), "@%s", identity); + identities->ids = tf_resize_vec(identities->ids, sizeof(const char*) * (identities->count + 1)); + identities->ids[identities->count++] = tf_strdup(full_id); +} + +const char** tf_ssb_db_get_all_visible_identities(tf_ssb_t* ssb, int depth) +{ + identities_t identities = { 0 }; + tf_ssb_db_identity_visit_all(ssb, _add_identity, &identities); + const char** following = tf_ssb_db_following_deep(ssb, identities.ids, identities.count, depth); + for (int i = 0; i < identities.count; i++) + { + tf_free((void*)identities.ids[i]); + } + tf_free(identities.ids); + return following; +} + static void _test_private(sqlite3* db, const uint8_t* private_key) { sqlite3_stmt* statement = NULL; diff --git a/src/ssb.db.h b/src/ssb.db.h index d66b824d..d32a3dc3 100644 --- a/src/ssb.db.h +++ b/src/ssb.db.h @@ -38,5 +38,6 @@ JSValue tf_ssb_format_message( const char* signature, bool sequence_before_author); const char** tf_ssb_db_following_deep(tf_ssb_t* ssb, const char** ids, int count, int depth); +const char** tf_ssb_db_get_all_visible_identities(tf_ssb_t* ssb, int depth); void tf_ssb_db_private(sqlite3* db); diff --git a/src/ssb.h b/src/ssb.h index 7b70007c..68754cf8 100644 --- a/src/ssb.h +++ b/src/ssb.h @@ -106,6 +106,7 @@ bool tf_ssb_id_bin_to_str(char* str, size_t str_size, const uint8_t* bin); bool tf_ssb_verify_and_strip_signature(JSContext* context, JSValue val, char* out_id, size_t out_id_size, char* out_signature, size_t out_signature_size, bool* out_sequence_before_author); void tf_ssb_calculate_message_id(JSContext* context, JSValue message, char* out_id, size_t out_id_size); +bool tf_ssb_verify_strip_and_store_message(tf_ssb_t* ssb, JSValue value); bool tf_ssb_connection_is_client(tf_ssb_connection_t* connection); const char* tf_ssb_connection_get_host(tf_ssb_connection_t* connection); @@ -152,6 +153,7 @@ void tf_ssb_connection_rpc_send_error_method_not_allowed(tf_ssb_connection_t* co void tf_ssb_connection_add_request(tf_ssb_connection_t* connection, int32_t request_number, tf_ssb_rpc_callback_t* callback, tf_ssb_callback_cleanup_t* cleanup, void* user_data, tf_ssb_connection_t* dependent_connection); void tf_ssb_connection_add_new_message_request(tf_ssb_connection_t* connection, const char* author, int32_t request_number, bool keys); +void tf_ssb_connection_remove_new_message_request(tf_ssb_connection_t* connection, const char* author); bool tf_ssb_connection_is_attendant(tf_ssb_connection_t* connection); int32_t tf_ssb_connection_get_attendant_request_number(tf_ssb_connection_t* connection); @@ -162,6 +164,13 @@ void tf_ssb_connection_remove_room_attendant(tf_ssb_connection_t* connection, co tf_ssb_connection_t* tf_ssb_connection_tunnel_create(tf_ssb_t* ssb, const char* portal_id, int32_t request_number, const char* target_id); +int32_t tf_ssb_connection_get_ebt_request_number(tf_ssb_connection_t* connection); +void tf_ssb_connection_set_ebt_request_number(tf_ssb_connection_t* connection, int32_t request_number); +JSValue tf_ssb_connection_get_ebt_send_clock(tf_ssb_connection_t* connection); +void tf_ssb_connection_set_ebt_send_clock(tf_ssb_connection_t* connection, JSValue send_clock); +bool tf_ssb_connection_get_sent_clock(tf_ssb_connection_t* connection); +void tf_ssb_connection_set_sent_clock(tf_ssb_connection_t* connection, bool sent_clock); + JSClassID tf_ssb_get_connection_class_id(); void tf_ssb_get_stats(tf_ssb_t* ssb, tf_ssb_stats_t* out_stats); diff --git a/src/ssb.js.c b/src/ssb.js.c index 8091f629..901d6acf 100644 --- a/src/ssb.js.c +++ b/src/ssb.js.c @@ -313,20 +313,7 @@ static JSValue _tf_ssb_sqlStream(JSContext* context, JSValueConst this_val, int static JSValue _tf_ssb_storeMessage(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) { tf_ssb_t* ssb = JS_GetOpaque(this_val, _tf_ssb_classId); - char signature[crypto_sign_BYTES + 128]; - char id[crypto_hash_sha256_BYTES * 2 + 1]; - bool sequence_before_author = false; - if (tf_ssb_verify_and_strip_signature(context, argv[0], id, sizeof(id), signature, sizeof(signature), &sequence_before_author)) - { - if (tf_ssb_db_store_message(ssb, context, id, argv[0], signature, sequence_before_author)) - { - tf_ssb_notify_message_added(ssb, id); - } - } - else - { - printf("failed to verify message\n"); - } + tf_ssb_verify_strip_and_store_message(ssb, argv[0]); return JS_UNDEFINED; } @@ -416,205 +403,12 @@ static JSValue _tf_ssb_connect(JSContext* context, JSValueConst this_val, int ar return JS_UNDEFINED; } -static JSValue _tf_ssb_rpc_send_json(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) -{ - JSValue connection_val = JS_GetPropertyStr(context, this_val, "connection"); - tf_ssb_connection_t* connection = JS_GetOpaque(connection_val, tf_ssb_get_connection_class_id()); - JSValue request_val = JS_GetPropertyStr(context, this_val, "request_number"); - int32_t request_number; - JS_ToInt32(context, &request_number, request_val); - JS_FreeValue(context, request_val); - - JSValue flags_val = JS_GetPropertyStr(context, this_val, "flags"); - int32_t flags_number; - JS_ToInt32(context, &flags_number, flags_val); - JS_FreeValue(context, flags_val); - - JSValue message_val = JS_JSONStringify(context, argv[0], JS_NULL, JS_NULL); - size_t size; - const char* message = JS_ToCStringLen(context, &size, message_val); - - tf_ssb_connection_rpc_send( - connection, - k_ssb_rpc_flag_json | (flags_number & ~k_ssb_rpc_mask_type), - -request_number, - (const uint8_t*)message, - size, - NULL, - NULL, - NULL); - JS_FreeValue(context, connection_val); - JS_FreeCString(context, message); - JS_FreeValue(context, message_val); - return JS_NewInt32(context, -request_number); -} - -static JSValue _tf_ssb_rpc_send_json_end(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) -{ - JSValue connection_val = JS_GetPropertyStr(context, this_val, "connection"); - tf_ssb_connection_t* connection = JS_GetOpaque(connection_val, tf_ssb_get_connection_class_id()); - JSValue request_val = JS_GetPropertyStr(context, this_val, "request_number"); - int32_t request_number; - JS_ToInt32(context, &request_number, request_val); - JS_FreeValue(context, request_val); - - JSValue flags_val = JS_GetPropertyStr(context, this_val, "flags"); - int32_t flags_number; - JS_ToInt32(context, &flags_number, flags_val); - JS_FreeValue(context, flags_val); - - JSValue message_val = JS_JSONStringify(context, argv[0], JS_NULL, JS_NULL); - size_t size; - const char* message = JS_ToCStringLen(context, &size, message_val); - - tf_ssb_connection_rpc_send( - connection, - k_ssb_rpc_flag_json | (flags_number & ~k_ssb_rpc_mask_type) | k_ssb_rpc_flag_end_error, - -request_number, - (const uint8_t*)message, - size, - NULL, - NULL, - NULL); - JS_FreeValue(context, connection_val); - JS_FreeCString(context, message); - JS_FreeValue(context, message_val); - return JS_UNDEFINED; -} - static void _tf_ssb_cleanup_value(tf_ssb_t* ssb, void* user_data) { JSValue callback = JS_MKPTR(JS_TAG_OBJECT, user_data); JS_FreeValue(tf_ssb_get_context(ssb), callback); } -static JSValue _tf_ssb_rpc_more(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) -{ - JSValue connection_val = JS_GetPropertyStr(context, this_val, "connection"); - tf_ssb_connection_t* connection = JS_GetOpaque(connection_val, tf_ssb_get_connection_class_id()); - JSValue request_val = JS_GetPropertyStr(context, this_val, "request_number"); - int32_t request_number; - JS_ToInt32(context, &request_number, request_val); - JS_FreeValue(context, request_val); - - tf_ssb_connection_add_request(connection, -request_number, _tf_ssb_on_rpc, _tf_ssb_cleanup_value, JS_VALUE_GET_PTR(JS_DupValue(context, argv[0])), NULL); - - JS_FreeValue(context, connection_val); - return JS_UNDEFINED; -} - -static JSValue _tf_ssb_rpc_send_binary(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) -{ - JSValue connection_val = JS_GetPropertyStr(context, this_val, "connection"); - tf_ssb_connection_t* connection = JS_GetOpaque(connection_val, tf_ssb_get_connection_class_id()); - JSValue request_val = JS_GetPropertyStr(context, this_val, "request_number"); - int32_t request_number; - JS_ToInt32(context, &request_number, request_val); - JS_FreeValue(context, request_val); - - size_t size; - uint8_t* message = tf_util_try_get_array_buffer(context, &size, argv[0]); - if (message) - { - tf_ssb_connection_rpc_send( - connection, - k_ssb_rpc_flag_binary | k_ssb_rpc_flag_stream, - -request_number, - (const uint8_t*)message, - size, - NULL, - NULL, - NULL); - } - else - { - size_t offset; - size_t element_size; - JSValue buffer = tf_util_try_get_typed_array_buffer(context, argv[0], &offset, &size, &element_size); - if (!JS_IsException(buffer)) - { - size_t total_size; - message = tf_util_try_get_array_buffer(context, &total_size, buffer); - if (message) - { - tf_ssb_connection_rpc_send( - connection, - k_ssb_rpc_flag_binary | k_ssb_rpc_flag_stream, - -request_number, - (const uint8_t*)message + offset, - size, - NULL, - NULL, - NULL); - } - } - JS_FreeValue(context, buffer); - } - JS_FreeValue(context, connection_val); - return JS_UNDEFINED; -} - -void _tf_ssb_on_rpc(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) -{ - tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); - JSContext* context = tf_ssb_get_context(ssb); - JSValue callback = JS_MKPTR(JS_TAG_OBJECT, user_data); - JSValue object = JS_NewObject(context); - JSValue connection_object = JS_DupValue(context, tf_ssb_connection_get_object(connection)); - JS_SetPropertyStr(context, object, "connection", connection_object); - JS_SetPropertyStr(context, object, "flags", JS_NewUint32(context, flags)); - JS_SetPropertyStr(context, object, "request_number", JS_NewInt32(context, request_number)); - JS_SetPropertyStr(context, object, "args", JS_GetPropertyStr(context, args, "args")); - JS_SetPropertyStr(context, object, "message", message && size ? JS_NewArrayBufferCopy(context, message, size) : JS_DupValue(context, args)); - JS_SetPropertyStr(context, object, "send_json", JS_NewCFunction(context, _tf_ssb_rpc_send_json, "send_json", 1)); - JS_SetPropertyStr(context, object, "send_binary", JS_NewCFunction(context, _tf_ssb_rpc_send_binary, "send_binary", 1)); - JS_SetPropertyStr(context, object, "send_json_end", JS_NewCFunction(context, _tf_ssb_rpc_send_json_end, "send_json_end", 1)); - JS_SetPropertyStr(context, object, "more", JS_NewCFunction(context, _tf_ssb_rpc_more, "more", 1)); - - JSValue result = JS_Call(context, callback, JS_UNDEFINED, 1, &object); - tf_util_report_error(context, result); - JS_FreeValue(context, result); - JS_FreeValue(context, object); -} - -static JSValue _tf_ssb_add_rpc(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) -{ - tf_ssb_t* ssb = JS_GetOpaque(this_val, _tf_ssb_classId); - if (!JS_IsArray(context, argv[0])) - { - return JS_ThrowTypeError(context, "Expected argument 1 to be an array of strings."); - } - if (!JS_IsFunction(context, argv[1])) - { - return JS_ThrowTypeError(context, "Expected argument 2 to be a function."); - } - - enum { k_max_name_parts = 16 }; - const char* name[k_max_name_parts + 1] = { 0 }; - - int length = tf_util_get_length(context, argv[0]); - if (length >= k_max_name_parts) - { - return JS_ThrowInternalError(context, "Too many parts to RPC name."); - } - - for (int i = 0; i < length; i++) - { - JSValue value = JS_GetPropertyUint32(context, argv[0], i); - name[i] = JS_ToCString(context, value); - JS_FreeValue(context, value); - } - - tf_ssb_add_rpc_callback(ssb, name, _tf_ssb_on_rpc, _tf_ssb_cleanup_value, JS_VALUE_GET_PTR(JS_DupValue(context, argv[1]))); - - for (int i = 0; i < length; i++) - { - JS_FreeCString(context, name[i]); - } - - return JS_UNDEFINED; -} - static void _tf_ssb_on_message_added_callback(tf_ssb_t* ssb, const char* id, void* user_data) { JSContext* context = tf_ssb_get_context(ssb); @@ -1023,13 +817,8 @@ void tf_ssb_register(JSContext* context, tf_ssb_t* ssb) JS_SetPropertyStr(context, object, "followingDeep", JS_NewCFunction(context, _tf_ssb_followingDeep, "followingDeep", 2)); /* Should be trusted only. */ - JS_SetPropertyStr(context, object, "addRpc", JS_NewCFunction(context, _tf_ssb_add_rpc, "addRpc", 2)); JS_SetPropertyStr(context, object, "addEventListener", JS_NewCFunction(context, _tf_ssb_add_event_listener, "addEventListener", 2)); JS_SetPropertyStr(context, object, "removeEventListener", JS_NewCFunction(context, _tf_ssb_remove_event_listener, "removeEventListener", 2)); JS_FreeValue(context, global); - - tf_util_register(context); - tf_database_register(context, tf_ssb_get_db(ssb)); - tf_ssb_run_file(context, "core/ssb.js"); } diff --git a/src/ssb.rpc.c b/src/ssb.rpc.c index e7d0979f..ec0ba6b7 100644 --- a/src/ssb.rpc.c +++ b/src/ssb.rpc.c @@ -678,26 +678,10 @@ static void _tf_ssb_rpc_connection_tunnel_isRoom_callback(tf_ssb_connection_t* c } } -static void _tf_ssb_rpc_createHistoryStream(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) +static void _tf_ssb_connection_send_history_stream(tf_ssb_connection_t* connection, int32_t request_number, const char* author, int64_t sequence, bool keys) { tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); JSContext* context = tf_ssb_get_context(ssb); - JSValue arg_array = JS_GetPropertyStr(context, args, "args"); - JSValue arg = JS_GetPropertyUint32(context, arg_array, 0); - if (JS_IsUndefined(arg)) - { - tf_ssb_connection_rpc_send_error(connection, flags, -request_number, "Missing request.args in createHistoryStream."); - } - JSValue id = JS_GetPropertyStr(context, arg, "id"); - JSValue seq = JS_GetPropertyStr(context, arg, "seq"); - JSValue keys = JS_GetPropertyStr(context, arg, "keys"); - JSValue live = JS_GetPropertyStr(context, arg, "live"); - bool is_keys = JS_IsUndefined(keys) || JS_ToBool(context, keys) > 0; - bool is_live = JS_ToBool(context, live) > 0; - int64_t sequence = 0; - JS_ToInt64(context, &sequence, seq); - const char* author = JS_ToCString(context, id); - sqlite3* db = tf_ssb_get_db(ssb); sqlite3_stmt* statement; if (sqlite3_prepare(db, "SELECT previous, author, id, sequence, timestamp, hash, content, signature, sequence_before_author FROM messages WHERE author = ?1 AND sequence >= ?2 ORDER BY sequence", -1, &statement, NULL) == SQLITE_OK) @@ -719,7 +703,7 @@ static void _tf_ssb_rpc_createHistoryStream(tf_ssb_connection_t* connection, uin (const char*)sqlite3_column_text(statement, 6), (const char*)sqlite3_column_text(statement, 7), sqlite3_column_int(statement, 8)); - if (is_keys) + if (keys) { message = JS_NewObject(context); JS_SetPropertyStr(context, message, "key", JS_NewString(context, (const char*)sqlite3_column_text(statement, 2))); @@ -730,12 +714,35 @@ static void _tf_ssb_rpc_createHistoryStream(tf_ssb_connection_t* connection, uin { message = formatted; } - tf_ssb_connection_rpc_send_json(connection, flags, -request_number, message, NULL, NULL, NULL); + tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream, request_number, message, NULL, NULL, NULL); JS_FreeValue(context, message); } } sqlite3_finalize(statement); } +} + +static void _tf_ssb_rpc_createHistoryStream(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) +{ + tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); + JSContext* context = tf_ssb_get_context(ssb); + JSValue arg_array = JS_GetPropertyStr(context, args, "args"); + JSValue arg = JS_GetPropertyUint32(context, arg_array, 0); + if (JS_IsUndefined(arg)) + { + tf_ssb_connection_rpc_send_error(connection, flags, -request_number, "Missing request.args in createHistoryStream."); + } + JSValue id = JS_GetPropertyStr(context, arg, "id"); + JSValue seq = JS_GetPropertyStr(context, arg, "seq"); + JSValue keys = JS_GetPropertyStr(context, arg, "keys"); + JSValue live = JS_GetPropertyStr(context, arg, "live"); + bool is_keys = JS_IsUndefined(keys) || JS_ToBool(context, keys) > 0; + bool is_live = JS_ToBool(context, live) > 0; + int64_t sequence = 0; + JS_ToInt64(context, &sequence, seq); + const char* author = JS_ToCString(context, id); + + _tf_ssb_connection_send_history_stream(connection, -request_number, author, sequence, is_keys); if (is_live) { @@ -750,6 +757,225 @@ static void _tf_ssb_rpc_createHistoryStream(tf_ssb_connection_t* connection, uin JS_FreeValue(context, arg_array); } +static bool _is_error(JSContext* context, JSValue message) +{ + JSValue name = JS_GetPropertyStr(context, message, "name"); + const char* name_string = JS_ToCString(context, name); + bool is_error = false; + if (name_string && strcmp(name_string, "Error") == 0) + { + is_error = true; + } + JS_FreeCString(context, name_string); + JS_FreeValue(context, name); + return is_error; +} + +static void _tf_ssb_rpc_ebt_replicate_send_clock(tf_ssb_connection_t* connection, int32_t request_number, JSValue message) +{ + tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); + JSContext* context = tf_ssb_get_context(ssb); + JSValue full_clock = JS_NewObject(context); + + /* Ask for every identity we know is being followed from local accounts. */ + const char** visible = tf_ssb_db_get_all_visible_identities(ssb, 2); + for (int i = 0; visible[i]; i++) + { + int64_t sequence = 0; + tf_ssb_db_get_latest_message_by_author(ssb, visible[i], &sequence, NULL, 0); + JS_SetPropertyStr(context, full_clock, visible[i], JS_NewInt64(context, sequence)); + } + + /* Ask about the incoming connection, too. */ + char id[k_id_base64_len] = ""; + if (tf_ssb_connection_get_id(connection, id, sizeof(id))) + { + JSValue in_clock = JS_GetPropertyStr(context, full_clock, id); + if (JS_IsUndefined(in_clock)) + { + int64_t sequence = 0; + tf_ssb_db_get_latest_message_by_author(ssb, id, &sequence, NULL, 0); + JS_SetPropertyStr(context, full_clock, id, JS_NewInt64(context, sequence)); + } + JS_FreeValue(context, in_clock); + } + + /* Also respond with what we know about all requested identities. */ + if (!JS_IsUndefined(message)) + { + JSPropertyEnum* ptab; + uint32_t plen; + JS_GetOwnPropertyNames(context, &ptab, &plen, message, JS_GPN_STRING_MASK); + for (uint32_t i = 0; i < plen; ++i) + { + JSValue in_clock = JS_GetProperty(context, full_clock, ptab[i].atom); + if (JS_IsUndefined(in_clock)) + { + JSValue key = JS_AtomToString(context, ptab[i].atom); + const char* key_string = JS_ToCString(context, key); + if (key_string) + { + int64_t sequence = -1; + tf_ssb_db_get_latest_message_by_author(ssb, key_string, &sequence, NULL, 0); + JS_SetPropertyStr(context, full_clock, key_string, JS_NewInt64(context, sequence)); + } + JS_FreeCString(context, key_string); + JS_FreeValue(context, key); + } + } + for (uint32_t i = 0; i < plen; ++i) + { + JS_FreeAtom(context, ptab[i].atom); + } + js_free(context, ptab); + } + + tf_free(visible); + + /* TODO: Send it in bite-size chunks. */ + tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream, -request_number, full_clock, NULL, NULL, NULL); + JS_FreeValue(context, full_clock); +} + +static void _tf_ssb_rpc_ebt_replicate_send_messages(tf_ssb_connection_t* connection, JSValue message) +{ + if (JS_IsUndefined(message)) + { + return; + } + + tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); + JSContext* context = tf_ssb_get_context(ssb); + JSPropertyEnum* ptab = NULL; + uint32_t plen = 0; + JS_GetOwnPropertyNames(context, &ptab, &plen, message, JS_GPN_STRING_MASK); + for (uint32_t i = 0; i < plen; ++i) + { + JSValue in_clock = JS_GetProperty(context, message, ptab[i].atom); + if (!JS_IsUndefined(in_clock)) + { + JSValue key = JS_AtomToString(context, ptab[i].atom); + int64_t sequence = -1; + JS_ToInt64(context, &sequence, in_clock); + const char* author = JS_ToCString(context, key); + if (sequence >= 0 && (sequence & 1) == 0) + { + int32_t request_number = -tf_ssb_connection_get_ebt_request_number(connection); + _tf_ssb_connection_send_history_stream(connection, request_number, author, sequence, false); + tf_ssb_connection_add_new_message_request(connection, author, request_number, false); + } + else + { + tf_ssb_connection_remove_new_message_request(connection, author); + } + JS_FreeCString(context, author); + JS_FreeValue(context, key); + } + JS_FreeValue(context, in_clock); + } + for (uint32_t i = 0; i < plen; ++i) + { + JS_FreeAtom(context, ptab[i].atom); + } + js_free(context, ptab); +} + +static void _tf_ssb_rpc_ebt_replicate(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) +{ + tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); + JSContext* context = tf_ssb_get_context(ssb); + if (_is_error(context, args)) + { + /* TODO: Send createHistoryStream. */ + return; + } + + JSValue author = JS_GetPropertyStr(context, args, "author"); + JSValue name = JS_GetPropertyStr(context, args, "name"); + JSValue in_clock = JS_IsUndefined(name) ? args : JS_UNDEFINED; + + if (!JS_IsUndefined(author)) + { + /* Looks like a message. */ + tf_ssb_verify_strip_and_store_message(ssb, args); + } + else + { + /* EBT clock. */ + tf_ssb_connection_set_ebt_send_clock(connection, args); + if (!tf_ssb_connection_get_sent_clock(connection)) + { + _tf_ssb_rpc_ebt_replicate_send_clock(connection, request_number, in_clock); + tf_ssb_connection_set_sent_clock(connection, true); + } + _tf_ssb_rpc_ebt_replicate_send_messages(connection, in_clock); + + } + JS_FreeValue(context, name); + JS_FreeValue(context, author); +} + +static void _tf_ssb_rpc_ebt_replicate_client(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) +{ + _tf_ssb_rpc_ebt_replicate(connection, flags, request_number, args, message, size, user_data); +} + +static void _tf_ssb_rpc_send_ebt_replicate(tf_ssb_connection_t* connection) +{ + tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); + JSContext* context = tf_ssb_get_context(ssb); + JSValue message = JS_NewObject(context); + JSValue name = JS_NewArray(context); + JS_SetPropertyUint32(context, name, 0, JS_NewString(context, "ebt")); + JS_SetPropertyUint32(context, name, 1, JS_NewString(context, "replicate")); + JS_SetPropertyStr(context, message, "name", name); + JSValue arg = JS_NewObject(context); + JS_SetPropertyStr(context, arg, "version", JS_NewInt32(context, 3)); + JS_SetPropertyStr(context, arg, "format", JS_NewString(context, "classic")); + JSValue args = JS_NewArray(context); + JS_SetPropertyUint32(context, args, 0, arg); + JS_SetPropertyStr(context, message, "args", args); + JS_SetPropertyStr(context, message, "type", JS_NewString(context, "duplex")); + tf_ssb_connection_rpc_send_json( + connection, + 0, + tf_ssb_connection_next_request_number(connection), + message, + _tf_ssb_rpc_ebt_replicate_client, + NULL, + NULL); + JS_FreeValue(context, message); +} + +static void _tf_ssb_rpc_ebt_replicate_server(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) +{ + tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); + JSContext* context = tf_ssb_get_context(ssb); + if (_is_error(context, args)) + { + /* TODO: Send createHistoryStream. */ + return; + } + + if (!tf_ssb_connection_get_ebt_request_number(connection)) + { + tf_ssb_connection_set_ebt_request_number(connection, request_number); + } + + JSValue in_name = JS_GetPropertyStr(context, args, "name"); + if (!JS_IsUndefined(in_name)) + { + /* This is the server receiving the initial ebt.replicate message. Respond. */ + if (!tf_ssb_connection_is_client(connection)) + { + _tf_ssb_rpc_send_ebt_replicate(connection); + } + } + JS_FreeValue(context, in_name); + + _tf_ssb_rpc_ebt_replicate(connection, flags, request_number, args, message, size, user_data); +} + static void _tf_ssb_rpc_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_change_t change, tf_ssb_connection_t* connection, void* user_data) { JSContext* context = tf_ssb_get_context(ssb); @@ -789,6 +1015,8 @@ static void _tf_ssb_rpc_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_chang NULL, NULL); JS_FreeValue(context, message); + + _tf_ssb_rpc_send_ebt_replicate(connection); } } else if (change == k_tf_ssb_change_remove) @@ -833,4 +1061,5 @@ void tf_ssb_rpc_register(tf_ssb_t* ssb) tf_ssb_add_rpc_callback(ssb, (const char*[]) { "tunnel", "isRoom", NULL }, _tf_ssb_rpc_tunnel_is_room, NULL, NULL); tf_ssb_add_rpc_callback(ssb, (const char*[]) { "room", "attendants", NULL }, _tf_ssb_rpc_room_attendants, NULL, NULL); tf_ssb_add_rpc_callback(ssb, (const char*[]) { "createHistoryStream", NULL }, _tf_ssb_rpc_createHistoryStream, NULL, NULL); + tf_ssb_add_rpc_callback(ssb, (const char*[]) { "ebt", "replicate", NULL }, _tf_ssb_rpc_ebt_replicate_server, NULL, NULL); } diff --git a/src/task.c b/src/task.c index db564737..a5164c0f 100644 --- a/src/task.c +++ b/src/task.c @@ -1555,6 +1555,7 @@ void tf_task_activate(tf_task_t* task) JS_SetPropertyStr(context, global, "Socket", tf_socket_register(context)); JS_SetPropertyStr(context, global, "TlsContext", tf_tls_context_register(context)); tf_file_register(context); + tf_database_register(context, task->_db); task->_ssb = tf_ssb_create(&task->_loop, task->_context, task->_db); tf_ssb_set_trace(task->_ssb, task->_trace);