Trying to normalize event handling somewhat. More to go before it's simple.

git-svn-id: https://www.unprompted.com/svn/projects/tildefriends/trunk@3685 ed5197a5-7fde-0310-b194-c3ffbd925b24
This commit is contained in:
Cory McWilliams 2021-11-07 22:28:58 +00:00
parent 68cf3efcde
commit 18c90214a8
5 changed files with 284 additions and 149 deletions

View File

@ -425,13 +425,13 @@ async function blobHandler(request, response, blobId, uri) {
} }
} }
ssb.onBroadcastsChanged = function() { ssb.addEventListener('broadcasts', function() {
broadcastEvent('onBroadcastsChanged', []); broadcastEvent('onBroadcastsChanged', []);
} });
ssb.onConnectionsChanged = function() { ssb.addEventListener('connections', function() {
broadcastEvent('onConnectionsChanged', []); broadcastEvent('onConnectionsChanged', []);
} });
async function loadSettings() { async function loadSettings() {
try { try {

View File

@ -62,7 +62,7 @@ function get_latest_sequence_for_author(author) {
return sequence; return sequence;
} }
ssb.registerConnectionsChanged(function(change, connection) { ssb.addEventListener('connections', function(change, connection) {
if (change == 'add') { if (change == 'add') {
var sequence = get_latest_sequence_for_author(connection.id); var sequence = get_latest_sequence_for_author(connection.id);
connection.send_json({'name': ['createHistoryStream'], 'type': 'source', 'args': [{'id': connection.id, 'seq': sequence, 'live': true, keys: false}]}, function(message) { connection.send_json({'name': ['createHistoryStream'], 'type': 'source', 'args': [{'id': connection.id, 'seq': sequence, 'live': true, keys: false}]}, function(message) {
@ -108,21 +108,21 @@ ssb.registerConnectionsChanged(function(change, connection) {
} }
}); });
ssb.registerRpc(['blobs', 'createWants'], function(request) { ssb.addRpc(['blobs', 'createWants'], function(request) {
g_wants_requests[request.connection.id] = request; g_wants_requests[request.connection.id] = request;
function blob_want_discovered(id) { function blob_want_discovered(id) {
var message = {}; var message = {};
message[id] = -1; message[id] = -1;
request.send_json(message); request.send_json(message);
} }
ssb.registerBlobWantAdded(blob_want_discovered); ssb.addEventListener('blob_want_added', blob_want_discovered);
ssb.sqlStream( ssb.sqlStream(
'SELECT id FROM blob_wants', 'SELECT id FROM blob_wants',
[], [],
row => blob_want_discovered(row.id)); row => blob_want_discovered(row.id));
}); });
ssb.registerRpc(['blobs', 'has'], function(request) { ssb.addRpc(['blobs', 'has'], function(request) {
var found = false; var found = false;
ssb.sqlStream( ssb.sqlStream(
'SELECT 1 FROM blobs where id = ?1', 'SELECT 1 FROM blobs where id = ?1',
@ -133,14 +133,14 @@ ssb.registerRpc(['blobs', 'has'], function(request) {
request.send_json(found); request.send_json(found);
}); });
ssb.registerRpc(['blobs', 'get'], function(request) { ssb.addRpc(['blobs', 'get'], function(request) {
for (let id of request.args) { for (let id of request.args) {
var blob = ssb.blobGet(id); var blob = ssb.blobGet(id);
request.send_binary(blob); request.send_binary(blob);
} }
}); });
ssb.registerRpc(['createHistoryStream'], function(request) { ssb.addRpc(['createHistoryStream'], function(request) {
var id = request.args[0].id; var id = request.args[0].id;
var seq = request.args[0].seq; var seq = request.args[0].seq;
var keys = request.args[0].keys || request.args[0].keys === undefined; var keys = request.args[0].keys || request.args[0].keys === undefined;

221
src/ssb.c
View File

@ -81,19 +81,35 @@ typedef struct _tf_ssb_rpc_callback_node_t tf_ssb_rpc_callback_node_t;
typedef struct _tf_ssb_rpc_callback_node_t { typedef struct _tf_ssb_rpc_callback_node_t {
const char** name; const char** name;
tf_ssb_rpc_callback_t* callback; tf_ssb_rpc_callback_t* callback;
tf_ssb_rpc_cleanup_t* cleanup; tf_ssb_callback_cleanup_t* cleanup;
void* user_data; void* user_data;
tf_ssb_rpc_callback_node_t* next; tf_ssb_rpc_callback_node_t* next;
} tf_ssb_rpc_callback_node_t; } tf_ssb_rpc_callback_node_t;
typedef struct _tf_ssb_connections_changed_callback_node_t tf_ssb_connections_changed_callback_node_t;
typedef struct _tf_ssb_connections_changed_callback_node_t {
tf_ssb_connections_changed_callback_t* callback;
tf_ssb_callback_cleanup_t* cleanup;
void* user_data;
tf_ssb_connections_changed_callback_node_t* next;
} tf_ssb_connections_changed_callback_node_t;
typedef struct _tf_ssb_blob_want_added_callback_node_t tf_ssb_blob_want_added_callback_node_t; typedef struct _tf_ssb_blob_want_added_callback_node_t tf_ssb_blob_want_added_callback_node_t;
typedef struct _tf_ssb_blob_want_added_callback_node_t { typedef struct _tf_ssb_blob_want_added_callback_node_t {
void (*callback)(tf_ssb_t* ssb, const char* id, void* user_data); tf_ssb_blob_want_added_callback_t* callback;
void (*cleanup)(tf_ssb_t* ssb, void* user_data); tf_ssb_callback_cleanup_t* cleanup;
void* user_data; void* user_data;
tf_ssb_blob_want_added_callback_node_t* next; tf_ssb_blob_want_added_callback_node_t* next;
} tf_ssb_blob_want_added_callback_node_t; } tf_ssb_blob_want_added_callback_node_t;
typedef struct _tf_ssb_broadcasts_changed_callback_node_t tf_ssb_broadcasts_changed_callback_node_t;
typedef struct _tf_ssb_broadcasts_changed_callback_node_t {
tf_ssb_broadcasts_changed_callback_t* callback;
tf_ssb_callback_cleanup_t* cleanup;
void* user_data;
tf_ssb_broadcasts_changed_callback_node_t* next;
} tf_ssb_broadcasts_changed_callback_node_t;
typedef struct _tf_ssb_t { typedef struct _tf_ssb_t {
bool own_context; bool own_context;
JSRuntime* runtime; JSRuntime* runtime;
@ -116,22 +132,18 @@ typedef struct _tf_ssb_t {
uint8_t pub[crypto_sign_PUBLICKEYBYTES]; uint8_t pub[crypto_sign_PUBLICKEYBYTES];
uint8_t priv[crypto_sign_SECRETKEYBYTES]; uint8_t priv[crypto_sign_SECRETKEYBYTES];
tf_ssb_connections_changed_callback_t* connections_changed[k_connections_changed_callbacks_max];
tf_ssb_rpc_cleanup_t* connections_changed_cleanup[k_connections_changed_callbacks_max];
void* connections_changed_user_data[k_connections_changed_callbacks_max];
int connections_changed_count; int connections_changed_count;
tf_ssb_connection_t* connections; tf_ssb_connection_t* connections;
tf_ssb_connections_t* connections_tracker; tf_ssb_connections_t* connections_tracker;
void (*broadcasts_changed)(tf_ssb_t* ssb, void* user_data);
void* broadcasts_changed_user_data;
tf_ssb_broadcast_t* broadcasts; tf_ssb_broadcast_t* broadcasts;
tf_ssb_rpc_callback_node_t* rpc; tf_ssb_rpc_callback_node_t* rpc;
tf_ssb_connections_changed_callback_node_t* connections_changed;
tf_ssb_blob_want_added_callback_node_t* blob_want_added; tf_ssb_blob_want_added_callback_node_t* blob_want_added;
tf_ssb_broadcasts_changed_callback_node_t* broadcasts_changed;
} tf_ssb_t; } tf_ssb_t;
typedef struct _tf_ssb_connection_t { typedef struct _tf_ssb_connection_t {
@ -378,7 +390,7 @@ static bool _tf_ssb_connection_get_request_callback(tf_ssb_connection_t* connect
return found; return found;
} }
void tf_ssb_connection_add_request(tf_ssb_connection_t* connection, int32_t request_number, tf_ssb_rpc_callback_t* callback, void* user_data) static void _tf_ssb_connection_add_request(tf_ssb_connection_t* connection, int32_t request_number, tf_ssb_rpc_callback_t* callback, void* user_data)
{ {
if (_tf_ssb_connection_get_request_callback(connection, request_number, NULL, NULL)) if (_tf_ssb_connection_get_request_callback(connection, request_number, NULL, NULL))
{ {
@ -395,7 +407,7 @@ void tf_ssb_connection_add_request(tf_ssb_connection_t* connection, int32_t requ
connection->requests = request; connection->requests = request;
} }
void tf_ssb_connection_remove_request(tf_ssb_connection_t* connection, int32_t request_number) static void _tf_ssb_connection_remove_request(tf_ssb_connection_t* connection, int32_t request_number)
{ {
for (tf_ssb_request_t** it = &connection->requests; *it; it = &(*it)->next) for (tf_ssb_request_t** it = &connection->requests; *it; it = &(*it)->next)
{ {
@ -417,7 +429,7 @@ void tf_ssb_connection_rpc_send(tf_ssb_connection_t* connection, uint8_t flags,
{ {
if (request_number > 0) if (request_number > 0)
{ {
tf_ssb_connection_add_request(connection, request_number, callback, user_data); _tf_ssb_connection_add_request(connection, request_number, callback, user_data);
} }
uint8_t* combined = malloc(9 + size); uint8_t* combined = malloc(9 + size);
*combined = flags; *combined = flags;
@ -528,6 +540,14 @@ bool tf_ssb_id_str_to_bin(uint8_t* bin, const char* str)
return base64c_decode((const uint8_t*)author_id, type - author_id, bin, crypto_box_PUBLICKEYBYTES) != 0; return base64c_decode((const uint8_t*)author_id, type - author_id, bin, crypto_box_PUBLICKEYBYTES) != 0;
} }
static void _tf_ssb_notify_connections_changed(tf_ssb_t* ssb, tf_ssb_change_t change, tf_ssb_connection_t* connection)
{
for (tf_ssb_connections_changed_callback_node_t* node = ssb->connections_changed; node; node = node->next)
{
node->callback(ssb, change, connection, node->user_data);
}
}
static void _tf_ssb_connection_verify_identity(tf_ssb_connection_t* connection, const uint8_t* message, size_t len) static void _tf_ssb_connection_verify_identity(tf_ssb_connection_t* connection, const uint8_t* message, size_t len)
{ {
uint8_t nonce[crypto_secretbox_NONCEBYTES] = { 0 }; uint8_t nonce[crypto_secretbox_NONCEBYTES] = { 0 };
@ -629,10 +649,7 @@ static void _tf_ssb_connection_verify_identity(tf_ssb_connection_t* connection,
JS_SetPropertyStr(context, connection->object, "id", JS_NewString(context, fullid)); JS_SetPropertyStr(context, connection->object, "id", JS_NewString(context, fullid));
connection->state = k_tf_ssb_state_verified; connection->state = k_tf_ssb_state_verified;
for (int i = 0; i < connection->ssb->connections_changed_count; i++) _tf_ssb_notify_connections_changed(connection->ssb, k_tf_ssb_change_connect, connection);
{
connection->ssb->connections_changed[i](connection->ssb, k_tf_ssb_change_connect, connection, connection->ssb->connections_changed_user_data[i]);
}
} }
const char* tf_ssb_connection_get_host(tf_ssb_connection_t* connection) const char* tf_ssb_connection_get_host(tf_ssb_connection_t* connection)
@ -837,10 +854,7 @@ static void _tf_ssb_connection_verify_client_identity(tf_ssb_connection_t* conne
JS_SetPropertyStr(context, connection->object, "id", JS_NewString(context, fullid)); JS_SetPropertyStr(context, connection->object, "id", JS_NewString(context, fullid));
connection->state = k_tf_ssb_state_server_verified; connection->state = k_tf_ssb_state_server_verified;
for (int i = 0; i < connection->ssb->connections_changed_count; i++) _tf_ssb_notify_connections_changed(connection->ssb, k_tf_ssb_change_connect, connection);
{
connection->ssb->connections_changed[i](connection->ssb, k_tf_ssb_change_connect, connection, connection->ssb->connections_changed_user_data[i]);
}
} }
static bool _tf_ssb_connection_recv_pop(tf_ssb_connection_t* connection, uint8_t* buffer, size_t size) static bool _tf_ssb_connection_recv_pop(tf_ssb_connection_t* connection, uint8_t* buffer, size_t size)
@ -962,7 +976,7 @@ static void _tf_ssb_connection_rpc_recv(tf_ssb_connection_t* connection, uint8_t
if (request_number < 0 && if (request_number < 0 &&
(flags & k_ssb_rpc_flag_end_error)) (flags & k_ssb_rpc_flag_end_error))
{ {
tf_ssb_connection_remove_request(connection, -request_number); _tf_ssb_connection_remove_request(connection, -request_number);
} }
} }
@ -1156,13 +1170,10 @@ static void _tf_ssb_connection_on_close(uv_handle_t* handle)
} }
while (connection->requests) while (connection->requests)
{ {
tf_ssb_connection_remove_request(connection, connection->requests->request_number); _tf_ssb_connection_remove_request(connection, connection->requests->request_number);
} }
for (int i = 0; i < ssb->connections_changed_count; i++) _tf_ssb_notify_connections_changed(ssb, k_tf_ssb_change_remove, connection);
{ JS_FreeValue(ssb->context, connection->object);
ssb->connections_changed[i](ssb, k_tf_ssb_change_remove, connection, ssb->connections_changed_user_data[i]);
}
JS_FreeValue(connection->ssb->context, connection->object);
} }
static void _tf_ssb_connection_on_tcp_recv(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) static void _tf_ssb_connection_on_tcp_recv(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf)
@ -1546,14 +1557,6 @@ void tf_ssb_destroy(tf_ssb_t* ssb)
tf_ssb_connections_destroy(ssb->connections_tracker); tf_ssb_connections_destroy(ssb->connections_tracker);
ssb->connections_tracker = NULL; ssb->connections_tracker = NULL;
for (int i = 0; i < ssb->connections_changed_count; i++)
{
if (ssb->connections_changed_cleanup[i])
{
ssb->connections_changed_cleanup[i](ssb, ssb->connections_changed_user_data[i]);
}
}
if (ssb->broadcast_listener.data && !uv_is_closing((uv_handle_t*)&ssb->broadcast_listener)) if (ssb->broadcast_listener.data && !uv_is_closing((uv_handle_t*)&ssb->broadcast_listener))
{ {
uv_close((uv_handle_t*)&ssb->broadcast_listener, _tf_ssb_on_handle_close); uv_close((uv_handle_t*)&ssb->broadcast_listener, _tf_ssb_on_handle_close);
@ -1597,6 +1600,16 @@ void tf_ssb_destroy(tf_ssb_t* ssb)
} }
free(node); free(node);
} }
while (ssb->connections_changed)
{
tf_ssb_connections_changed_callback_node_t* node = ssb->connections_changed;
ssb->connections_changed = node->next;
if (node->cleanup)
{
node->cleanup(ssb, node->user_data);
}
free(node);
}
while (ssb->blob_want_added) while (ssb->blob_want_added)
{ {
tf_ssb_blob_want_added_callback_node_t* node = ssb->blob_want_added; tf_ssb_blob_want_added_callback_node_t* node = ssb->blob_want_added;
@ -1607,6 +1620,16 @@ void tf_ssb_destroy(tf_ssb_t* ssb)
} }
free(node); free(node);
} }
while (ssb->broadcasts_changed)
{
tf_ssb_broadcasts_changed_callback_node_t* node = ssb->broadcasts_changed;
ssb->broadcasts_changed = node->next;
if (node->cleanup)
{
node->cleanup(ssb, node->user_data);
}
free(node);
}
if (ssb->own_context) if (ssb->own_context)
{ {
JS_FreeContext(ssb->context); JS_FreeContext(ssb->context);
@ -1702,10 +1725,7 @@ tf_ssb_connection_t* tf_ssb_connection_create(tf_ssb_t* ssb, const char* host, c
connection->next = ssb->connections; connection->next = ssb->connections;
ssb->connections = connection; ssb->connections = connection;
for (int i = 0; i < ssb->connections_changed_count; i++) _tf_ssb_notify_connections_changed(ssb, k_tf_ssb_change_create, connection);
{
ssb->connections_changed[i](ssb, k_tf_ssb_change_create, connection, ssb->connections_changed_user_data[i]);
}
return connection; return connection;
} }
@ -1800,10 +1820,7 @@ static void _tf_ssb_on_connection(uv_stream_t* stream, int status)
connection->next = ssb->connections; connection->next = ssb->connections;
ssb->connections = connection; ssb->connections = connection;
for (int i = 0; i < ssb->connections_changed_count; i++) _tf_ssb_notify_connections_changed(ssb, k_tf_ssb_change_create, connection);
{
ssb->connections_changed[i](ssb, k_tf_ssb_change_create, connection, ssb->connections_changed_user_data[i]);
}
connection->state = k_tf_ssb_state_server_wait_hello; connection->state = k_tf_ssb_state_server_wait_hello;
uv_read_start((uv_stream_t*)&connection->tcp, _tf_ssb_connection_on_tcp_alloc, _tf_ssb_connection_on_tcp_recv); uv_read_start((uv_stream_t*)&connection->tcp, _tf_ssb_connection_on_tcp_alloc, _tf_ssb_connection_on_tcp_recv);
@ -1960,6 +1977,17 @@ static void _tf_ssb_on_broadcast_listener_alloc(uv_handle_t* handle, size_t sugg
buf->len = suggested_size; buf->len = suggested_size;
} }
static void _tf_ssb_notify_broadcasts_changed(tf_ssb_t* ssb)
{
for (tf_ssb_broadcasts_changed_callback_node_t* node = ssb->broadcasts_changed; node; node = node->next)
{
if (node->callback)
{
node->callback(ssb, node->user_data);
}
}
}
static void _tf_ssb_add_broadcast(tf_ssb_t* ssb, const tf_ssb_broadcast_t* broadcast) static void _tf_ssb_add_broadcast(tf_ssb_t* ssb, const tf_ssb_broadcast_t* broadcast)
{ {
if (memcmp(broadcast->pub, ssb->pub, sizeof(ssb->pub)) == 0) if (memcmp(broadcast->pub, ssb->pub, sizeof(ssb->pub)) == 0)
@ -1992,10 +2020,7 @@ static void _tf_ssb_add_broadcast(tf_ssb_t* ssb, const tf_ssb_broadcast_t* broad
node->mtime = node->ctime; node->mtime = node->ctime;
ssb->broadcasts = node; ssb->broadcasts = node;
if (ssb->broadcasts_changed) _tf_ssb_notify_broadcasts_changed(ssb);
{
ssb->broadcasts_changed(ssb, ssb->broadcasts_changed_user_data);
}
} }
static void _tf_ssb_on_broadcast_listener_recv(uv_udp_t* handle, ssize_t nread, const uv_buf_t* buf, const struct sockaddr* addr, unsigned flags) static void _tf_ssb_on_broadcast_listener_recv(uv_udp_t* handle, ssize_t nread, const uv_buf_t* buf, const struct sockaddr* addr, unsigned flags)
@ -2118,22 +2143,79 @@ int tf_ssb_get_connections(tf_ssb_t* ssb, tf_ssb_connection_t** out_connections,
return i; return i;
} }
void tf_ssb_set_broadcasts_changed_callback(tf_ssb_t* ssb, void (*callback)(tf_ssb_t* ssb, void* user_data), void* user_data) void tf_ssb_add_broadcasts_changed_callback(tf_ssb_t* ssb, tf_ssb_broadcasts_changed_callback_t* callback, tf_ssb_callback_cleanup_t* cleanup, void* user_data)
{ {
ssb->broadcasts_changed = callback; tf_ssb_broadcasts_changed_callback_node_t* node = malloc(sizeof(tf_ssb_broadcasts_changed_callback_node_t));
ssb->broadcasts_changed_user_data = user_data; *node = (tf_ssb_broadcasts_changed_callback_node_t)
{
.callback = callback,
.cleanup = cleanup,
.user_data = user_data,
.next = ssb->broadcasts_changed,
};
ssb->broadcasts_changed = node;
} }
void tf_ssb_add_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_connections_changed_callback_t* callback, tf_ssb_rpc_cleanup_t* cleanup, void* user_data) void tf_ssb_remove_broadcasts_changed_callback(tf_ssb_t* ssb, tf_ssb_broadcasts_changed_callback_t* callback, void* user_data)
{ {
assert(ssb->connections_changed_count < k_connections_changed_callbacks_max); tf_ssb_broadcasts_changed_callback_node_t** it = &ssb->broadcasts_changed;
ssb->connections_changed[ssb->connections_changed_count] = callback; while (*it)
ssb->connections_changed_cleanup[ssb->connections_changed_count] = cleanup; {
ssb->connections_changed_user_data[ssb->connections_changed_count] = user_data; if ((*it)->callback == callback &&
ssb->connections_changed_count++; (*it)->user_data == user_data)
{
tf_ssb_broadcasts_changed_callback_node_t* node = *it;
*it = node->next;
if (node->cleanup)
{
node->cleanup(ssb, node->user_data);
}
free(node);
}
else
{
*it = (*it)->next;
}
}
} }
void tf_ssb_register_rpc(tf_ssb_t* ssb, const char** name, tf_ssb_rpc_callback_t* callback, tf_ssb_rpc_cleanup_t* cleanup, void* user_data) void tf_ssb_add_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_connections_changed_callback_t* callback, tf_ssb_callback_cleanup_t* cleanup, void* user_data)
{
tf_ssb_connections_changed_callback_node_t* node = malloc(sizeof(tf_ssb_connections_changed_callback_node_t));
*node = (tf_ssb_connections_changed_callback_node_t)
{
.callback = callback,
.cleanup = cleanup,
.user_data = user_data,
.next = ssb->connections_changed,
};
ssb->connections_changed = node;
}
void tf_ssb_remove_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_connections_changed_callback_t* callback, void* user_data)
{
tf_ssb_connections_changed_callback_node_t** it = &ssb->connections_changed;
while (*it)
{
if ((*it)->callback == callback &&
(*it)->user_data == user_data)
{
tf_ssb_connections_changed_callback_node_t* node = *it;
*it = node->next;
if (node->cleanup)
{
node->cleanup(ssb, node->user_data);
}
free(node);
}
else
{
*it = (*it)->next;
}
}
}
void tf_ssb_add_rpc_callback(tf_ssb_t* ssb, const char** name, tf_ssb_rpc_callback_t* callback, tf_ssb_callback_cleanup_t* cleanup, void* user_data)
{ {
size_t name_len = 0; size_t name_len = 0;
int name_count = 0; int name_count = 0;
@ -2188,7 +2270,7 @@ JSValue tf_ssb_connection_get_object(tf_ssb_connection_t* connection)
return connection ? connection->object : JS_UNDEFINED; return connection ? connection->object : JS_UNDEFINED;
} }
void tf_ssb_register_blob_want_added(tf_ssb_t* ssb, void (*callback)(tf_ssb_t* ssb, const char* id, void* user_data), void (*cleanup)(tf_ssb_t* ssb, void* user_data), void* user_data) void tf_ssb_add_blob_want_added_callback(tf_ssb_t* ssb, void (*callback)(tf_ssb_t* ssb, const char* id, void* user_data), void (*cleanup)(tf_ssb_t* ssb, void* user_data), void* user_data)
{ {
tf_ssb_blob_want_added_callback_node_t* node = malloc(sizeof(tf_ssb_blob_want_added_callback_node_t)); tf_ssb_blob_want_added_callback_node_t* node = malloc(sizeof(tf_ssb_blob_want_added_callback_node_t));
*node = (tf_ssb_blob_want_added_callback_node_t) *node = (tf_ssb_blob_want_added_callback_node_t)
@ -2201,6 +2283,29 @@ void tf_ssb_register_blob_want_added(tf_ssb_t* ssb, void (*callback)(tf_ssb_t* s
ssb->blob_want_added = node; ssb->blob_want_added = node;
} }
void tf_ssb_remove_blob_want_added_callback(tf_ssb_t* ssb, tf_ssb_blob_want_added_callback_t* callback, void* user_data)
{
tf_ssb_blob_want_added_callback_node_t** it = &ssb->blob_want_added;
while (*it)
{
if ((*it)->callback == callback &&
(*it)->user_data == user_data)
{
tf_ssb_blob_want_added_callback_node_t* node = *it;
*it = node->next;
if (node->cleanup)
{
node->cleanup(ssb, node->user_data);
}
free(node);
}
else
{
*it = (*it)->next;
}
}
}
void tf_ssb_notify_blob_want_added(tf_ssb_t* ssb, const char* id) void tf_ssb_notify_blob_want_added(tf_ssb_t* ssb, const char* id)
{ {
for (tf_ssb_blob_want_added_callback_node_t* node = ssb->blob_want_added; node; node = node->next) for (tf_ssb_blob_want_added_callback_node_t* node = ssb->blob_want_added; node; node = node->next)

View File

@ -58,12 +58,8 @@ void tf_ssb_append_message(tf_ssb_t* ssb, JSValue message);
void tf_ssb_append_post(tf_ssb_t* ssb, const char* text); void tf_ssb_append_post(tf_ssb_t* ssb, const char* text);
bool tf_ssb_whoami(tf_ssb_t* ssb, char* out_id, size_t out_id_size); bool tf_ssb_whoami(tf_ssb_t* ssb, char* out_id, size_t out_id_size);
void tf_ssb_set_broadcasts_changed_callback(tf_ssb_t* ssb, void (*callback)(tf_ssb_t* ssb, void* user_data), void* user_data);
void tf_ssb_visit_broadcasts(tf_ssb_t* ssb, void (*callback)(const struct sockaddr_in* addr, const uint8_t* pub, void* user_data), void* user_data); void tf_ssb_visit_broadcasts(tf_ssb_t* ssb, void (*callback)(const struct sockaddr_in* addr, const uint8_t* pub, void* user_data), void* user_data);
typedef void (tf_ssb_rpc_cleanup_t)(tf_ssb_t* ssb, void* user_data);
typedef void (tf_ssb_connections_changed_callback_t)(tf_ssb_t* ssb, tf_ssb_change_t change, tf_ssb_connection_t* connection, void* user_data);
void tf_ssb_add_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_connections_changed_callback_t callback, tf_ssb_rpc_cleanup_t* cleanup, void* user_data);
const char** tf_ssb_get_connection_ids(tf_ssb_t* ssb); const char** tf_ssb_get_connection_ids(tf_ssb_t* ssb);
int tf_ssb_get_connections(tf_ssb_t* ssb, tf_ssb_connection_t** out_connections, int out_connections_count); int tf_ssb_get_connections(tf_ssb_t* ssb, tf_ssb_connection_t** out_connections, int out_connections_count);
void tf_ssb_connect(tf_ssb_t* ssb, const char* host, int port, const uint8_t* key); void tf_ssb_connect(tf_ssb_t* ssb, const char* host, int port, const uint8_t* key);
@ -76,9 +72,6 @@ void tf_ssb_send_close(tf_ssb_t* ssb);
bool tf_ssb_id_str_to_bin(uint8_t* bin, const char* str); bool tf_ssb_id_str_to_bin(uint8_t* bin, const char* str);
bool tf_ssb_id_bin_to_str(char* str, size_t str_size, const uint8_t* bin); bool tf_ssb_id_bin_to_str(char* str, size_t str_size, const uint8_t* bin);
typedef void (tf_ssb_rpc_callback_t)(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data);
void tf_ssb_register_rpc(tf_ssb_t* ssb, const char** name, tf_ssb_rpc_callback_t* callback, tf_ssb_rpc_cleanup_t* cleanup, void* user_data);
bool tf_ssb_verify_and_strip_signature(JSContext* context, JSValue val, char* out_signature, size_t out_signature_size); bool tf_ssb_verify_and_strip_signature(JSContext* context, JSValue val, char* out_signature, size_t out_signature_size);
void tf_ssb_calculate_message_id(JSContext* context, JSValue message, char* out_id, size_t out_id_size); void tf_ssb_calculate_message_id(JSContext* context, JSValue message, char* out_id, size_t out_id_size);
@ -87,16 +80,31 @@ int tf_ssb_connection_get_port(tf_ssb_connection_t* connection);
tf_ssb_t* tf_ssb_connection_get_ssb(tf_ssb_connection_t* connection); tf_ssb_t* tf_ssb_connection_get_ssb(tf_ssb_connection_t* connection);
JSContext* tf_ssb_connection_get_context(tf_ssb_connection_t* connection); JSContext* tf_ssb_connection_get_context(tf_ssb_connection_t* connection);
sqlite3* tf_ssb_connection_get_db(tf_ssb_connection_t* connection); sqlite3* tf_ssb_connection_get_db(tf_ssb_connection_t* connection);
void tf_ssb_connection_rpc_send(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, const uint8_t* message, size_t size, tf_ssb_rpc_callback_t* callback, void* user_data);
int32_t tf_ssb_connection_next_request_number(tf_ssb_connection_t* connection); int32_t tf_ssb_connection_next_request_number(tf_ssb_connection_t* connection);
bool tf_ssb_connection_get_id(tf_ssb_connection_t* connection, char* out_id, size_t out_id_size); bool tf_ssb_connection_get_id(tf_ssb_connection_t* connection, char* out_id, size_t out_id_size);
void tf_ssb_connection_add_request(tf_ssb_connection_t* connection, int32_t request_number, tf_ssb_rpc_callback_t* callback, void* user_data);
void tf_ssb_connection_remove_request(tf_ssb_connection_t* connection, int32_t request_number);
JSValue tf_ssb_connection_get_object(tf_ssb_connection_t* connection); JSValue tf_ssb_connection_get_object(tf_ssb_connection_t* connection);
void tf_ssb_register_blob_want_added(tf_ssb_t* ssb, void (*callback)(tf_ssb_t* ssb, const char* id, void* user_data), void (*cleanup)(tf_ssb_t* ssb, void* user_data), void* user_data); /* Callbacks. */
typedef void (tf_ssb_callback_cleanup_t)(tf_ssb_t* ssb, void* user_data);
typedef void (tf_ssb_connections_changed_callback_t)(tf_ssb_t* ssb, tf_ssb_change_t change, tf_ssb_connection_t* connection, void* user_data);
void tf_ssb_add_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_connections_changed_callback_t* callback, tf_ssb_callback_cleanup_t* cleanup, void* user_data);
void tf_ssb_remove_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_connections_changed_callback_t* callback, void* user_data);
typedef void (tf_ssb_broadcasts_changed_callback_t)(tf_ssb_t* ssb, void* user_data);
void tf_ssb_add_broadcasts_changed_callback(tf_ssb_t* ssb, tf_ssb_broadcasts_changed_callback_t* callback, tf_ssb_callback_cleanup_t* cleanup, void* user_data);
void tf_ssb_remove_broadcasts_changed_callback(tf_ssb_t* ssb, tf_ssb_broadcasts_changed_callback_t* callback, void* user_data);
typedef void (tf_ssb_blob_want_added_callback_t)(tf_ssb_t* ssb, const char* id, void* user_data);
void tf_ssb_add_blob_want_added_callback(tf_ssb_t* ssb, tf_ssb_blob_want_added_callback_t* callback, tf_ssb_callback_cleanup_t* cleanup, void* user_data);
void tf_ssb_remove_blob_want_added_callback(tf_ssb_t* ssb, tf_ssb_blob_want_added_callback_t* callback, void* user_data);
void tf_ssb_notify_blob_want_added(tf_ssb_t* ssb, const char* id); void tf_ssb_notify_blob_want_added(tf_ssb_t* ssb, const char* id);
typedef void (tf_ssb_rpc_callback_t)(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data);
void tf_ssb_add_rpc_callback(tf_ssb_t* ssb, const char** name, tf_ssb_rpc_callback_t* callback, tf_ssb_callback_cleanup_t* cleanup, void* user_data);
void tf_ssb_remove_rpc_callback(tf_ssb_t* ssb, const char** name, tf_ssb_rpc_callback_t* callback, void* user_data);
void tf_ssb_connection_rpc_send(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, const uint8_t* message, size_t size, tf_ssb_rpc_callback_t* callback, void* user_data);
JSClassID tf_ssb_get_connection_class_id(); JSClassID tf_ssb_get_connection_class_id();

View File

@ -335,37 +335,6 @@ static JSValue _tf_ssb_connect(JSContext* context, JSValueConst this_val, int ar
return JS_UNDEFINED; return JS_UNDEFINED;
} }
static void _tf_ssb_call_callback(tf_ssb_t* ssb, const char* name, void* user_data)
{
JSContext* context = tf_ssb_get_context(ssb);
JSValue global = JS_GetGlobalObject(context);
JSValue ssbo = JS_GetPropertyStr(context, global, "ssb");
JSValue callback = JS_GetPropertyStr(context, ssbo, name);
if (JS_IsFunction(context, callback))
{
JSValue args = JS_UNDEFINED;
JSValue response = JS_Call(context, callback, JS_UNDEFINED, 0, &args);
tf_util_report_error(context, response);
if (tf_task_get(context))
{
tf_task_run_jobs(tf_task_get(context));
}
JS_FreeValue(context, response);
}
JS_FreeValue(context, ssbo);
JS_FreeValue(context, global);
}
static void _tf_ssb_broadcasts_changed(tf_ssb_t* ssb, void* user_data)
{
_tf_ssb_call_callback(ssb, "onBroadcastsChanged", user_data);
}
static void _tf_ssb_connections_changed(tf_ssb_t* ssb, tf_ssb_change_t change, tf_ssb_connection_t* connection, void* user_data)
{
_tf_ssb_call_callback(ssb, "onConnectionsChanged", user_data);
}
static JSValue _tf_ssb_rpc_send_json(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) static JSValue _tf_ssb_rpc_send_json(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv)
{ {
JSValue connection_val = JS_GetPropertyStr(context, this_val, "connection"); JSValue connection_val = JS_GetPropertyStr(context, this_val, "connection");
@ -440,13 +409,13 @@ void _tf_ssb_on_rpc(tf_ssb_connection_t* connection, uint8_t flags, int32_t requ
JS_FreeValue(context, object); JS_FreeValue(context, object);
} }
static void _tf_ssb_rpc_js_value_cleanup(tf_ssb_t* ssb, void* user_data) static void _tf_ssb_cleanup_value(tf_ssb_t* ssb, void* user_data)
{ {
JSValue callback = JS_MKPTR(JS_TAG_OBJECT, user_data); JSValue callback = JS_MKPTR(JS_TAG_OBJECT, user_data);
JS_FreeValue(tf_ssb_get_context(ssb), callback); JS_FreeValue(tf_ssb_get_context(ssb), callback);
} }
static JSValue _tf_ssb_register_rpc(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) static JSValue _tf_ssb_add_rpc(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv)
{ {
tf_ssb_t* ssb = JS_GetOpaque(this_val, _tf_ssb_classId); tf_ssb_t* ssb = JS_GetOpaque(this_val, _tf_ssb_classId);
if (!JS_IsArray(context, argv[0])) if (!JS_IsArray(context, argv[0]))
@ -477,7 +446,7 @@ static JSValue _tf_ssb_register_rpc(JSContext* context, JSValueConst this_val, i
JS_FreeValue(context, value); JS_FreeValue(context, value);
} }
tf_ssb_register_rpc(ssb, name, _tf_ssb_on_rpc, _tf_ssb_rpc_js_value_cleanup, JS_VALUE_GET_PTR(JS_DupValue(context, argv[1]))); tf_ssb_add_rpc_callback(ssb, name, _tf_ssb_on_rpc, _tf_ssb_cleanup_value, JS_VALUE_GET_PTR(JS_DupValue(context, argv[1])));
for (int i = 0; i < length; i++) for (int i = 0; i < length; i++)
{ {
@ -487,7 +456,7 @@ static JSValue _tf_ssb_register_rpc(JSContext* context, JSValueConst this_val, i
return JS_UNDEFINED; return JS_UNDEFINED;
} }
static void _tf_ssb_on_blob_want_added(tf_ssb_t* ssb, const char* id, void* user_data) static void _tf_ssb_on_blob_want_added_callback(tf_ssb_t* ssb, const char* id, void* user_data)
{ {
JSContext* context = tf_ssb_get_context(ssb); JSContext* context = tf_ssb_get_context(ssb);
JSValue callback = JS_MKPTR(JS_TAG_OBJECT, user_data); JSValue callback = JS_MKPTR(JS_TAG_OBJECT, user_data);
@ -498,24 +467,7 @@ static void _tf_ssb_on_blob_want_added(tf_ssb_t* ssb, const char* id, void* user
JS_FreeValue(context, string); JS_FreeValue(context, string);
} }
static void _tf_ssb_cleanup_value(tf_ssb_t* ssb, void* user_data) static void _tf_ssb_on_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_change_t change, tf_ssb_connection_t* connection, void* user_data)
{
JSValue callback = JS_MKPTR(JS_TAG_OBJECT, user_data);
JS_FreeValue(tf_ssb_get_context(ssb), callback);
}
static JSValue _tf_ssb_register_blob_want_added(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv)
{
tf_ssb_t* ssb = JS_GetOpaque(this_val, _tf_ssb_classId);
if (!JS_IsFunction(context, argv[0]))
{
return JS_ThrowTypeError(context, "Expected argument 1 to be a function.");
}
tf_ssb_register_blob_want_added(ssb, _tf_ssb_on_blob_want_added, _tf_ssb_cleanup_value, JS_VALUE_GET_PTR(JS_DupValue(context, argv[0])));
return JS_UNDEFINED;
}
static void _tf_ssb_rpc_on_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_change_t change, tf_ssb_connection_t* connection, void* user_data)
{ {
JSContext* context = tf_ssb_get_context(ssb); JSContext* context = tf_ssb_get_context(ssb);
JSValue callback = JS_MKPTR(JS_TAG_OBJECT, user_data); JSValue callback = JS_MKPTR(JS_TAG_OBJECT, user_data);
@ -557,18 +509,13 @@ static void _tf_ssb_rpc_on_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_ch
JS_FreeValue(context, response); JS_FreeValue(context, response);
} }
static JSValue _tf_ssb_register_connections_changed(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) static void _tf_ssb_on_broadcasts_changed_callback(tf_ssb_t* ssb, void* user_data)
{ {
printf("register connections changed\n"); JSContext* context = tf_ssb_get_context(ssb);
tf_ssb_t* ssb = JS_GetOpaque(this_val, _tf_ssb_classId); JSValue callback = JS_MKPTR(JS_TAG_OBJECT, user_data);
if (!JS_IsFunction(context, argv[0])) JSValue response = JS_Call(context, callback, JS_UNDEFINED, 1, &JS_UNDEFINED);
{ tf_util_report_error(context, response);
return JS_ThrowTypeError(context, "Expected argument 1 to be a function."); JS_FreeValue(context, response);
}
void* ptr = JS_VALUE_GET_PTR(JS_DupValue(context, argv[0]));
printf("registering %p TAG=%d\n", ptr, JS_VALUE_GET_TAG(argv[0]));
tf_ssb_add_connections_changed_callback(ssb, _tf_ssb_rpc_on_connections_changed_callback, _tf_ssb_rpc_js_value_cleanup, ptr);
return JS_UNDEFINED;
} }
void tf_ssb_run_file(JSContext* context, const char* file_name) void tf_ssb_run_file(JSContext* context, const char* file_name)
@ -632,6 +579,82 @@ void tf_ssb_run_file(JSContext* context, const char* file_name)
free(source); free(source);
} }
static JSValue _tf_ssb_add_event_listener(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv)
{
tf_ssb_t* ssb = JS_GetOpaque(this_val, _tf_ssb_classId);
const char* event_name = JS_ToCString(context, argv[0]);
JSValue callback = argv[1];
JSValue result = JS_UNDEFINED;
if (!event_name)
{
result = JS_ThrowTypeError(context, "Expected argument 1 to be a string event name.");
}
else if (!JS_IsFunction(context, callback))
{
result = JS_ThrowTypeError(context, "Expected argument 2 to be a function.");
}
else
{
if (strcmp(event_name, "connections") == 0)
{
void* ptr = JS_VALUE_GET_PTR(JS_DupValue(context, callback));
tf_ssb_add_connections_changed_callback(ssb, _tf_ssb_on_connections_changed_callback, _tf_ssb_cleanup_value, ptr);
}
else if (strcmp(event_name, "broadcasts") == 0)
{
void* ptr = JS_VALUE_GET_PTR(JS_DupValue(context, callback));
tf_ssb_add_broadcasts_changed_callback(ssb, _tf_ssb_on_broadcasts_changed_callback, _tf_ssb_cleanup_value, ptr);
}
else if (strcmp(event_name, "blob_want_added") == 0)
{
void* ptr = JS_VALUE_GET_PTR(JS_DupValue(context, callback));
tf_ssb_add_blob_want_added_callback(ssb, _tf_ssb_on_blob_want_added_callback, _tf_ssb_cleanup_value, ptr);
}
}
JS_FreeCString(context, event_name);
return result;
}
static JSValue _tf_ssb_remove_event_listener(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv)
{
tf_ssb_t* ssb = JS_GetOpaque(this_val, _tf_ssb_classId);
const char* event_name = JS_ToCString(context, argv[0]);
JSValue callback = argv[1];
JSValue result = JS_UNDEFINED;
if (!event_name)
{
result = JS_ThrowTypeError(context, "Expected argument 1 to be a string event name.");
}
else if (!JS_IsFunction(context, callback))
{
result = JS_ThrowTypeError(context, "Expected argument 2 to be a function.");
}
else
{
if (strcmp(event_name, "connections") == 0)
{
void* ptr = JS_VALUE_GET_PTR(JS_DupValue(context, callback));
tf_ssb_remove_connections_changed_callback(ssb, _tf_ssb_on_connections_changed_callback, ptr);
}
else if (strcmp(event_name, "broadcasts") == 0)
{
void* ptr = JS_VALUE_GET_PTR(JS_DupValue(context, callback));
tf_ssb_remove_broadcasts_changed_callback(ssb, _tf_ssb_on_broadcasts_changed_callback, ptr);
}
else if (strcmp(event_name, "blob_want_added") == 0)
{
void* ptr = JS_VALUE_GET_PTR(JS_DupValue(context, callback));
tf_ssb_remove_blob_want_added_callback(ssb, _tf_ssb_on_blob_want_added_callback, ptr);
}
}
JS_FreeCString(context, event_name);
return result;
}
void tf_ssb_register(JSContext* context, tf_ssb_t* ssb) void tf_ssb_register(JSContext* context, tf_ssb_t* ssb)
{ {
JS_NewClassID(&_tf_ssb_classId); JS_NewClassID(&_tf_ssb_classId);
@ -644,9 +667,6 @@ void tf_ssb_register(JSContext* context, tf_ssb_t* ssb)
fprintf(stderr, "Failed to register ssb.\n"); fprintf(stderr, "Failed to register ssb.\n");
} }
tf_ssb_set_broadcasts_changed_callback(ssb, _tf_ssb_broadcasts_changed, NULL);
tf_ssb_add_connections_changed_callback(ssb, _tf_ssb_connections_changed, NULL, NULL);
JSValue global = JS_GetGlobalObject(context); JSValue global = JS_GetGlobalObject(context);
JSValue object = JS_NewObjectClass(context, _tf_ssb_classId); JSValue object = JS_NewObjectClass(context, _tf_ssb_classId);
JS_SetPropertyStr(context, global, "ssb", object); JS_SetPropertyStr(context, global, "ssb", object);
@ -663,9 +683,11 @@ void tf_ssb_register(JSContext* context, tf_ssb_t* ssb)
JS_SetPropertyStr(context, object, "storeMessage", JS_NewCFunction(context, _tf_ssb_storeMessage, "storeMessage", 1)); JS_SetPropertyStr(context, object, "storeMessage", JS_NewCFunction(context, _tf_ssb_storeMessage, "storeMessage", 1));
JS_SetPropertyStr(context, object, "getBroadcasts", JS_NewCFunction(context, _tf_ssb_getBroadcasts, "getBroadcasts", 0)); JS_SetPropertyStr(context, object, "getBroadcasts", JS_NewCFunction(context, _tf_ssb_getBroadcasts, "getBroadcasts", 0));
JS_SetPropertyStr(context, object, "connect", JS_NewCFunction(context, _tf_ssb_connect, "connect", 1)); JS_SetPropertyStr(context, object, "connect", JS_NewCFunction(context, _tf_ssb_connect, "connect", 1));
JS_SetPropertyStr(context, object, "registerRpc", JS_NewCFunction(context, _tf_ssb_register_rpc, "registerRpc", 2));
JS_SetPropertyStr(context, object, "registerBlobWantAdded", JS_NewCFunction(context, _tf_ssb_register_blob_want_added, "registerBlobWantAdded", 1)); JS_SetPropertyStr(context, object, "addRpc", JS_NewCFunction(context, _tf_ssb_add_rpc, "addRpc", 2));
JS_SetPropertyStr(context, object, "registerConnectionsChanged", JS_NewCFunction(context, _tf_ssb_register_connections_changed, "registerConnectionsChanged", 1));
JS_SetPropertyStr(context, object, "addEventListener", JS_NewCFunction(context, _tf_ssb_add_event_listener, "addEventListener", 2));
JS_SetPropertyStr(context, object, "removeEventListener", JS_NewCFunction(context, _tf_ssb_remove_event_listener, "removeEventListener", 2));
JS_FreeValue(context, global); JS_FreeValue(context, global);