From 18c90214a80a9651782afb619c01e93aec6f657f Mon Sep 17 00:00:00 2001 From: Cory McWilliams Date: Sun, 7 Nov 2021 22:28:58 +0000 Subject: [PATCH] Trying to normalize event handling somewhat. More to go before it's simple. git-svn-id: https://www.unprompted.com/svn/projects/tildefriends/trunk@3685 ed5197a5-7fde-0310-b194-c3ffbd925b24 --- core/core.js | 8 +- core/ssb.js | 12 +-- src/ssb.c | 221 +++++++++++++++++++++++++++++++++++++-------------- src/ssb.h | 30 ++++--- src/ssb.js.c | 162 +++++++++++++++++++++---------------- 5 files changed, 284 insertions(+), 149 deletions(-) diff --git a/core/core.js b/core/core.js index 9f0e316a..2ff0b478 100644 --- a/core/core.js +++ b/core/core.js @@ -425,13 +425,13 @@ async function blobHandler(request, response, blobId, uri) { } } -ssb.onBroadcastsChanged = function() { +ssb.addEventListener('broadcasts', function() { broadcastEvent('onBroadcastsChanged', []); -} +}); -ssb.onConnectionsChanged = function() { +ssb.addEventListener('connections', function() { broadcastEvent('onConnectionsChanged', []); -} +}); async function loadSettings() { try { diff --git a/core/ssb.js b/core/ssb.js index acde2ede..d7aa8d45 100644 --- a/core/ssb.js +++ b/core/ssb.js @@ -62,7 +62,7 @@ function get_latest_sequence_for_author(author) { return sequence; } -ssb.registerConnectionsChanged(function(change, connection) { +ssb.addEventListener('connections', function(change, connection) { if (change == 'add') { var sequence = get_latest_sequence_for_author(connection.id); connection.send_json({'name': ['createHistoryStream'], 'type': 'source', 'args': [{'id': connection.id, 'seq': sequence, 'live': true, keys: false}]}, function(message) { @@ -108,21 +108,21 @@ ssb.registerConnectionsChanged(function(change, connection) { } }); -ssb.registerRpc(['blobs', 'createWants'], function(request) { +ssb.addRpc(['blobs', 'createWants'], function(request) { g_wants_requests[request.connection.id] = request; function blob_want_discovered(id) { var message = {}; message[id] = -1; request.send_json(message); } - ssb.registerBlobWantAdded(blob_want_discovered); + ssb.addEventListener('blob_want_added', blob_want_discovered); ssb.sqlStream( 'SELECT id FROM blob_wants', [], row => blob_want_discovered(row.id)); }); -ssb.registerRpc(['blobs', 'has'], function(request) { +ssb.addRpc(['blobs', 'has'], function(request) { var found = false; ssb.sqlStream( 'SELECT 1 FROM blobs where id = ?1', @@ -133,14 +133,14 @@ ssb.registerRpc(['blobs', 'has'], function(request) { request.send_json(found); }); -ssb.registerRpc(['blobs', 'get'], function(request) { +ssb.addRpc(['blobs', 'get'], function(request) { for (let id of request.args) { var blob = ssb.blobGet(id); request.send_binary(blob); } }); -ssb.registerRpc(['createHistoryStream'], function(request) { +ssb.addRpc(['createHistoryStream'], function(request) { var id = request.args[0].id; var seq = request.args[0].seq; var keys = request.args[0].keys || request.args[0].keys === undefined; diff --git a/src/ssb.c b/src/ssb.c index 86865d92..075507e6 100644 --- a/src/ssb.c +++ b/src/ssb.c @@ -81,19 +81,35 @@ typedef struct _tf_ssb_rpc_callback_node_t tf_ssb_rpc_callback_node_t; typedef struct _tf_ssb_rpc_callback_node_t { const char** name; tf_ssb_rpc_callback_t* callback; - tf_ssb_rpc_cleanup_t* cleanup; + tf_ssb_callback_cleanup_t* cleanup; void* user_data; tf_ssb_rpc_callback_node_t* next; } tf_ssb_rpc_callback_node_t; +typedef struct _tf_ssb_connections_changed_callback_node_t tf_ssb_connections_changed_callback_node_t; +typedef struct _tf_ssb_connections_changed_callback_node_t { + tf_ssb_connections_changed_callback_t* callback; + tf_ssb_callback_cleanup_t* cleanup; + void* user_data; + tf_ssb_connections_changed_callback_node_t* next; +} tf_ssb_connections_changed_callback_node_t; + typedef struct _tf_ssb_blob_want_added_callback_node_t tf_ssb_blob_want_added_callback_node_t; typedef struct _tf_ssb_blob_want_added_callback_node_t { - void (*callback)(tf_ssb_t* ssb, const char* id, void* user_data); - void (*cleanup)(tf_ssb_t* ssb, void* user_data); + tf_ssb_blob_want_added_callback_t* callback; + tf_ssb_callback_cleanup_t* cleanup; void* user_data; tf_ssb_blob_want_added_callback_node_t* next; } tf_ssb_blob_want_added_callback_node_t; +typedef struct _tf_ssb_broadcasts_changed_callback_node_t tf_ssb_broadcasts_changed_callback_node_t; +typedef struct _tf_ssb_broadcasts_changed_callback_node_t { + tf_ssb_broadcasts_changed_callback_t* callback; + tf_ssb_callback_cleanup_t* cleanup; + void* user_data; + tf_ssb_broadcasts_changed_callback_node_t* next; +} tf_ssb_broadcasts_changed_callback_node_t; + typedef struct _tf_ssb_t { bool own_context; JSRuntime* runtime; @@ -116,22 +132,18 @@ typedef struct _tf_ssb_t { uint8_t pub[crypto_sign_PUBLICKEYBYTES]; uint8_t priv[crypto_sign_SECRETKEYBYTES]; - tf_ssb_connections_changed_callback_t* connections_changed[k_connections_changed_callbacks_max]; - tf_ssb_rpc_cleanup_t* connections_changed_cleanup[k_connections_changed_callbacks_max]; - void* connections_changed_user_data[k_connections_changed_callbacks_max]; int connections_changed_count; tf_ssb_connection_t* connections; tf_ssb_connections_t* connections_tracker; - void (*broadcasts_changed)(tf_ssb_t* ssb, void* user_data); - void* broadcasts_changed_user_data; tf_ssb_broadcast_t* broadcasts; tf_ssb_rpc_callback_node_t* rpc; - + tf_ssb_connections_changed_callback_node_t* connections_changed; tf_ssb_blob_want_added_callback_node_t* blob_want_added; + tf_ssb_broadcasts_changed_callback_node_t* broadcasts_changed; } tf_ssb_t; typedef struct _tf_ssb_connection_t { @@ -378,7 +390,7 @@ static bool _tf_ssb_connection_get_request_callback(tf_ssb_connection_t* connect return found; } -void tf_ssb_connection_add_request(tf_ssb_connection_t* connection, int32_t request_number, tf_ssb_rpc_callback_t* callback, void* user_data) +static void _tf_ssb_connection_add_request(tf_ssb_connection_t* connection, int32_t request_number, tf_ssb_rpc_callback_t* callback, void* user_data) { if (_tf_ssb_connection_get_request_callback(connection, request_number, NULL, NULL)) { @@ -395,7 +407,7 @@ void tf_ssb_connection_add_request(tf_ssb_connection_t* connection, int32_t requ connection->requests = request; } -void tf_ssb_connection_remove_request(tf_ssb_connection_t* connection, int32_t request_number) +static void _tf_ssb_connection_remove_request(tf_ssb_connection_t* connection, int32_t request_number) { for (tf_ssb_request_t** it = &connection->requests; *it; it = &(*it)->next) { @@ -417,7 +429,7 @@ void tf_ssb_connection_rpc_send(tf_ssb_connection_t* connection, uint8_t flags, { if (request_number > 0) { - tf_ssb_connection_add_request(connection, request_number, callback, user_data); + _tf_ssb_connection_add_request(connection, request_number, callback, user_data); } uint8_t* combined = malloc(9 + size); *combined = flags; @@ -528,6 +540,14 @@ bool tf_ssb_id_str_to_bin(uint8_t* bin, const char* str) return base64c_decode((const uint8_t*)author_id, type - author_id, bin, crypto_box_PUBLICKEYBYTES) != 0; } +static void _tf_ssb_notify_connections_changed(tf_ssb_t* ssb, tf_ssb_change_t change, tf_ssb_connection_t* connection) +{ + for (tf_ssb_connections_changed_callback_node_t* node = ssb->connections_changed; node; node = node->next) + { + node->callback(ssb, change, connection, node->user_data); + } +} + static void _tf_ssb_connection_verify_identity(tf_ssb_connection_t* connection, const uint8_t* message, size_t len) { uint8_t nonce[crypto_secretbox_NONCEBYTES] = { 0 }; @@ -629,10 +649,7 @@ static void _tf_ssb_connection_verify_identity(tf_ssb_connection_t* connection, JS_SetPropertyStr(context, connection->object, "id", JS_NewString(context, fullid)); connection->state = k_tf_ssb_state_verified; - for (int i = 0; i < connection->ssb->connections_changed_count; i++) - { - connection->ssb->connections_changed[i](connection->ssb, k_tf_ssb_change_connect, connection, connection->ssb->connections_changed_user_data[i]); - } + _tf_ssb_notify_connections_changed(connection->ssb, k_tf_ssb_change_connect, connection); } const char* tf_ssb_connection_get_host(tf_ssb_connection_t* connection) @@ -837,10 +854,7 @@ static void _tf_ssb_connection_verify_client_identity(tf_ssb_connection_t* conne JS_SetPropertyStr(context, connection->object, "id", JS_NewString(context, fullid)); connection->state = k_tf_ssb_state_server_verified; - for (int i = 0; i < connection->ssb->connections_changed_count; i++) - { - connection->ssb->connections_changed[i](connection->ssb, k_tf_ssb_change_connect, connection, connection->ssb->connections_changed_user_data[i]); - } + _tf_ssb_notify_connections_changed(connection->ssb, k_tf_ssb_change_connect, connection); } static bool _tf_ssb_connection_recv_pop(tf_ssb_connection_t* connection, uint8_t* buffer, size_t size) @@ -962,7 +976,7 @@ static void _tf_ssb_connection_rpc_recv(tf_ssb_connection_t* connection, uint8_t if (request_number < 0 && (flags & k_ssb_rpc_flag_end_error)) { - tf_ssb_connection_remove_request(connection, -request_number); + _tf_ssb_connection_remove_request(connection, -request_number); } } @@ -1156,13 +1170,10 @@ static void _tf_ssb_connection_on_close(uv_handle_t* handle) } while (connection->requests) { - tf_ssb_connection_remove_request(connection, connection->requests->request_number); + _tf_ssb_connection_remove_request(connection, connection->requests->request_number); } - for (int i = 0; i < ssb->connections_changed_count; i++) - { - ssb->connections_changed[i](ssb, k_tf_ssb_change_remove, connection, ssb->connections_changed_user_data[i]); - } - JS_FreeValue(connection->ssb->context, connection->object); + _tf_ssb_notify_connections_changed(ssb, k_tf_ssb_change_remove, connection); + JS_FreeValue(ssb->context, connection->object); } static void _tf_ssb_connection_on_tcp_recv(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) @@ -1546,14 +1557,6 @@ void tf_ssb_destroy(tf_ssb_t* ssb) tf_ssb_connections_destroy(ssb->connections_tracker); ssb->connections_tracker = NULL; - for (int i = 0; i < ssb->connections_changed_count; i++) - { - if (ssb->connections_changed_cleanup[i]) - { - ssb->connections_changed_cleanup[i](ssb, ssb->connections_changed_user_data[i]); - } - } - if (ssb->broadcast_listener.data && !uv_is_closing((uv_handle_t*)&ssb->broadcast_listener)) { uv_close((uv_handle_t*)&ssb->broadcast_listener, _tf_ssb_on_handle_close); @@ -1597,6 +1600,16 @@ void tf_ssb_destroy(tf_ssb_t* ssb) } free(node); } + while (ssb->connections_changed) + { + tf_ssb_connections_changed_callback_node_t* node = ssb->connections_changed; + ssb->connections_changed = node->next; + if (node->cleanup) + { + node->cleanup(ssb, node->user_data); + } + free(node); + } while (ssb->blob_want_added) { tf_ssb_blob_want_added_callback_node_t* node = ssb->blob_want_added; @@ -1607,6 +1620,16 @@ void tf_ssb_destroy(tf_ssb_t* ssb) } free(node); } + while (ssb->broadcasts_changed) + { + tf_ssb_broadcasts_changed_callback_node_t* node = ssb->broadcasts_changed; + ssb->broadcasts_changed = node->next; + if (node->cleanup) + { + node->cleanup(ssb, node->user_data); + } + free(node); + } if (ssb->own_context) { JS_FreeContext(ssb->context); @@ -1702,10 +1725,7 @@ tf_ssb_connection_t* tf_ssb_connection_create(tf_ssb_t* ssb, const char* host, c connection->next = ssb->connections; ssb->connections = connection; - for (int i = 0; i < ssb->connections_changed_count; i++) - { - ssb->connections_changed[i](ssb, k_tf_ssb_change_create, connection, ssb->connections_changed_user_data[i]); - } + _tf_ssb_notify_connections_changed(ssb, k_tf_ssb_change_create, connection); return connection; } @@ -1800,10 +1820,7 @@ static void _tf_ssb_on_connection(uv_stream_t* stream, int status) connection->next = ssb->connections; ssb->connections = connection; - for (int i = 0; i < ssb->connections_changed_count; i++) - { - ssb->connections_changed[i](ssb, k_tf_ssb_change_create, connection, ssb->connections_changed_user_data[i]); - } + _tf_ssb_notify_connections_changed(ssb, k_tf_ssb_change_create, connection); connection->state = k_tf_ssb_state_server_wait_hello; uv_read_start((uv_stream_t*)&connection->tcp, _tf_ssb_connection_on_tcp_alloc, _tf_ssb_connection_on_tcp_recv); @@ -1960,6 +1977,17 @@ static void _tf_ssb_on_broadcast_listener_alloc(uv_handle_t* handle, size_t sugg buf->len = suggested_size; } +static void _tf_ssb_notify_broadcasts_changed(tf_ssb_t* ssb) +{ + for (tf_ssb_broadcasts_changed_callback_node_t* node = ssb->broadcasts_changed; node; node = node->next) + { + if (node->callback) + { + node->callback(ssb, node->user_data); + } + } +} + static void _tf_ssb_add_broadcast(tf_ssb_t* ssb, const tf_ssb_broadcast_t* broadcast) { if (memcmp(broadcast->pub, ssb->pub, sizeof(ssb->pub)) == 0) @@ -1992,10 +2020,7 @@ static void _tf_ssb_add_broadcast(tf_ssb_t* ssb, const tf_ssb_broadcast_t* broad node->mtime = node->ctime; ssb->broadcasts = node; - if (ssb->broadcasts_changed) - { - ssb->broadcasts_changed(ssb, ssb->broadcasts_changed_user_data); - } + _tf_ssb_notify_broadcasts_changed(ssb); } static void _tf_ssb_on_broadcast_listener_recv(uv_udp_t* handle, ssize_t nread, const uv_buf_t* buf, const struct sockaddr* addr, unsigned flags) @@ -2118,22 +2143,79 @@ int tf_ssb_get_connections(tf_ssb_t* ssb, tf_ssb_connection_t** out_connections, return i; } -void tf_ssb_set_broadcasts_changed_callback(tf_ssb_t* ssb, void (*callback)(tf_ssb_t* ssb, void* user_data), void* user_data) +void tf_ssb_add_broadcasts_changed_callback(tf_ssb_t* ssb, tf_ssb_broadcasts_changed_callback_t* callback, tf_ssb_callback_cleanup_t* cleanup, void* user_data) { - ssb->broadcasts_changed = callback; - ssb->broadcasts_changed_user_data = user_data; + tf_ssb_broadcasts_changed_callback_node_t* node = malloc(sizeof(tf_ssb_broadcasts_changed_callback_node_t)); + *node = (tf_ssb_broadcasts_changed_callback_node_t) + { + .callback = callback, + .cleanup = cleanup, + .user_data = user_data, + .next = ssb->broadcasts_changed, + }; + ssb->broadcasts_changed = node; } -void tf_ssb_add_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_connections_changed_callback_t* callback, tf_ssb_rpc_cleanup_t* cleanup, void* user_data) +void tf_ssb_remove_broadcasts_changed_callback(tf_ssb_t* ssb, tf_ssb_broadcasts_changed_callback_t* callback, void* user_data) { - assert(ssb->connections_changed_count < k_connections_changed_callbacks_max); - ssb->connections_changed[ssb->connections_changed_count] = callback; - ssb->connections_changed_cleanup[ssb->connections_changed_count] = cleanup; - ssb->connections_changed_user_data[ssb->connections_changed_count] = user_data; - ssb->connections_changed_count++; + tf_ssb_broadcasts_changed_callback_node_t** it = &ssb->broadcasts_changed; + while (*it) + { + if ((*it)->callback == callback && + (*it)->user_data == user_data) + { + tf_ssb_broadcasts_changed_callback_node_t* node = *it; + *it = node->next; + if (node->cleanup) + { + node->cleanup(ssb, node->user_data); + } + free(node); + } + else + { + *it = (*it)->next; + } + } } -void tf_ssb_register_rpc(tf_ssb_t* ssb, const char** name, tf_ssb_rpc_callback_t* callback, tf_ssb_rpc_cleanup_t* cleanup, void* user_data) +void tf_ssb_add_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_connections_changed_callback_t* callback, tf_ssb_callback_cleanup_t* cleanup, void* user_data) +{ + tf_ssb_connections_changed_callback_node_t* node = malloc(sizeof(tf_ssb_connections_changed_callback_node_t)); + *node = (tf_ssb_connections_changed_callback_node_t) + { + .callback = callback, + .cleanup = cleanup, + .user_data = user_data, + .next = ssb->connections_changed, + }; + ssb->connections_changed = node; +} + +void tf_ssb_remove_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_connections_changed_callback_t* callback, void* user_data) +{ + tf_ssb_connections_changed_callback_node_t** it = &ssb->connections_changed; + while (*it) + { + if ((*it)->callback == callback && + (*it)->user_data == user_data) + { + tf_ssb_connections_changed_callback_node_t* node = *it; + *it = node->next; + if (node->cleanup) + { + node->cleanup(ssb, node->user_data); + } + free(node); + } + else + { + *it = (*it)->next; + } + } +} + +void tf_ssb_add_rpc_callback(tf_ssb_t* ssb, const char** name, tf_ssb_rpc_callback_t* callback, tf_ssb_callback_cleanup_t* cleanup, void* user_data) { size_t name_len = 0; int name_count = 0; @@ -2188,7 +2270,7 @@ JSValue tf_ssb_connection_get_object(tf_ssb_connection_t* connection) return connection ? connection->object : JS_UNDEFINED; } -void tf_ssb_register_blob_want_added(tf_ssb_t* ssb, void (*callback)(tf_ssb_t* ssb, const char* id, void* user_data), void (*cleanup)(tf_ssb_t* ssb, void* user_data), void* user_data) +void tf_ssb_add_blob_want_added_callback(tf_ssb_t* ssb, void (*callback)(tf_ssb_t* ssb, const char* id, void* user_data), void (*cleanup)(tf_ssb_t* ssb, void* user_data), void* user_data) { tf_ssb_blob_want_added_callback_node_t* node = malloc(sizeof(tf_ssb_blob_want_added_callback_node_t)); *node = (tf_ssb_blob_want_added_callback_node_t) @@ -2201,6 +2283,29 @@ void tf_ssb_register_blob_want_added(tf_ssb_t* ssb, void (*callback)(tf_ssb_t* s ssb->blob_want_added = node; } +void tf_ssb_remove_blob_want_added_callback(tf_ssb_t* ssb, tf_ssb_blob_want_added_callback_t* callback, void* user_data) +{ + tf_ssb_blob_want_added_callback_node_t** it = &ssb->blob_want_added; + while (*it) + { + if ((*it)->callback == callback && + (*it)->user_data == user_data) + { + tf_ssb_blob_want_added_callback_node_t* node = *it; + *it = node->next; + if (node->cleanup) + { + node->cleanup(ssb, node->user_data); + } + free(node); + } + else + { + *it = (*it)->next; + } + } +} + void tf_ssb_notify_blob_want_added(tf_ssb_t* ssb, const char* id) { for (tf_ssb_blob_want_added_callback_node_t* node = ssb->blob_want_added; node; node = node->next) diff --git a/src/ssb.h b/src/ssb.h index ab7b6377..6801f988 100644 --- a/src/ssb.h +++ b/src/ssb.h @@ -58,12 +58,8 @@ void tf_ssb_append_message(tf_ssb_t* ssb, JSValue message); void tf_ssb_append_post(tf_ssb_t* ssb, const char* text); bool tf_ssb_whoami(tf_ssb_t* ssb, char* out_id, size_t out_id_size); -void tf_ssb_set_broadcasts_changed_callback(tf_ssb_t* ssb, void (*callback)(tf_ssb_t* ssb, void* user_data), void* user_data); void tf_ssb_visit_broadcasts(tf_ssb_t* ssb, void (*callback)(const struct sockaddr_in* addr, const uint8_t* pub, void* user_data), void* user_data); -typedef void (tf_ssb_rpc_cleanup_t)(tf_ssb_t* ssb, void* user_data); -typedef void (tf_ssb_connections_changed_callback_t)(tf_ssb_t* ssb, tf_ssb_change_t change, tf_ssb_connection_t* connection, void* user_data); -void tf_ssb_add_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_connections_changed_callback_t callback, tf_ssb_rpc_cleanup_t* cleanup, void* user_data); const char** tf_ssb_get_connection_ids(tf_ssb_t* ssb); int tf_ssb_get_connections(tf_ssb_t* ssb, tf_ssb_connection_t** out_connections, int out_connections_count); void tf_ssb_connect(tf_ssb_t* ssb, const char* host, int port, const uint8_t* key); @@ -76,9 +72,6 @@ void tf_ssb_send_close(tf_ssb_t* ssb); bool tf_ssb_id_str_to_bin(uint8_t* bin, const char* str); bool tf_ssb_id_bin_to_str(char* str, size_t str_size, const uint8_t* bin); -typedef void (tf_ssb_rpc_callback_t)(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data); -void tf_ssb_register_rpc(tf_ssb_t* ssb, const char** name, tf_ssb_rpc_callback_t* callback, tf_ssb_rpc_cleanup_t* cleanup, void* user_data); - bool tf_ssb_verify_and_strip_signature(JSContext* context, JSValue val, char* out_signature, size_t out_signature_size); void tf_ssb_calculate_message_id(JSContext* context, JSValue message, char* out_id, size_t out_id_size); @@ -87,16 +80,31 @@ int tf_ssb_connection_get_port(tf_ssb_connection_t* connection); tf_ssb_t* tf_ssb_connection_get_ssb(tf_ssb_connection_t* connection); JSContext* tf_ssb_connection_get_context(tf_ssb_connection_t* connection); sqlite3* tf_ssb_connection_get_db(tf_ssb_connection_t* connection); -void tf_ssb_connection_rpc_send(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, const uint8_t* message, size_t size, tf_ssb_rpc_callback_t* callback, void* user_data); int32_t tf_ssb_connection_next_request_number(tf_ssb_connection_t* connection); bool tf_ssb_connection_get_id(tf_ssb_connection_t* connection, char* out_id, size_t out_id_size); -void tf_ssb_connection_add_request(tf_ssb_connection_t* connection, int32_t request_number, tf_ssb_rpc_callback_t* callback, void* user_data); -void tf_ssb_connection_remove_request(tf_ssb_connection_t* connection, int32_t request_number); JSValue tf_ssb_connection_get_object(tf_ssb_connection_t* connection); -void tf_ssb_register_blob_want_added(tf_ssb_t* ssb, void (*callback)(tf_ssb_t* ssb, const char* id, void* user_data), void (*cleanup)(tf_ssb_t* ssb, void* user_data), void* user_data); +/* Callbacks. */ +typedef void (tf_ssb_callback_cleanup_t)(tf_ssb_t* ssb, void* user_data); +typedef void (tf_ssb_connections_changed_callback_t)(tf_ssb_t* ssb, tf_ssb_change_t change, tf_ssb_connection_t* connection, void* user_data); +void tf_ssb_add_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_connections_changed_callback_t* callback, tf_ssb_callback_cleanup_t* cleanup, void* user_data); +void tf_ssb_remove_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_connections_changed_callback_t* callback, void* user_data); + +typedef void (tf_ssb_broadcasts_changed_callback_t)(tf_ssb_t* ssb, void* user_data); +void tf_ssb_add_broadcasts_changed_callback(tf_ssb_t* ssb, tf_ssb_broadcasts_changed_callback_t* callback, tf_ssb_callback_cleanup_t* cleanup, void* user_data); +void tf_ssb_remove_broadcasts_changed_callback(tf_ssb_t* ssb, tf_ssb_broadcasts_changed_callback_t* callback, void* user_data); + +typedef void (tf_ssb_blob_want_added_callback_t)(tf_ssb_t* ssb, const char* id, void* user_data); +void tf_ssb_add_blob_want_added_callback(tf_ssb_t* ssb, tf_ssb_blob_want_added_callback_t* callback, tf_ssb_callback_cleanup_t* cleanup, void* user_data); +void tf_ssb_remove_blob_want_added_callback(tf_ssb_t* ssb, tf_ssb_blob_want_added_callback_t* callback, void* user_data); void tf_ssb_notify_blob_want_added(tf_ssb_t* ssb, const char* id); +typedef void (tf_ssb_rpc_callback_t)(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data); +void tf_ssb_add_rpc_callback(tf_ssb_t* ssb, const char** name, tf_ssb_rpc_callback_t* callback, tf_ssb_callback_cleanup_t* cleanup, void* user_data); +void tf_ssb_remove_rpc_callback(tf_ssb_t* ssb, const char** name, tf_ssb_rpc_callback_t* callback, void* user_data); + +void tf_ssb_connection_rpc_send(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, const uint8_t* message, size_t size, tf_ssb_rpc_callback_t* callback, void* user_data); + JSClassID tf_ssb_get_connection_class_id(); diff --git a/src/ssb.js.c b/src/ssb.js.c index 6b0c7eae..1ed8a9cb 100644 --- a/src/ssb.js.c +++ b/src/ssb.js.c @@ -335,37 +335,6 @@ static JSValue _tf_ssb_connect(JSContext* context, JSValueConst this_val, int ar return JS_UNDEFINED; } -static void _tf_ssb_call_callback(tf_ssb_t* ssb, const char* name, void* user_data) -{ - JSContext* context = tf_ssb_get_context(ssb); - JSValue global = JS_GetGlobalObject(context); - JSValue ssbo = JS_GetPropertyStr(context, global, "ssb"); - JSValue callback = JS_GetPropertyStr(context, ssbo, name); - if (JS_IsFunction(context, callback)) - { - JSValue args = JS_UNDEFINED; - JSValue response = JS_Call(context, callback, JS_UNDEFINED, 0, &args); - tf_util_report_error(context, response); - if (tf_task_get(context)) - { - tf_task_run_jobs(tf_task_get(context)); - } - JS_FreeValue(context, response); - } - JS_FreeValue(context, ssbo); - JS_FreeValue(context, global); -} - -static void _tf_ssb_broadcasts_changed(tf_ssb_t* ssb, void* user_data) -{ - _tf_ssb_call_callback(ssb, "onBroadcastsChanged", user_data); -} - -static void _tf_ssb_connections_changed(tf_ssb_t* ssb, tf_ssb_change_t change, tf_ssb_connection_t* connection, void* user_data) -{ - _tf_ssb_call_callback(ssb, "onConnectionsChanged", user_data); -} - static JSValue _tf_ssb_rpc_send_json(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) { JSValue connection_val = JS_GetPropertyStr(context, this_val, "connection"); @@ -440,13 +409,13 @@ void _tf_ssb_on_rpc(tf_ssb_connection_t* connection, uint8_t flags, int32_t requ JS_FreeValue(context, object); } -static void _tf_ssb_rpc_js_value_cleanup(tf_ssb_t* ssb, void* user_data) +static void _tf_ssb_cleanup_value(tf_ssb_t* ssb, void* user_data) { JSValue callback = JS_MKPTR(JS_TAG_OBJECT, user_data); JS_FreeValue(tf_ssb_get_context(ssb), callback); } -static JSValue _tf_ssb_register_rpc(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) +static JSValue _tf_ssb_add_rpc(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) { tf_ssb_t* ssb = JS_GetOpaque(this_val, _tf_ssb_classId); if (!JS_IsArray(context, argv[0])) @@ -477,7 +446,7 @@ static JSValue _tf_ssb_register_rpc(JSContext* context, JSValueConst this_val, i JS_FreeValue(context, value); } - tf_ssb_register_rpc(ssb, name, _tf_ssb_on_rpc, _tf_ssb_rpc_js_value_cleanup, JS_VALUE_GET_PTR(JS_DupValue(context, argv[1]))); + tf_ssb_add_rpc_callback(ssb, name, _tf_ssb_on_rpc, _tf_ssb_cleanup_value, JS_VALUE_GET_PTR(JS_DupValue(context, argv[1]))); for (int i = 0; i < length; i++) { @@ -487,7 +456,7 @@ static JSValue _tf_ssb_register_rpc(JSContext* context, JSValueConst this_val, i return JS_UNDEFINED; } -static void _tf_ssb_on_blob_want_added(tf_ssb_t* ssb, const char* id, void* user_data) +static void _tf_ssb_on_blob_want_added_callback(tf_ssb_t* ssb, const char* id, void* user_data) { JSContext* context = tf_ssb_get_context(ssb); JSValue callback = JS_MKPTR(JS_TAG_OBJECT, user_data); @@ -498,24 +467,7 @@ static void _tf_ssb_on_blob_want_added(tf_ssb_t* ssb, const char* id, void* user JS_FreeValue(context, string); } -static void _tf_ssb_cleanup_value(tf_ssb_t* ssb, void* user_data) -{ - JSValue callback = JS_MKPTR(JS_TAG_OBJECT, user_data); - JS_FreeValue(tf_ssb_get_context(ssb), callback); -} - -static JSValue _tf_ssb_register_blob_want_added(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) -{ - tf_ssb_t* ssb = JS_GetOpaque(this_val, _tf_ssb_classId); - if (!JS_IsFunction(context, argv[0])) - { - return JS_ThrowTypeError(context, "Expected argument 1 to be a function."); - } - tf_ssb_register_blob_want_added(ssb, _tf_ssb_on_blob_want_added, _tf_ssb_cleanup_value, JS_VALUE_GET_PTR(JS_DupValue(context, argv[0]))); - return JS_UNDEFINED; -} - -static void _tf_ssb_rpc_on_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_change_t change, tf_ssb_connection_t* connection, void* user_data) +static void _tf_ssb_on_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_change_t change, tf_ssb_connection_t* connection, void* user_data) { JSContext* context = tf_ssb_get_context(ssb); JSValue callback = JS_MKPTR(JS_TAG_OBJECT, user_data); @@ -557,18 +509,13 @@ static void _tf_ssb_rpc_on_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_ch JS_FreeValue(context, response); } -static JSValue _tf_ssb_register_connections_changed(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) +static void _tf_ssb_on_broadcasts_changed_callback(tf_ssb_t* ssb, void* user_data) { - printf("register connections changed\n"); - tf_ssb_t* ssb = JS_GetOpaque(this_val, _tf_ssb_classId); - if (!JS_IsFunction(context, argv[0])) - { - return JS_ThrowTypeError(context, "Expected argument 1 to be a function."); - } - void* ptr = JS_VALUE_GET_PTR(JS_DupValue(context, argv[0])); - printf("registering %p TAG=%d\n", ptr, JS_VALUE_GET_TAG(argv[0])); - tf_ssb_add_connections_changed_callback(ssb, _tf_ssb_rpc_on_connections_changed_callback, _tf_ssb_rpc_js_value_cleanup, ptr); - return JS_UNDEFINED; + JSContext* context = tf_ssb_get_context(ssb); + JSValue callback = JS_MKPTR(JS_TAG_OBJECT, user_data); + JSValue response = JS_Call(context, callback, JS_UNDEFINED, 1, &JS_UNDEFINED); + tf_util_report_error(context, response); + JS_FreeValue(context, response); } void tf_ssb_run_file(JSContext* context, const char* file_name) @@ -632,6 +579,82 @@ void tf_ssb_run_file(JSContext* context, const char* file_name) free(source); } +static JSValue _tf_ssb_add_event_listener(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) +{ + tf_ssb_t* ssb = JS_GetOpaque(this_val, _tf_ssb_classId); + const char* event_name = JS_ToCString(context, argv[0]); + JSValue callback = argv[1]; + JSValue result = JS_UNDEFINED; + + if (!event_name) + { + result = JS_ThrowTypeError(context, "Expected argument 1 to be a string event name."); + } + else if (!JS_IsFunction(context, callback)) + { + result = JS_ThrowTypeError(context, "Expected argument 2 to be a function."); + } + else + { + if (strcmp(event_name, "connections") == 0) + { + void* ptr = JS_VALUE_GET_PTR(JS_DupValue(context, callback)); + tf_ssb_add_connections_changed_callback(ssb, _tf_ssb_on_connections_changed_callback, _tf_ssb_cleanup_value, ptr); + } + else if (strcmp(event_name, "broadcasts") == 0) + { + void* ptr = JS_VALUE_GET_PTR(JS_DupValue(context, callback)); + tf_ssb_add_broadcasts_changed_callback(ssb, _tf_ssb_on_broadcasts_changed_callback, _tf_ssb_cleanup_value, ptr); + } + else if (strcmp(event_name, "blob_want_added") == 0) + { + void* ptr = JS_VALUE_GET_PTR(JS_DupValue(context, callback)); + tf_ssb_add_blob_want_added_callback(ssb, _tf_ssb_on_blob_want_added_callback, _tf_ssb_cleanup_value, ptr); + } + } + + JS_FreeCString(context, event_name); + return result; +} + +static JSValue _tf_ssb_remove_event_listener(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) +{ + tf_ssb_t* ssb = JS_GetOpaque(this_val, _tf_ssb_classId); + const char* event_name = JS_ToCString(context, argv[0]); + JSValue callback = argv[1]; + JSValue result = JS_UNDEFINED; + + if (!event_name) + { + result = JS_ThrowTypeError(context, "Expected argument 1 to be a string event name."); + } + else if (!JS_IsFunction(context, callback)) + { + result = JS_ThrowTypeError(context, "Expected argument 2 to be a function."); + } + else + { + if (strcmp(event_name, "connections") == 0) + { + void* ptr = JS_VALUE_GET_PTR(JS_DupValue(context, callback)); + tf_ssb_remove_connections_changed_callback(ssb, _tf_ssb_on_connections_changed_callback, ptr); + } + else if (strcmp(event_name, "broadcasts") == 0) + { + void* ptr = JS_VALUE_GET_PTR(JS_DupValue(context, callback)); + tf_ssb_remove_broadcasts_changed_callback(ssb, _tf_ssb_on_broadcasts_changed_callback, ptr); + } + else if (strcmp(event_name, "blob_want_added") == 0) + { + void* ptr = JS_VALUE_GET_PTR(JS_DupValue(context, callback)); + tf_ssb_remove_blob_want_added_callback(ssb, _tf_ssb_on_blob_want_added_callback, ptr); + } + } + + JS_FreeCString(context, event_name); + return result; +} + void tf_ssb_register(JSContext* context, tf_ssb_t* ssb) { JS_NewClassID(&_tf_ssb_classId); @@ -644,9 +667,6 @@ void tf_ssb_register(JSContext* context, tf_ssb_t* ssb) fprintf(stderr, "Failed to register ssb.\n"); } - tf_ssb_set_broadcasts_changed_callback(ssb, _tf_ssb_broadcasts_changed, NULL); - tf_ssb_add_connections_changed_callback(ssb, _tf_ssb_connections_changed, NULL, NULL); - JSValue global = JS_GetGlobalObject(context); JSValue object = JS_NewObjectClass(context, _tf_ssb_classId); JS_SetPropertyStr(context, global, "ssb", object); @@ -663,9 +683,11 @@ void tf_ssb_register(JSContext* context, tf_ssb_t* ssb) JS_SetPropertyStr(context, object, "storeMessage", JS_NewCFunction(context, _tf_ssb_storeMessage, "storeMessage", 1)); JS_SetPropertyStr(context, object, "getBroadcasts", JS_NewCFunction(context, _tf_ssb_getBroadcasts, "getBroadcasts", 0)); JS_SetPropertyStr(context, object, "connect", JS_NewCFunction(context, _tf_ssb_connect, "connect", 1)); - JS_SetPropertyStr(context, object, "registerRpc", JS_NewCFunction(context, _tf_ssb_register_rpc, "registerRpc", 2)); - JS_SetPropertyStr(context, object, "registerBlobWantAdded", JS_NewCFunction(context, _tf_ssb_register_blob_want_added, "registerBlobWantAdded", 1)); - JS_SetPropertyStr(context, object, "registerConnectionsChanged", JS_NewCFunction(context, _tf_ssb_register_connections_changed, "registerConnectionsChanged", 1)); + + JS_SetPropertyStr(context, object, "addRpc", JS_NewCFunction(context, _tf_ssb_add_rpc, "addRpc", 2)); + + JS_SetPropertyStr(context, object, "addEventListener", JS_NewCFunction(context, _tf_ssb_add_event_listener, "addEventListener", 2)); + JS_SetPropertyStr(context, object, "removeEventListener", JS_NewCFunction(context, _tf_ssb_remove_event_listener, "removeEventListener", 2)); JS_FreeValue(context, global);