From 63c344112dc5de7445fc1290153324ae11bf583e Mon Sep 17 00:00:00 2001 From: Cory McWilliams Date: Thu, 11 Nov 2021 00:05:07 +0000 Subject: [PATCH] Add a callback for when messages are added to the database. Abuse it to forward messages semi-live. git-svn-id: https://www.unprompted.com/svn/projects/tildefriends/trunk@3686 ed5197a5-7fde-0310-b194-c3ffbd925b24 --- core/ssb.js | 57 +++++++++++++++++++++++++-------------------- src/ssb.c | 65 ++++++++++++++++++++++++++++++++++++++++++++++++++++ src/ssb.h | 5 ++++ src/ssb.js.c | 21 +++++++++++++++++ 4 files changed, 123 insertions(+), 25 deletions(-) diff --git a/core/ssb.js b/core/ssb.js index d7aa8d45..4b31cfa0 100644 --- a/core/ssb.js +++ b/core/ssb.js @@ -144,35 +144,42 @@ ssb.addRpc(['createHistoryStream'], function(request) { var id = request.args[0].id; var seq = request.args[0].seq; var keys = request.args[0].keys || request.args[0].keys === undefined; - ssb.sqlStream( - 'SELECT previous, id, sequence, timestamp, hash, content, signature FROM messages WHERE author = ?1 AND sequence >= ?2 ORDER BY sequence', - [id, seq ?? 0], - function(row) { - if (keys) { - var message = { - key: row.id, - value: { - previous: row.previous, - author: id, - sequence: row.sequence, - timestamp: row.timestamp, - hash: row.hash, - content: JSON.parse(row.content), - signature: row.signature, - }, - timestamp: row.timestamp, - }; - } else { - var message = { + function sendMessage(row) { + if (keys) { + var message = { + key: row.id, + value: { previous: row.previous, - author: id, + author: row.author, sequence: row.sequence, timestamp: row.timestamp, hash: row.hash, content: JSON.parse(row.content), signature: row.signature, - }; - } - request.send_json(message); - }); + }, + timestamp: row.timestamp, + }; + } else { + var message = { + previous: row.previous, + author: row.author, + sequence: row.sequence, + timestamp: row.timestamp, + hash: row.hash, + content: JSON.parse(row.content), + signature: row.signature, + }; + } + request.send_json(message); + } + ssb.sqlStream( + 'SELECT previous, author, id, sequence, timestamp, hash, content, signature FROM messages WHERE author = ?1 AND sequence >= ?2 ORDER BY sequence', + [id, seq ?? 0], + sendMessage); + ssb.addEventListener('message', function(id) { + ssb.sqlStream( + 'SELECT previous, author, id, sequence, timestamp, hash, content, signature FROM messages WHERE id = ?1', + [id], + sendMessage); + }); }); diff --git a/src/ssb.c b/src/ssb.c index 075507e6..a3739065 100644 --- a/src/ssb.c +++ b/src/ssb.c @@ -94,6 +94,14 @@ typedef struct _tf_ssb_connections_changed_callback_node_t { tf_ssb_connections_changed_callback_node_t* next; } tf_ssb_connections_changed_callback_node_t; +typedef struct _tf_ssb_message_added_callback_node_t tf_ssb_message_added_callback_node_t; +typedef struct _tf_ssb_message_added_callback_node_t { + tf_ssb_message_added_callback_t* callback; + tf_ssb_callback_cleanup_t* cleanup; + void* user_data; + tf_ssb_message_added_callback_node_t* next; +} tf_ssb_message_added_callback_node_t; + typedef struct _tf_ssb_blob_want_added_callback_node_t tf_ssb_blob_want_added_callback_node_t; typedef struct _tf_ssb_blob_want_added_callback_node_t { tf_ssb_blob_want_added_callback_t* callback; @@ -142,6 +150,7 @@ typedef struct _tf_ssb_t { tf_ssb_rpc_callback_node_t* rpc; tf_ssb_connections_changed_callback_node_t* connections_changed; + tf_ssb_message_added_callback_node_t* message_added; tf_ssb_blob_want_added_callback_node_t* blob_want_added; tf_ssb_broadcasts_changed_callback_node_t* broadcasts_changed; } tf_ssb_t; @@ -1145,6 +1154,8 @@ void tf_ssb_append_message(tf_ssb_t* ssb, JSValue message) printf("Failed to verify message signature.\n"); } + tf_ssb_notify_message_added(ssb, id); + JS_FreeValue(context, root); } @@ -1610,6 +1621,16 @@ void tf_ssb_destroy(tf_ssb_t* ssb) } free(node); } + while (ssb->message_added) + { + tf_ssb_message_added_callback_node_t* node = ssb->message_added; + ssb->message_added = node->next; + if (node->cleanup) + { + node->cleanup(ssb, node->user_data); + } + free(node); + } while (ssb->blob_want_added) { tf_ssb_blob_want_added_callback_node_t* node = ssb->blob_want_added; @@ -2270,6 +2291,50 @@ JSValue tf_ssb_connection_get_object(tf_ssb_connection_t* connection) return connection ? connection->object : JS_UNDEFINED; } +void tf_ssb_add_message_added_callback(tf_ssb_t* ssb, void (*callback)(tf_ssb_t* ssb, const char* id, void* user_data), void (*cleanup)(tf_ssb_t* ssb, void* user_data), void* user_data) +{ + tf_ssb_message_added_callback_node_t* node = malloc(sizeof(tf_ssb_message_added_callback_node_t)); + *node = (tf_ssb_message_added_callback_node_t) + { + .callback = callback, + .cleanup = cleanup, + .user_data = user_data, + .next = ssb->message_added, + }; + ssb->message_added = node; +} + +void tf_ssb_remove_message_added_callback(tf_ssb_t* ssb, tf_ssb_message_added_callback_t* callback, void* user_data) +{ + tf_ssb_message_added_callback_node_t** it = &ssb->message_added; + while (*it) + { + if ((*it)->callback == callback && + (*it)->user_data == user_data) + { + tf_ssb_message_added_callback_node_t* node = *it; + *it = node->next; + if (node->cleanup) + { + node->cleanup(ssb, node->user_data); + } + free(node); + } + else + { + *it = (*it)->next; + } + } +} + +void tf_ssb_notify_message_added(tf_ssb_t* ssb, const char* id) +{ + for (tf_ssb_message_added_callback_node_t* node = ssb->message_added; node; node = node->next) + { + node->callback(ssb, id, node->user_data); + } +} + void tf_ssb_add_blob_want_added_callback(tf_ssb_t* ssb, void (*callback)(tf_ssb_t* ssb, const char* id, void* user_data), void (*cleanup)(tf_ssb_t* ssb, void* user_data), void* user_data) { tf_ssb_blob_want_added_callback_node_t* node = malloc(sizeof(tf_ssb_blob_want_added_callback_node_t)); diff --git a/src/ssb.h b/src/ssb.h index 6801f988..45f14c57 100644 --- a/src/ssb.h +++ b/src/ssb.h @@ -96,6 +96,11 @@ typedef void (tf_ssb_broadcasts_changed_callback_t)(tf_ssb_t* ssb, void* user_da void tf_ssb_add_broadcasts_changed_callback(tf_ssb_t* ssb, tf_ssb_broadcasts_changed_callback_t* callback, tf_ssb_callback_cleanup_t* cleanup, void* user_data); void tf_ssb_remove_broadcasts_changed_callback(tf_ssb_t* ssb, tf_ssb_broadcasts_changed_callback_t* callback, void* user_data); +typedef void (tf_ssb_message_added_callback_t)(tf_ssb_t* ssb, const char* id, void* user_data); +void tf_ssb_add_message_added_callback(tf_ssb_t* ssb, tf_ssb_message_added_callback_t* callback, tf_ssb_callback_cleanup_t* cleanup, void* user_data); +void tf_ssb_remove_message_added_callback(tf_ssb_t* ssb, tf_ssb_message_added_callback_t* callback, void* user_data); +void tf_ssb_notify_message_added(tf_ssb_t* ssb, const char* id); + typedef void (tf_ssb_blob_want_added_callback_t)(tf_ssb_t* ssb, const char* id, void* user_data); void tf_ssb_add_blob_want_added_callback(tf_ssb_t* ssb, tf_ssb_blob_want_added_callback_t* callback, tf_ssb_callback_cleanup_t* cleanup, void* user_data); void tf_ssb_remove_blob_want_added_callback(tf_ssb_t* ssb, tf_ssb_blob_want_added_callback_t* callback, void* user_data); diff --git a/src/ssb.js.c b/src/ssb.js.c index 1ed8a9cb..cc4d5529 100644 --- a/src/ssb.js.c +++ b/src/ssb.js.c @@ -456,6 +456,17 @@ static JSValue _tf_ssb_add_rpc(JSContext* context, JSValueConst this_val, int ar return JS_UNDEFINED; } +static void _tf_ssb_on_message_added_callback(tf_ssb_t* ssb, const char* id, void* user_data) +{ + JSContext* context = tf_ssb_get_context(ssb); + JSValue callback = JS_MKPTR(JS_TAG_OBJECT, user_data); + JSValue string = JS_NewString(context, id); + JSValue response = JS_Call(context, callback, JS_UNDEFINED, 1, &string); + tf_util_report_error(context, response); + JS_FreeValue(context, response); + JS_FreeValue(context, string); +} + static void _tf_ssb_on_blob_want_added_callback(tf_ssb_t* ssb, const char* id, void* user_data) { JSContext* context = tf_ssb_get_context(ssb); @@ -606,6 +617,11 @@ static JSValue _tf_ssb_add_event_listener(JSContext* context, JSValueConst this_ void* ptr = JS_VALUE_GET_PTR(JS_DupValue(context, callback)); tf_ssb_add_broadcasts_changed_callback(ssb, _tf_ssb_on_broadcasts_changed_callback, _tf_ssb_cleanup_value, ptr); } + else if (strcmp(event_name, "message") == 0) + { + void* ptr = JS_VALUE_GET_PTR(JS_DupValue(context, callback)); + tf_ssb_add_message_added_callback(ssb, _tf_ssb_on_message_added_callback, _tf_ssb_cleanup_value, ptr); + } else if (strcmp(event_name, "blob_want_added") == 0) { void* ptr = JS_VALUE_GET_PTR(JS_DupValue(context, callback)); @@ -644,6 +660,11 @@ static JSValue _tf_ssb_remove_event_listener(JSContext* context, JSValueConst th void* ptr = JS_VALUE_GET_PTR(JS_DupValue(context, callback)); tf_ssb_remove_broadcasts_changed_callback(ssb, _tf_ssb_on_broadcasts_changed_callback, ptr); } + else if (strcmp(event_name, "message_added") == 0) + { + void* ptr = JS_VALUE_GET_PTR(JS_DupValue(context, callback)); + tf_ssb_remove_message_added_callback(ssb, _tf_ssb_on_message_added_callback, ptr); + } else if (strcmp(event_name, "blob_want_added") == 0) { void* ptr = JS_VALUE_GET_PTR(JS_DupValue(context, callback));