Add a callback for when messages are added to the database. Abuse it to forward messages semi-live.

git-svn-id: https://www.unprompted.com/svn/projects/tildefriends/trunk@3686 ed5197a5-7fde-0310-b194-c3ffbd925b24
This commit is contained in:
Cory McWilliams 2021-11-11 00:05:07 +00:00
parent 18c90214a8
commit 63c344112d
4 changed files with 123 additions and 25 deletions

View File

@ -144,16 +144,13 @@ ssb.addRpc(['createHistoryStream'], function(request) {
var id = request.args[0].id;
var seq = request.args[0].seq;
var keys = request.args[0].keys || request.args[0].keys === undefined;
ssb.sqlStream(
'SELECT previous, id, sequence, timestamp, hash, content, signature FROM messages WHERE author = ?1 AND sequence >= ?2 ORDER BY sequence',
[id, seq ?? 0],
function(row) {
function sendMessage(row) {
if (keys) {
var message = {
key: row.id,
value: {
previous: row.previous,
author: id,
author: row.author,
sequence: row.sequence,
timestamp: row.timestamp,
hash: row.hash,
@ -165,7 +162,7 @@ ssb.addRpc(['createHistoryStream'], function(request) {
} else {
var message = {
previous: row.previous,
author: id,
author: row.author,
sequence: row.sequence,
timestamp: row.timestamp,
hash: row.hash,
@ -174,5 +171,15 @@ ssb.addRpc(['createHistoryStream'], function(request) {
};
}
request.send_json(message);
}
ssb.sqlStream(
'SELECT previous, author, id, sequence, timestamp, hash, content, signature FROM messages WHERE author = ?1 AND sequence >= ?2 ORDER BY sequence',
[id, seq ?? 0],
sendMessage);
ssb.addEventListener('message', function(id) {
ssb.sqlStream(
'SELECT previous, author, id, sequence, timestamp, hash, content, signature FROM messages WHERE id = ?1',
[id],
sendMessage);
});
});

View File

@ -94,6 +94,14 @@ typedef struct _tf_ssb_connections_changed_callback_node_t {
tf_ssb_connections_changed_callback_node_t* next;
} tf_ssb_connections_changed_callback_node_t;
typedef struct _tf_ssb_message_added_callback_node_t tf_ssb_message_added_callback_node_t;
typedef struct _tf_ssb_message_added_callback_node_t {
tf_ssb_message_added_callback_t* callback;
tf_ssb_callback_cleanup_t* cleanup;
void* user_data;
tf_ssb_message_added_callback_node_t* next;
} tf_ssb_message_added_callback_node_t;
typedef struct _tf_ssb_blob_want_added_callback_node_t tf_ssb_blob_want_added_callback_node_t;
typedef struct _tf_ssb_blob_want_added_callback_node_t {
tf_ssb_blob_want_added_callback_t* callback;
@ -142,6 +150,7 @@ typedef struct _tf_ssb_t {
tf_ssb_rpc_callback_node_t* rpc;
tf_ssb_connections_changed_callback_node_t* connections_changed;
tf_ssb_message_added_callback_node_t* message_added;
tf_ssb_blob_want_added_callback_node_t* blob_want_added;
tf_ssb_broadcasts_changed_callback_node_t* broadcasts_changed;
} tf_ssb_t;
@ -1145,6 +1154,8 @@ void tf_ssb_append_message(tf_ssb_t* ssb, JSValue message)
printf("Failed to verify message signature.\n");
}
tf_ssb_notify_message_added(ssb, id);
JS_FreeValue(context, root);
}
@ -1610,6 +1621,16 @@ void tf_ssb_destroy(tf_ssb_t* ssb)
}
free(node);
}
while (ssb->message_added)
{
tf_ssb_message_added_callback_node_t* node = ssb->message_added;
ssb->message_added = node->next;
if (node->cleanup)
{
node->cleanup(ssb, node->user_data);
}
free(node);
}
while (ssb->blob_want_added)
{
tf_ssb_blob_want_added_callback_node_t* node = ssb->blob_want_added;
@ -2270,6 +2291,50 @@ JSValue tf_ssb_connection_get_object(tf_ssb_connection_t* connection)
return connection ? connection->object : JS_UNDEFINED;
}
void tf_ssb_add_message_added_callback(tf_ssb_t* ssb, void (*callback)(tf_ssb_t* ssb, const char* id, void* user_data), void (*cleanup)(tf_ssb_t* ssb, void* user_data), void* user_data)
{
tf_ssb_message_added_callback_node_t* node = malloc(sizeof(tf_ssb_message_added_callback_node_t));
*node = (tf_ssb_message_added_callback_node_t)
{
.callback = callback,
.cleanup = cleanup,
.user_data = user_data,
.next = ssb->message_added,
};
ssb->message_added = node;
}
void tf_ssb_remove_message_added_callback(tf_ssb_t* ssb, tf_ssb_message_added_callback_t* callback, void* user_data)
{
tf_ssb_message_added_callback_node_t** it = &ssb->message_added;
while (*it)
{
if ((*it)->callback == callback &&
(*it)->user_data == user_data)
{
tf_ssb_message_added_callback_node_t* node = *it;
*it = node->next;
if (node->cleanup)
{
node->cleanup(ssb, node->user_data);
}
free(node);
}
else
{
*it = (*it)->next;
}
}
}
void tf_ssb_notify_message_added(tf_ssb_t* ssb, const char* id)
{
for (tf_ssb_message_added_callback_node_t* node = ssb->message_added; node; node = node->next)
{
node->callback(ssb, id, node->user_data);
}
}
void tf_ssb_add_blob_want_added_callback(tf_ssb_t* ssb, void (*callback)(tf_ssb_t* ssb, const char* id, void* user_data), void (*cleanup)(tf_ssb_t* ssb, void* user_data), void* user_data)
{
tf_ssb_blob_want_added_callback_node_t* node = malloc(sizeof(tf_ssb_blob_want_added_callback_node_t));

View File

@ -96,6 +96,11 @@ typedef void (tf_ssb_broadcasts_changed_callback_t)(tf_ssb_t* ssb, void* user_da
void tf_ssb_add_broadcasts_changed_callback(tf_ssb_t* ssb, tf_ssb_broadcasts_changed_callback_t* callback, tf_ssb_callback_cleanup_t* cleanup, void* user_data);
void tf_ssb_remove_broadcasts_changed_callback(tf_ssb_t* ssb, tf_ssb_broadcasts_changed_callback_t* callback, void* user_data);
typedef void (tf_ssb_message_added_callback_t)(tf_ssb_t* ssb, const char* id, void* user_data);
void tf_ssb_add_message_added_callback(tf_ssb_t* ssb, tf_ssb_message_added_callback_t* callback, tf_ssb_callback_cleanup_t* cleanup, void* user_data);
void tf_ssb_remove_message_added_callback(tf_ssb_t* ssb, tf_ssb_message_added_callback_t* callback, void* user_data);
void tf_ssb_notify_message_added(tf_ssb_t* ssb, const char* id);
typedef void (tf_ssb_blob_want_added_callback_t)(tf_ssb_t* ssb, const char* id, void* user_data);
void tf_ssb_add_blob_want_added_callback(tf_ssb_t* ssb, tf_ssb_blob_want_added_callback_t* callback, tf_ssb_callback_cleanup_t* cleanup, void* user_data);
void tf_ssb_remove_blob_want_added_callback(tf_ssb_t* ssb, tf_ssb_blob_want_added_callback_t* callback, void* user_data);

View File

@ -456,6 +456,17 @@ static JSValue _tf_ssb_add_rpc(JSContext* context, JSValueConst this_val, int ar
return JS_UNDEFINED;
}
static void _tf_ssb_on_message_added_callback(tf_ssb_t* ssb, const char* id, void* user_data)
{
JSContext* context = tf_ssb_get_context(ssb);
JSValue callback = JS_MKPTR(JS_TAG_OBJECT, user_data);
JSValue string = JS_NewString(context, id);
JSValue response = JS_Call(context, callback, JS_UNDEFINED, 1, &string);
tf_util_report_error(context, response);
JS_FreeValue(context, response);
JS_FreeValue(context, string);
}
static void _tf_ssb_on_blob_want_added_callback(tf_ssb_t* ssb, const char* id, void* user_data)
{
JSContext* context = tf_ssb_get_context(ssb);
@ -606,6 +617,11 @@ static JSValue _tf_ssb_add_event_listener(JSContext* context, JSValueConst this_
void* ptr = JS_VALUE_GET_PTR(JS_DupValue(context, callback));
tf_ssb_add_broadcasts_changed_callback(ssb, _tf_ssb_on_broadcasts_changed_callback, _tf_ssb_cleanup_value, ptr);
}
else if (strcmp(event_name, "message") == 0)
{
void* ptr = JS_VALUE_GET_PTR(JS_DupValue(context, callback));
tf_ssb_add_message_added_callback(ssb, _tf_ssb_on_message_added_callback, _tf_ssb_cleanup_value, ptr);
}
else if (strcmp(event_name, "blob_want_added") == 0)
{
void* ptr = JS_VALUE_GET_PTR(JS_DupValue(context, callback));
@ -644,6 +660,11 @@ static JSValue _tf_ssb_remove_event_listener(JSContext* context, JSValueConst th
void* ptr = JS_VALUE_GET_PTR(JS_DupValue(context, callback));
tf_ssb_remove_broadcasts_changed_callback(ssb, _tf_ssb_on_broadcasts_changed_callback, ptr);
}
else if (strcmp(event_name, "message_added") == 0)
{
void* ptr = JS_VALUE_GET_PTR(JS_DupValue(context, callback));
tf_ssb_remove_message_added_callback(ssb, _tf_ssb_on_message_added_callback, ptr);
}
else if (strcmp(event_name, "blob_want_added") == 0)
{
void* ptr = JS_VALUE_GET_PTR(JS_DupValue(context, callback));