diff --git a/core/ssb.js b/core/ssb.js index a835784b..da49060a 100644 --- a/core/ssb.js +++ b/core/ssb.js @@ -1,5 +1,4 @@ "use strict"; -var g_wants_requests = {}; var g_database = new Database('core'); const k_use_create_history_stream = false; @@ -29,25 +28,19 @@ ssb.addEventListener('connections', function on_connections_changed(change, conn if (k_use_create_history_stream) { connection.send_json({'name': ['createHistoryStream'], 'type': 'source', 'args': [{'id': connection.id, 'seq': sequence, 'live': true, 'keys': false}]}, storeMessage); var identities = ssb.getAllIdentities(); - ssb.followingDeep(identities, 2).then(function(ids) { - for (let id of ids) { - if (identities.indexOf(id) != -1) { - continue; - } - var sequence = get_latest_sequence_for_author(id); - connection.send_json({'name': ['createHistoryStream'], 'type': 'source', 'args': [{'id': id, 'seq': sequence, 'live': true, 'keys': false}]}, storeMessage); + let ids = ssb.followingDeep(identities, 2); + for (let id of ids) { + if (identities.indexOf(id) != -1) { + continue; } - }); + var sequence = get_latest_sequence_for_author(id); + connection.send_json({'name': ['createHistoryStream'], 'type': 'source', 'args': [{'id': id, 'seq': sequence, 'live': true, 'keys': false}]}, storeMessage); + } } else { if (connection.is_client) { connection.send_json({"name": ["ebt", "replicate"], "args": [{"version": 3, "format": "classic"}], "type": "duplex"}, ebtReplicateClient); } } - } else if (change == 'remove') { - print('REMOVE', connection.id); - delete g_wants_requests[connection.id]; - } else { - print('CHANGE', change); } }); @@ -169,36 +162,3 @@ function ebtReplicateServer(request) { } ssb.addRpc(['ebt', 'replicate'], ebtReplicateServer); - -ssb.addRpc(['createHistoryStream'], function(request) { - if (!request?.args) { - print('missing request.args in createHistoryStream'); - } - var id = request.args[0].id; - var seq = request.args[0].seq; - var keys = request.args[0].keys || request.args[0].keys === undefined; - function sendMessage(row) { - if (keys) { - var message = { - key: row.id, - value: formatMessage(row), - timestamp: row.timestamp, - }; - } else { - var message = formatMessage(row); - } - request.send_json(message); - } - ssb.sqlStream( - 'SELECT previous, author, id, sequence, timestamp, hash, content, signature, sequence_before_author FROM messages WHERE author = ?1 AND sequence >= ?2 ORDER BY sequence', - [id, seq ?? 0], - sendMessage); - ssb.addEventListener('message', function(message_id) { - ssb.sqlStream( - 'SELECT previous, author, id, sequence, timestamp, hash, content, signature, sequence_before_author FROM messages WHERE id = ?1 AND author = ?2', - [message_id, id], - function (row) { - sendMessage(row); - }); - }); -}); diff --git a/src/ssb.c b/src/ssb.c index 35d7180b..34ab3a08 100644 --- a/src/ssb.c +++ b/src/ssb.c @@ -188,6 +188,13 @@ typedef struct _tf_ssb_t int broadcasts_changed_count; } tf_ssb_t; +typedef struct _tf_ssb_connection_message_request_t +{ + char author[k_id_base64_len]; + int32_t request_number; + bool keys; +} tf_ssb_connection_message_request_t; + typedef struct _tf_ssb_connection_t { tf_ssb_t* ssb; @@ -241,6 +248,9 @@ typedef struct _tf_ssb_connection_t tf_ssb_request_t* requests; int requests_count; const char* destroy_reason; + + tf_ssb_connection_message_request_t* message_requests; + int message_requests_count; } tf_ssb_connection_t; static JSClassID _connection_class_id; @@ -499,6 +509,34 @@ void tf_ssb_connection_add_request(tf_ssb_connection_t* connection, int32_t requ connection->ssb->request_count++; } +static int _message_request_compare(const void* a, const void* b) +{ + const char* author = a; + const tf_ssb_connection_message_request_t* rb = b; + return strcmp(author, rb->author); +} + +void tf_ssb_connection_add_new_message_request(tf_ssb_connection_t* connection, const char* author, int32_t request_number, bool keys) +{ + int index = tf_util_insert_index(author, connection->message_requests, connection->message_requests_count, sizeof(tf_ssb_connection_message_request_t), _message_request_compare); + if (index < connection->message_requests_count && strcmp(author, connection->message_requests[index].author) == 0) + { + return; + } + connection->message_requests = tf_resize_vec(connection->message_requests, sizeof(tf_ssb_connection_message_request_t) * (connection->message_requests_count + 1)); + if (connection->message_requests_count - index) + { + memmove(connection->message_requests + index + 1, connection->message_requests + index, sizeof(tf_ssb_connection_message_request_t) * (connection->message_requests_count - index)); + } + connection->message_requests[index] = (tf_ssb_connection_message_request_t) + { + .request_number = request_number, + .keys = keys, + }; + snprintf(connection->message_requests[index].author, sizeof(connection->message_requests[index].author), "%s", author); + connection->message_requests_count++; +} + static void _tf_ssb_connection_remove_request(tf_ssb_connection_t* connection, int32_t request_number) { tf_ssb_request_t* request = bsearch(&request_number, connection->requests, connection->requests_count, sizeof(tf_ssb_request_t), _request_compare); @@ -518,7 +556,6 @@ static void _tf_ssb_connection_remove_request(tf_ssb_connection_t* connection, i void tf_ssb_connection_rpc_send(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, const uint8_t* message, size_t size, tf_ssb_rpc_callback_t* callback, tf_ssb_callback_cleanup_t* cleanup, void* user_data) { - printf("SEND %p\n", connection); if (!connection) { return; @@ -1475,12 +1512,10 @@ void tf_ssb_append_message(tf_ssb_t* ssb, JSValue message) void _tf_ssb_connection_destroy(tf_ssb_connection_t* connection, const char* reason) { - printf("DESTROY %p\n", connection); tf_ssb_t* ssb = connection->ssb; if (!connection->destroy_reason) { connection->destroy_reason = reason; - printf("destroying connection %p obj=%p: %s\n", connection, JS_VALUE_GET_PTR(connection->object), reason); } while (connection->requests) { @@ -1558,6 +1593,10 @@ void _tf_ssb_connection_destroy(tf_ssb_connection_t* connection, const char* rea !connection->tcp.data && !connection->connect.data) { + tf_free(connection->message_requests); + connection->message_requests = NULL; + connection->message_requests_count = 0; + tf_free(connection); } } @@ -2907,6 +2946,43 @@ void tf_ssb_notify_message_added(tf_ssb_t* ssb, const char* id) next = node->next; node->callback(ssb, id, node->user_data); } + + JSContext* context = ssb->context; + JSValue message_keys = tf_ssb_db_get_message_by_id(ssb, id, true); + JSValue message = JS_GetPropertyStr(context, message_keys, "value"); + if (!JS_IsUndefined(message)) + { + JSValue author = JS_GetPropertyStr(context, message, "author"); + const char* author_string = JS_ToCString(context, author); + + for (tf_ssb_connection_t* connection = ssb->connections; connection; connection = connection->next) + { + tf_ssb_connection_message_request_t* message_request = + bsearch( + author_string, + connection->message_requests, + connection->message_requests_count, + sizeof(tf_ssb_connection_message_request_t), + _message_request_compare); + if (message_request) + { + tf_ssb_connection_rpc_send_json( + connection, + k_ssb_rpc_flag_stream, + message_request->request_number, + message_request->keys ? message_keys : message, + NULL, + NULL, + NULL); + + } + } + + JS_FreeCString(context, author_string); + JS_FreeValue(context, author); + } + JS_FreeValue(context, message); + JS_FreeValue(context, message_keys); } void tf_ssb_add_blob_want_added_callback(tf_ssb_t* ssb, void (*callback)(tf_ssb_t* ssb, const char* id, void* user_data), void (*cleanup)(tf_ssb_t* ssb, void* user_data), void* user_data) diff --git a/src/ssb.db.c b/src/ssb.db.c index 0221c44f..57993491 100644 --- a/src/ssb.db.c +++ b/src/ssb.db.c @@ -711,7 +711,7 @@ JSValue tf_ssb_db_visit_query(tf_ssb_t* ssb, const char* query, const JSValue bi return result; } -static JSValue _tf_ssb_format_message(JSContext* context, const char* previous, const char* author, int64_t sequence, double timestamp, const char* hash, const char* content, const char* signature, bool sequence_before_author) +JSValue tf_ssb_format_message(JSContext* context, const char* previous, const char* author, int64_t sequence, double timestamp, const char* hash, const char* content, const char* signature, bool sequence_before_author) { JSValue value = JS_NewObject(context); JS_SetPropertyStr(context, value, "previous", previous ? JS_NewString(context, previous) : JS_NULL); @@ -779,7 +779,7 @@ bool tf_ssb_db_check(sqlite3* db, const char* check_author) const char* content = (const char*)sqlite3_column_text(statement, 6); const char* signature = (const char*)sqlite3_column_text(statement, 7); bool sequence_before_author = sqlite3_column_int(statement, 8); - JSValue message = _tf_ssb_format_message(context, previous, author, sequence, timestamp, hash, content, signature, sequence_before_author); + JSValue message = tf_ssb_format_message(context, previous, author, sequence, timestamp, hash, content, signature, sequence_before_author); char out_signature[512]; char actual_id[k_id_base64_len]; bool actual_sequence_before_author = false; @@ -1182,3 +1182,45 @@ void tf_ssb_db_private(sqlite3* db) sqlite3_finalize(statement); } } + +JSValue tf_ssb_db_get_message_by_id( tf_ssb_t* ssb, const char* id, bool is_keys) +{ + JSValue result = JS_UNDEFINED; + JSContext* context = tf_ssb_get_context(ssb); + sqlite3* db = tf_ssb_get_db(ssb); + sqlite3_stmt* statement; + if (sqlite3_prepare(db, "SELECT previous, author, id, sequence, timestamp, hash, content, signature, sequence_before_author FROM messages WHERE id = ?", -1, &statement, NULL) == SQLITE_OK) + { + if (sqlite3_bind_text(statement, 1, id, -1, NULL) == SQLITE_OK) + { + if (sqlite3_step(statement) == SQLITE_ROW) + { + JSValue message = JS_UNDEFINED; + JSValue formatted = tf_ssb_format_message( + context, + (const char*)sqlite3_column_text(statement, 0), + (const char*)sqlite3_column_text(statement, 1), + sqlite3_column_int64(statement, 3), + sqlite3_column_double(statement, 4), + (const char*)sqlite3_column_text(statement, 5), + (const char*)sqlite3_column_text(statement, 6), + (const char*)sqlite3_column_text(statement, 7), + sqlite3_column_int(statement, 8)); + if (is_keys) + { + message = JS_NewObject(context); + JS_SetPropertyStr(context, message, "key", JS_NewString(context, (const char*)sqlite3_column_text(statement, 2))); + JS_SetPropertyStr(context, message, "value", formatted); + JS_SetPropertyStr(context, message, "timestamp", JS_NewString(context, (const char*)sqlite3_column_text(statement, 4))); + } + else + { + message = formatted; + } + result = message; + } + } + sqlite3_finalize(statement); + } + return result; +} diff --git a/src/ssb.db.h b/src/ssb.db.h index 9493b39b..d66b824d 100644 --- a/src/ssb.db.h +++ b/src/ssb.db.h @@ -12,6 +12,7 @@ bool tf_ssb_db_blob_has(tf_ssb_t* ssb, const char* id); bool tf_ssb_db_blob_get(tf_ssb_t* ssb, const char* id, uint8_t** out_blob, size_t* out_size); bool tf_ssb_db_blob_store(tf_ssb_t* ssb, const uint8_t* blob, size_t size, char* out_id, size_t out_id_size, bool* out_new); +JSValue tf_ssb_db_get_message_by_id( tf_ssb_t* ssb, const char* id, bool is_keys); bool tf_ssb_db_get_message_by_author_and_sequence(tf_ssb_t* ssb, const char* author, int64_t sequence, char* out_message_id, size_t out_message_id_size, double* out_timestamp, char** out_content); bool tf_ssb_db_get_latest_message_by_author(tf_ssb_t* ssb, const char* author, int64_t* out_sequence, char* out_message_id, size_t out_message_id_size); JSValue tf_ssb_db_visit_query(tf_ssb_t* ssb, const char* query, const JSValue binds, void (*callback)(JSValue row, void* user_data), void* user_data); @@ -26,6 +27,16 @@ void tf_ssb_db_identity_visit(tf_ssb_t* ssb, const char* user, void (*callback)( void tf_ssb_db_identity_visit_all(tf_ssb_t* ssb, void (*callback)(const char* identity, void* user_data), void* user_data); bool tf_ssb_db_identity_get_private_key(tf_ssb_t* ssb, const char* user, const char* public_key, uint8_t* out_private_key, size_t private_key_size); +JSValue tf_ssb_format_message( + JSContext* context, + const char* previous, + const char* author, + int64_t sequence, + double timestamp, + const char* hash, + const char* content, + const char* signature, + bool sequence_before_author); const char** tf_ssb_db_following_deep(tf_ssb_t* ssb, const char** ids, int count, int depth); void tf_ssb_db_private(sqlite3* db); diff --git a/src/ssb.h b/src/ssb.h index fdf660aa..7b70007c 100644 --- a/src/ssb.h +++ b/src/ssb.h @@ -151,6 +151,8 @@ void tf_ssb_connection_rpc_send_error(tf_ssb_connection_t* connection, uint8_t f void tf_ssb_connection_rpc_send_error_method_not_allowed(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number); void tf_ssb_connection_add_request(tf_ssb_connection_t* connection, int32_t request_number, tf_ssb_rpc_callback_t* callback, tf_ssb_callback_cleanup_t* cleanup, void* user_data, tf_ssb_connection_t* dependent_connection); +void tf_ssb_connection_add_new_message_request(tf_ssb_connection_t* connection, const char* author, int32_t request_number, bool keys); + bool tf_ssb_connection_is_attendant(tf_ssb_connection_t* connection); int32_t tf_ssb_connection_get_attendant_request_number(tf_ssb_connection_t* connection); void tf_ssb_connection_set_attendant(tf_ssb_connection_t* connection, bool attendant, int request_number); diff --git a/src/ssb.rpc.c b/src/ssb.rpc.c index c94636d5..e7d0979f 100644 --- a/src/ssb.rpc.c +++ b/src/ssb.rpc.c @@ -678,6 +678,78 @@ static void _tf_ssb_rpc_connection_tunnel_isRoom_callback(tf_ssb_connection_t* c } } +static void _tf_ssb_rpc_createHistoryStream(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) +{ + tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); + JSContext* context = tf_ssb_get_context(ssb); + JSValue arg_array = JS_GetPropertyStr(context, args, "args"); + JSValue arg = JS_GetPropertyUint32(context, arg_array, 0); + if (JS_IsUndefined(arg)) + { + tf_ssb_connection_rpc_send_error(connection, flags, -request_number, "Missing request.args in createHistoryStream."); + } + JSValue id = JS_GetPropertyStr(context, arg, "id"); + JSValue seq = JS_GetPropertyStr(context, arg, "seq"); + JSValue keys = JS_GetPropertyStr(context, arg, "keys"); + JSValue live = JS_GetPropertyStr(context, arg, "live"); + bool is_keys = JS_IsUndefined(keys) || JS_ToBool(context, keys) > 0; + bool is_live = JS_ToBool(context, live) > 0; + int64_t sequence = 0; + JS_ToInt64(context, &sequence, seq); + const char* author = JS_ToCString(context, id); + + sqlite3* db = tf_ssb_get_db(ssb); + sqlite3_stmt* statement; + if (sqlite3_prepare(db, "SELECT previous, author, id, sequence, timestamp, hash, content, signature, sequence_before_author FROM messages WHERE author = ?1 AND sequence >= ?2 ORDER BY sequence", -1, &statement, NULL) == SQLITE_OK) + { + if (sqlite3_bind_text(statement, 1, author, -1, NULL) == SQLITE_OK && + sqlite3_bind_int64(statement, 2, sequence) == SQLITE_OK) + { + while (sqlite3_step(statement) == SQLITE_ROW) + { + JSValue message = JS_UNDEFINED; + + JSValue formatted = tf_ssb_format_message( + context, + (const char*)sqlite3_column_text(statement, 0), + (const char*)sqlite3_column_text(statement, 1), + sqlite3_column_int64(statement, 3), + sqlite3_column_double(statement, 4), + (const char*)sqlite3_column_text(statement, 5), + (const char*)sqlite3_column_text(statement, 6), + (const char*)sqlite3_column_text(statement, 7), + sqlite3_column_int(statement, 8)); + if (is_keys) + { + message = JS_NewObject(context); + JS_SetPropertyStr(context, message, "key", JS_NewString(context, (const char*)sqlite3_column_text(statement, 2))); + JS_SetPropertyStr(context, message, "value", formatted); + JS_SetPropertyStr(context, message, "timestamp", JS_NewString(context, (const char*)sqlite3_column_text(statement, 4))); + } + else + { + message = formatted; + } + tf_ssb_connection_rpc_send_json(connection, flags, -request_number, message, NULL, NULL, NULL); + JS_FreeValue(context, message); + } + } + sqlite3_finalize(statement); + } + + if (is_live) + { + tf_ssb_connection_add_new_message_request(connection, author, -request_number, is_keys); + } + + JS_FreeCString(context, author); + JS_FreeValue(context, id); + JS_FreeValue(context, seq); + JS_FreeValue(context, keys); + JS_FreeValue(context, arg); + JS_FreeValue(context, arg_array); +} + static void _tf_ssb_rpc_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_change_t change, tf_ssb_connection_t* connection, void* user_data) { JSContext* context = tf_ssb_get_context(ssb); @@ -760,4 +832,5 @@ void tf_ssb_rpc_register(tf_ssb_t* ssb) tf_ssb_add_rpc_callback(ssb, (const char*[]) { "tunnel", "connect", NULL }, _tf_ssb_rpc_tunnel_connect, NULL, NULL); tf_ssb_add_rpc_callback(ssb, (const char*[]) { "tunnel", "isRoom", NULL }, _tf_ssb_rpc_tunnel_is_room, NULL, NULL); tf_ssb_add_rpc_callback(ssb, (const char*[]) { "room", "attendants", NULL }, _tf_ssb_rpc_room_attendants, NULL, NULL); + tf_ssb_add_rpc_callback(ssb, (const char*[]) { "createHistoryStream", NULL }, _tf_ssb_rpc_createHistoryStream, NULL, NULL); }