diff --git a/core/ssb.js b/core/ssb.js new file mode 100644 index 00000000..9e626446 --- /dev/null +++ b/core/ssb.js @@ -0,0 +1,85 @@ +var g_wants_requests = {}; + +ssb.registerConnectionsChanged(function(change, connection) { + if (change == 'add') { + connection.send_json({'name': ['createHistoryStream'], 'type': 'source', 'args': [{'id': connection.id, 'seq': 0}]}, function(message) { + ssb.storeMessage(message.message); + }); + connection.send_json({'name': ['blobs', 'createWants'], 'type': 'source', 'args': []}, function(message) { + Object.keys(message.message).forEach(function(id) { + if (message.message[id] < 0) { + var blob = ssb.blobGet(id); + if (blob) { + var out_message = {}; + out_message[id] = blob.byteLength; + g_wants_requests[connection.id].send_json(out_message); + //connection.wants_request.send_json(out_message); + } + } else { + debug_print("blobs.get", id); + connection.send_json({'name': ['blobs', 'get'], 'type': 'source', 'args': [{'id': id}]}, function(message) { + debug_print(id, '=>', debug_utf8Decode(message.message)); + ssb.blobStore(message.message); + }); + } + }); + }); + } else if (change == 'remove') { + debug_print('REMOVE', connection.id); + delete g_wants_requests[connection.id]; + } else { + debug_print('CHANGE', change); + } +}); + +ssb.registerRpc(['blobs', 'createWants'], function(request) { + g_wants_requests[request.connection.id] = request; + function blob_want_discovered(id) { + debug_print('discovered', id); + var message = {}; + message[id] = -1; + request.send_json(message); + } + ssb.registerBlobWantAdded(blob_want_discovered); + ssb.sqlStream( + 'SELECT id FROM blob_wants', + [], + row => blob_want_discovered(row.id)); +}); + +ssb.registerRpc(['blobs', 'has'], function(request) { + var found = false; + ssb.sqlStream( + 'SELECT 1 FROM blobs where id = ?1', + [request.args[0]], + function(row) { + found = true; + }); + request.send_json(found); +}); + +ssb.registerRpc(['blobs', 'get'], function(request) { + var blob = ssb.blobGet(request.args[0].id); + request.send_binary(blob); +}); + +ssb.registerRpc(['createHistoryStream'], function(request) { + var id = request.args[0].id; + var seq = request.args[0].seq; + ssb.sqlStream( + 'SELECT previous, sequence, timestamp, hash, content, signature FROM messages WHERE author = ?1 AND sequence >= ?2 ORDER BY sequence', + [id, seq ?? 0], + function(row) { + var message = { + 'previous': row.previous, + 'author': id, + 'sequence': row.sequence, + 'timestamp': row.timestamp, + 'hash': row.hash, + 'content': JSON.parse(row.content), + 'signature': row.signature, + }; + debug_print('sending1', JSON.stringify(message)); + request.send_json(message); + }); +}); diff --git a/src/ssb.c b/src/ssb.c index a3835bc3..b80a6c24 100644 --- a/src/ssb.c +++ b/src/ssb.c @@ -54,7 +54,7 @@ typedef enum { } tf_ssb_state_t; enum { - k_connections_changed_callbacks_max = 4, + k_connections_changed_callbacks_max = 8, k_tf_ssb_rpc_message_body_length_max = 8192, }; @@ -82,10 +82,19 @@ typedef struct _tf_ssb_rpc_callback_node_t tf_ssb_rpc_callback_node_t; typedef struct _tf_ssb_rpc_callback_node_t { const char** name; tf_ssb_rpc_callback_t* callback; + tf_ssb_rpc_cleanup_t* cleanup; void* user_data; tf_ssb_rpc_callback_node_t* next; } tf_ssb_rpc_callback_node_t; +typedef struct _tf_ssb_blob_want_added_callback_node_t tf_ssb_blob_want_added_callback_node_t; +typedef struct _tf_ssb_blob_want_added_callback_node_t { + void (*callback)(tf_ssb_t* ssb, const char* id, void* user_data); + void (*cleanup)(tf_ssb_t* ssb, void* user_data); + void* user_data; + tf_ssb_blob_want_added_callback_node_t* next; +} tf_ssb_blob_want_added_callback_node_t; + typedef struct _tf_ssb_t { bool own_context; JSRuntime* runtime; @@ -109,6 +118,7 @@ typedef struct _tf_ssb_t { uint8_t priv[crypto_sign_SECRETKEYBYTES]; tf_ssb_connections_changed_callback_t* connections_changed[k_connections_changed_callbacks_max]; + tf_ssb_rpc_cleanup_t* connections_changed_cleanup[k_connections_changed_callbacks_max]; void* connections_changed_user_data[k_connections_changed_callbacks_max]; int connections_changed_count; @@ -122,6 +132,8 @@ typedef struct _tf_ssb_t { tf_ssb_broadcast_t* broadcasts; tf_ssb_rpc_callback_node_t* rpc; + + tf_ssb_blob_want_added_callback_node_t* blob_want_added; } tf_ssb_t; typedef struct _tf_ssb_connection_t { @@ -129,6 +141,8 @@ typedef struct _tf_ssb_connection_t { uv_tcp_t tcp; uv_connect_t connect; + JSValue object; + char host[256]; int port; @@ -164,11 +178,14 @@ typedef struct _tf_ssb_connection_t { tf_ssb_request_t* requests; } tf_ssb_connection_t; +static JSClassID _connection_class_id; + static void _tf_ssb_connection_client_send_hello(uv_stream_t* stream); static void _tf_ssb_connection_on_close(uv_handle_t* handle); static void _tf_ssb_connection_close(tf_ssb_connection_t* connection, const char* reason); static void _tf_ssb_nonce_inc(uint8_t* nonce); static void _tf_ssb_write(tf_ssb_connection_t* connection, void* data, size_t size); +static void _tf_ssb_connection_finalizer(JSRuntime* runtime, JSValue value); static void _tf_ssb_connection_send_close(tf_ssb_connection_t* connection) { @@ -362,6 +379,9 @@ void tf_ssb_connection_remove_request(tf_ssb_connection_t* connection, int32_t r for (tf_ssb_request_t** it = &connection->requests; *it; it = &(*it)->next) { if ((*it)->request_number == request_number) { tf_ssb_request_t* found = *it; + if (found->user_data) { + JS_FreeValue(tf_ssb_connection_get_context(connection), JS_MKPTR(JS_TAG_OBJECT, found->user_data)); + } *it = found->next; free(found); break; @@ -821,7 +841,7 @@ static void _tf_ssb_connection_rpc_recv(tf_ssb_connection_t* connection, uint8_t bool found = false; for (tf_ssb_rpc_callback_node_t* it = connection->ssb->rpc; it; it = it->next) { if (_tf_ssb_name_equals(context, val, it->name)) { - it->callback(connection, flags, request_number, val, message, size, it->user_data); + it->callback(connection, flags, request_number, JS_DupValue(context, val), NULL, 0, it->user_data); found = true; break; } @@ -831,7 +851,7 @@ static void _tf_ssb_connection_rpc_recv(tf_ssb_connection_t* connection, uint8_t void* user_data = NULL; if (_tf_ssb_connection_get_request_callback(connection, -request_number, &callback, &user_data)) { if (callback) { - callback(connection, flags, request_number, val, NULL, 0, user_data); + callback(connection, flags, request_number, JS_DupValue(context, val), NULL, 0, user_data); } } else { const char* k_unsupported = "{\"message\": \"unsupported message\", \"name\": \"Error\", \"stack\": \"none\", \"args\": []}"; @@ -1011,6 +1031,14 @@ void tf_ssb_append_message(tf_ssb_t* ssb, JSValue message) void tf_ssb_connection_destroy(tf_ssb_connection_t* connection) { + free(connection); +} + +static void _tf_ssb_connection_on_close(uv_handle_t* handle) +{ + tf_ssb_connection_t* connection = handle->data; + handle->data = NULL; + tf_ssb_t* ssb = connection->ssb; for (tf_ssb_connection_t** it = &connection->ssb->connections; *it; it = &(*it)->next) { if (*it == connection) { @@ -1025,15 +1053,7 @@ void tf_ssb_connection_destroy(tf_ssb_connection_t* connection) for (int i = 0; i < ssb->connections_changed_count; i++) { ssb->connections_changed[i](ssb, k_tf_ssb_change_remove, connection, ssb->connections_changed_user_data[i]); } - free(connection); -} - -static void _tf_ssb_connection_on_close(uv_handle_t* handle) -{ - printf("destroy connection\n"); - tf_ssb_connection_t* connection = handle->data; - handle->data = NULL; - tf_ssb_connection_destroy(connection); + JS_FreeValue(connection->ssb->context, connection->object); } static void _tf_ssb_connection_on_tcp_recv(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) @@ -1286,6 +1306,14 @@ tf_ssb_t* tf_ssb_create(uv_loop_t* loop, JSContext* context, sqlite3* db, const ssb->context = JS_NewContext(ssb->runtime); } + JS_NewClassID(&_connection_class_id); + JSClassDef def = + { + .class_name = "connection", + .finalizer = _tf_ssb_connection_finalizer, + }; + JS_NewClass(JS_GetRuntime(ssb->context), _connection_class_id, &def); + if (db) { ssb->db = db; } else { @@ -1373,6 +1401,12 @@ void tf_ssb_destroy(tf_ssb_t* ssb) tf_ssb_connections_destroy(ssb->connections_tracker); ssb->connections_tracker = NULL; + for (int i = 0; i < ssb->connections_changed_count; i++) { + if (ssb->connections_changed_cleanup[i]) { + ssb->connections_changed_cleanup[i](ssb, ssb->connections_changed_user_data[i]); + } + } + tf_ssb_rpc_destroy(ssb->rpc_state); ssb->rpc_state = NULL; @@ -1402,6 +1436,23 @@ void tf_ssb_destroy(tf_ssb_t* ssb) if (ssb->loop == &ssb->own_loop) { uv_loop_close(ssb->loop); } + while (ssb->rpc) { + tf_ssb_rpc_callback_node_t* node = ssb->rpc; + ssb->rpc = node->next; + if (node->cleanup) { + node->cleanup(ssb, node->user_data); + node->cleanup = NULL; + } + free(node); + } + while (ssb->blob_want_added) { + tf_ssb_blob_want_added_callback_node_t* node = ssb->blob_want_added; + ssb->blob_want_added = node->next; + if (node->cleanup) { + node->cleanup(ssb, node->user_data); + } + free(node); + } if (ssb->own_context) { JS_FreeContext(ssb->context); JS_FreeRuntime(ssb->runtime); @@ -1414,11 +1465,6 @@ void tf_ssb_destroy(tf_ssb_t* ssb) ssb->broadcasts = broadcast->next; free(broadcast); } - while (ssb->rpc) { - tf_ssb_rpc_callback_node_t* node = ssb->rpc; - ssb->rpc = node->next; - free(node); - } free(ssb); } @@ -1427,8 +1473,50 @@ void tf_ssb_run(tf_ssb_t* ssb) uv_run(ssb->loop, UV_RUN_DEFAULT); } +static void _tf_ssb_connection_finalizer(JSRuntime* runtime, JSValue value) +{ + tf_ssb_connection_t* connection = JS_GetOpaque(value, _connection_class_id); + tf_ssb_connection_destroy(connection); +} + +static void _tf_ssb_connection_send_json_response(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) +{ + if (!user_data) { + return; + } + + void _tf_ssb_on_rpc(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data); + _tf_ssb_on_rpc(connection, flags, request_number, args, message, size, user_data); +} + +static JSValue _tf_ssb_connection_send_json(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) +{ + tf_ssb_connection_t* connection = JS_GetOpaque(this_val, _connection_class_id); + if (!connection) { + return JS_UNDEFINED; + } + + uint32_t request_number = tf_ssb_connection_next_request_number(connection); + + JSValue message_val = JS_JSONStringify(context, argv[0], JS_NULL, JS_NULL); + size_t size; + const char* message = JS_ToCStringLen(context, &size, message_val); + JS_FreeValue(context, message_val); + + tf_ssb_connection_rpc_send( + connection, + k_ssb_rpc_flag_json | k_ssb_rpc_flag_stream, + request_number, + (const uint8_t*)message, + size, + _tf_ssb_connection_send_json_response, + JS_IsFunction(context, argv[1]) ? JS_VALUE_GET_PTR(JS_DupValue(context, argv[1])) : NULL); + return JS_UNDEFINED; +} + tf_ssb_connection_t* tf_ssb_connection_create(tf_ssb_t* ssb, const char* host, const struct sockaddr_in* addr, const uint8_t* public_key) { + JSContext* context = ssb->context; tf_ssb_connection_t* connection = malloc(sizeof(tf_ssb_connection_t)); memset(connection, 0, sizeof(*connection)); connection->ssb = ssb; @@ -1438,6 +1526,15 @@ tf_ssb_connection_t* tf_ssb_connection_create(tf_ssb_t* ssb, const char* host, c snprintf(connection->host, sizeof(connection->host), "%s", host); connection->port = ntohs(addr->sin_port); + connection->object = JS_NewObjectClass(ssb->context, _connection_class_id); + JS_SetPropertyStr(context, connection->object, "send_json", JS_NewCFunction(context, _tf_ssb_connection_send_json, "send_json", 2)); + char public_key_str[k_id_base64_len] = { 0 }; + if (tf_ssb_id_bin_to_str(public_key_str, sizeof(public_key_str), public_key)) + { + JS_SetPropertyStr(context, connection->object, "id", JS_NewString(context, public_key_str)); + } + JS_SetOpaque(connection->object, connection); + memcpy(connection->serverpub, public_key, sizeof(connection->serverpub)); uv_tcp_init(ssb->loop, &connection->tcp); @@ -1514,6 +1611,10 @@ static void _tf_ssb_on_connection(uv_stream_t* stream, int status) { connection->tcp.data = connection; connection->send_request_number = 1; + connection->object = JS_NewObjectClass(ssb->context, _connection_class_id); + JS_SetPropertyStr(ssb->context, connection->object, "send_json", JS_NewCFunction(ssb->context, _tf_ssb_connection_send_json, "send_json", 2)); + JS_SetOpaque(connection->object, connection); + if (uv_tcp_init(ssb->loop, &connection->tcp) != 0) { printf("uv_tcp_init failed\n"); free(connection); @@ -1811,15 +1912,16 @@ void tf_ssb_set_broadcasts_changed_callback(tf_ssb_t* ssb, void (*callback)(tf_s ssb->broadcasts_changed_user_data = user_data; } -void tf_ssb_add_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_connections_changed_callback_t* callback, void* user_data) +void tf_ssb_add_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_connections_changed_callback_t* callback, tf_ssb_rpc_cleanup_t* cleanup, void* user_data) { assert(ssb->connections_changed_count < k_connections_changed_callbacks_max); ssb->connections_changed[ssb->connections_changed_count] = callback; + ssb->connections_changed_cleanup[ssb->connections_changed_count] = cleanup; ssb->connections_changed_user_data[ssb->connections_changed_count] = user_data; ssb->connections_changed_count++; } -void tf_ssb_register_rpc(tf_ssb_t* ssb, const char** name, tf_ssb_rpc_callback_t* callback, void* user_data) +void tf_ssb_register_rpc(tf_ssb_t* ssb, const char** name, tf_ssb_rpc_callback_t* callback, tf_ssb_rpc_cleanup_t* cleanup, void* user_data) { size_t name_len = 0; int name_count = 0; @@ -1831,6 +1933,7 @@ void tf_ssb_register_rpc(tf_ssb_t* ssb, const char** name, tf_ssb_rpc_callback_t *node = (tf_ssb_rpc_callback_node_t) { .name = (const char**)(node + 1), .callback = callback, + .cleanup = cleanup, .user_data = user_data, .next = ssb->rpc, }; @@ -1864,3 +1967,34 @@ int32_t tf_ssb_connection_next_request_number(tf_ssb_connection_t* connection) { return connection->send_request_number++; } + +JSClassID tf_ssb_get_connection_class_id() +{ + return _connection_class_id; +} + +JSValue tf_ssb_connection_get_object(tf_ssb_connection_t* connection) +{ + return connection ? connection->object : JS_UNDEFINED; +} + +void tf_ssb_register_blob_want_added(tf_ssb_t* ssb, void (*callback)(tf_ssb_t* ssb, const char* id, void* user_data), void (*cleanup)(tf_ssb_t* ssb, void* user_data), void* user_data) +{ + tf_ssb_blob_want_added_callback_node_t* node = malloc(sizeof(tf_ssb_blob_want_added_callback_node_t)); + *node = (tf_ssb_blob_want_added_callback_node_t) + { + .callback = callback, + .cleanup = cleanup, + .user_data = user_data, + .next = ssb->blob_want_added, + }; + ssb->blob_want_added = node; +} + +void tf_ssb_notify_blob_want_added(tf_ssb_t* ssb, const char* id) +{ + for (tf_ssb_blob_want_added_callback_node_t* node = ssb->blob_want_added; node; node = node->next) + { + node->callback(ssb, id, node->user_data); + } +} diff --git a/src/ssb.connections.c b/src/ssb.connections.c index 307c292a..f025265e 100644 --- a/src/ssb.connections.c +++ b/src/ssb.connections.c @@ -92,7 +92,7 @@ tf_ssb_connections_t* tf_ssb_connections_create(tf_ssb_t* ssb) connections->ssb = ssb; connections->db = tf_ssb_get_db(ssb); - tf_ssb_add_connections_changed_callback(ssb, _tf_ssb_connections_changed_callback, connections); + tf_ssb_add_connections_changed_callback(ssb, _tf_ssb_connections_changed_callback, NULL, connections); uv_loop_t* loop = tf_ssb_get_loop(ssb); connections->timer.data = connections; diff --git a/src/ssb.db.c b/src/ssb.db.c index c874bdf8..f9095598 100644 --- a/src/ssb.db.c +++ b/src/ssb.db.c @@ -35,6 +35,11 @@ void tf_ssb_db_init(tf_ssb_t* ssb) " created INTEGER" ")", NULL, NULL, NULL); + sqlite3_exec(db, + "CREATE TABLE IF NOT EXISTS blob_wants (" + " id TEXT PRIMARY KEY" + ")", + NULL, NULL, NULL); sqlite3_exec(db, "CREATE TABLE IF NOT EXISTS properties (" " id TEXT," @@ -75,6 +80,7 @@ bool tf_ssb_db_store_message(tf_ssb_t* ssb, JSContext* context, const char* id, sqlite3* db = tf_ssb_get_db(ssb); sqlite3_stmt* statement; + int64_t last_row_id = -1; const char* query = "INSERT INTO messages (id, previous, author, sequence, timestamp, content, hash, signature) VALUES (?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT DO NOTHING"; if (sqlite3_prepare(db, query, -1, &statement, NULL) == SQLITE_OK) { if (sqlite3_bind_text(statement, 1, id, -1, NULL) == SQLITE_OK && @@ -90,12 +96,37 @@ bool tf_ssb_db_store_message(tf_ssb_t* ssb, JSContext* context, const char* id, printf("%s\n", sqlite3_errmsg(db)); } stored = r == SQLITE_DONE && sqlite3_changes(db) != 0; + if (stored) + { + last_row_id = sqlite3_last_insert_rowid(db); + } } sqlite3_finalize(statement); } else { printf("prepare failed: %s\n", sqlite3_errmsg(db)); } + if (last_row_id != -1) + { + query = "INSERT INTO blob_wants (id) SELECT DISTINCT json.value FROM messages, json_tree(messages.content) AS json LEFT OUTER JOIN blobs ON json.value = blobs.id WHERE messages.rowid = ?1 AND json.value LIKE '&%%.sha256' AND length(json.value) = ?2 AND blobs.content IS NULL ON CONFLICT DO NOTHING RETURNING id"; + if (sqlite3_prepare(db, query, -1, &statement, NULL) == SQLITE_OK) { + if (sqlite3_bind_int64(statement, 1, last_row_id) == SQLITE_OK && + sqlite3_bind_int(statement, 2, BLOB_ID_LEN - 1) == SQLITE_OK) { + int r = SQLITE_OK; + while ((r = sqlite3_step(statement)) == SQLITE_ROW) + { + tf_ssb_notify_blob_want_added(ssb, (const char*)sqlite3_column_text(statement, 0)); + } + if (r != SQLITE_DONE) { + printf("%s\n", sqlite3_errmsg(db)); + } + } + sqlite3_finalize(statement); + } else { + printf("prepare failed: %s\n", sqlite3_errmsg(db)); + } + } + JS_FreeValue(context, previousval); JS_FreeCString(context, author); JS_FreeValue(context, authorval); @@ -319,7 +350,10 @@ static int _tf_ssb_sqlite_authorizer(void* user_data, int action_code, const cha case SQLITE_FUNCTION: return SQLITE_OK; case SQLITE_READ: - return strcmp(arg0, "messages") == 0 ? SQLITE_OK : SQLITE_DENY; + return + (strcmp(arg0, "messages") == 0 || + strcmp(arg0, "blob_wants") == 0) + ? SQLITE_OK : SQLITE_DENY; break; } return SQLITE_DENY; diff --git a/src/ssb.h b/src/ssb.h index ab545fbf..5f9cc760 100644 --- a/src/ssb.h +++ b/src/ssb.h @@ -65,8 +65,9 @@ bool tf_ssb_whoami(tf_ssb_t* ssb, char* out_id, size_t out_id_size); void tf_ssb_set_broadcasts_changed_callback(tf_ssb_t* ssb, void (*callback)(tf_ssb_t* ssb, void* user_data), void* user_data); void tf_ssb_visit_broadcasts(tf_ssb_t* ssb, void (*callback)(const struct sockaddr_in* addr, const uint8_t* pub, void* user_data), void* user_data); +typedef void (tf_ssb_rpc_cleanup_t)(tf_ssb_t* ssb, void* user_data); typedef void (tf_ssb_connections_changed_callback_t)(tf_ssb_t* ssb, tf_ssb_change_t change, tf_ssb_connection_t* connection, void* user_data); -void tf_ssb_add_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_connections_changed_callback_t callback, void* user_data); +void tf_ssb_add_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_connections_changed_callback_t callback, tf_ssb_rpc_cleanup_t* cleanup, void* user_data); const char** tf_ssb_get_connection_ids(tf_ssb_t* ssb); int tf_ssb_get_connections(tf_ssb_t* ssb, tf_ssb_connection_t** out_connections, int out_connections_count); void tf_ssb_connect(tf_ssb_t* ssb, const char* host, int port, const uint8_t* key); @@ -84,7 +85,7 @@ bool tf_ssb_id_bin_to_str(char* str, size_t str_size, const uint8_t* bin); void tf_ssb_test(); typedef void (tf_ssb_rpc_callback_t)(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data); -void tf_ssb_register_rpc(tf_ssb_t* ssb, const char** name, tf_ssb_rpc_callback_t* callback, void* user_data); +void tf_ssb_register_rpc(tf_ssb_t* ssb, const char** name, tf_ssb_rpc_callback_t* callback, tf_ssb_rpc_cleanup_t* cleanup, void* user_data); bool tf_ssb_verify_and_strip_signature(JSContext* context, JSValue val, char* out_signature, size_t out_signature_size); void tf_ssb_calculate_message_id(JSContext* context, JSValue message, char* out_id, size_t out_id_size); @@ -101,3 +102,9 @@ int32_t tf_ssb_connection_next_request_number(tf_ssb_connection_t* connection); bool tf_ssb_connection_get_id(tf_ssb_connection_t* connection, char* out_id, size_t out_id_size); void tf_ssb_connection_add_request(tf_ssb_connection_t* connection, int32_t request_number, tf_ssb_rpc_callback_t* callback, void* user_data); void tf_ssb_connection_remove_request(tf_ssb_connection_t* connection, int32_t request_number); +JSValue tf_ssb_connection_get_object(tf_ssb_connection_t* connection); + +void tf_ssb_register_blob_want_added(tf_ssb_t* ssb, void (*callback)(tf_ssb_t* ssb, const char* id, void* user_data), void (*cleanup)(tf_ssb_t* ssb, void* user_data), void* user_data); +void tf_ssb_notify_blob_want_added(tf_ssb_t* ssb, const char* id); + +JSClassID tf_ssb_get_connection_class_id(); diff --git a/src/ssb.qjs.c b/src/ssb.qjs.c index b2fd49da..dff20221 100644 --- a/src/ssb.qjs.c +++ b/src/ssb.qjs.c @@ -5,6 +5,9 @@ #include "task.h" #include +#include +#include +#include #include #include "quickjs-libc.h" @@ -128,6 +131,33 @@ static JSValue _tf_ssb_createHistoryStream(JSContext* context, JSValueConst this return JS_NULL; } +static void _check_call(JSContext* context, JSValue result) +{ + if (JS_IsError(context, result)) + { + const char* value = JS_ToCString(context, result); + printf("ERROR: %s\n", value); + JS_FreeCString(context, value); + JSValue stack = JS_GetPropertyStr(context, result, "stack"); + if (!JS_IsUndefined(stack)) { + const char* stack_str = JS_ToCString(context, stack); + printf("%s\n", stack_str); + JS_FreeCString(context, stack_str); + } + JS_FreeValue(context, stack); + } + else if (JS_IsException(result)) + { + js_std_dump_error(context); + JSValue error = JS_GetException(context); + const char* value = JS_ToCString(context, error); + printf("Exception: %s\n", value); + JS_FreeCString(context, value); + JS_FreeValue(context, error); + abort(); + } +} + typedef struct _sqlStream_callback_t { JSContext* context; @@ -137,11 +167,11 @@ typedef struct _sqlStream_callback_t static void _tf_ssb_sqlStream_callback(JSValue row, void* user_data) { sqlStream_callback_t* info = user_data; JSValue response = JS_Call(info->context, info->callback, JS_UNDEFINED, 1, &row); - if (JS_IsException(response)) { - printf("Error on SQL callback.\n"); - js_std_dump_error(info->context); + _check_call(info->context, response); + if (tf_task_get(info->context)) + { + tf_task_run_jobs(tf_task_get(info->context)); } - tf_task_run_jobs(tf_task_get(info->context)); JS_FreeValue(info->context, response); } @@ -184,6 +214,20 @@ static JSValue _tf_ssb_appendMessage(JSContext* context, JSValueConst this_val, return JS_NULL; } +static JSValue _tf_ssb_storeMessage(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) +{ + tf_ssb_t* ssb = JS_GetOpaque(this_val, _tf_ssb_classId); + char signature[crypto_sign_BYTES + 128]; + char id[crypto_hash_sha256_BYTES * 2 + 1]; + tf_ssb_calculate_message_id(context, argv[0], id, sizeof(id)); + if (tf_ssb_verify_and_strip_signature(context, argv[0], signature, sizeof(signature))) { + tf_ssb_db_store_message(ssb, context, id, argv[0], signature); + } else { + printf("failed to verify message\n"); + } + return JS_UNDEFINED; +} + typedef struct _broadcasts_t { JSContext* context; @@ -259,14 +303,16 @@ static void _tf_ssb_call_callback(tf_ssb_t* ssb, const char* name, void* user_da JSValue global = JS_GetGlobalObject(context); JSValue ssbo = JS_GetPropertyStr(context, global, "ssb"); JSValue callback = JS_GetPropertyStr(context, ssbo, name); - JSValue args = JS_UNDEFINED; - JSValue response = JS_Call(context, callback, JS_UNDEFINED, 0, &args); - if (JS_IsException(response)) { - printf("Error on callback: %s.\n", name); - js_std_dump_error(context); + if (JS_IsFunction(context, callback)) { + JSValue args = JS_UNDEFINED; + JSValue response = JS_Call(context, callback, JS_UNDEFINED, 0, &args); + _check_call(context, response); + if (tf_task_get(context)) + { + tf_task_run_jobs(tf_task_get(context)); + } + JS_FreeValue(context, response); } - tf_task_run_jobs(tf_task_get(context)); - JS_FreeValue(context, response); JS_FreeValue(context, ssbo); JS_FreeValue(context, global); } @@ -281,6 +327,301 @@ static void _tf_ssb_connections_changed(tf_ssb_t* ssb, tf_ssb_change_t change, t _tf_ssb_call_callback(ssb, "onConnectionsChanged", user_data); } +static JSValue _tf_ssb_rpc_send_json(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) +{ + JSValue connection_val = JS_GetPropertyStr(context, this_val, "connection"); + tf_ssb_connection_t* connection = JS_GetOpaque(connection_val, tf_ssb_get_connection_class_id()); + JSValue request_val = JS_GetPropertyStr(context, this_val, "request_number"); + int32_t request_number; + JS_ToInt32(context, &request_number, request_val); + JS_FreeValue(context, request_val); + + JSValue message_val = JS_JSONStringify(context, argv[0], JS_NULL, JS_NULL); + size_t size; + const char* message = JS_ToCStringLen(context, &size, message_val); + JS_FreeValue(context, message_val); + + tf_ssb_connection_rpc_send( + connection, + k_ssb_rpc_flag_json | k_ssb_rpc_flag_stream, + -request_number, + (const uint8_t*)message, + size, + NULL, + NULL); + JS_FreeValue(context, connection_val); + return JS_UNDEFINED; +} + +static JSValue _tf_ssb_rpc_send_binary(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) +{ + JSValue connection_val = JS_GetPropertyStr(context, this_val, "connection"); + tf_ssb_connection_t* connection = JS_GetOpaque(connection_val, tf_ssb_get_connection_class_id()); + JSValue request_val = JS_GetPropertyStr(context, this_val, "request_number"); + int32_t request_number; + JS_ToInt32(context, &request_number, request_val); + JS_FreeValue(context, request_val); + + size_t size; + uint8_t* message = tf_try_get_array_buffer(context, &size, argv[0]); + if (message) { + tf_ssb_connection_rpc_send( + connection, + k_ssb_rpc_flag_binary | k_ssb_rpc_flag_stream, + -request_number, + (const uint8_t*)message, + size, + NULL, + NULL); + } + JS_FreeValue(context, connection_val); + return JS_UNDEFINED; +} + +void _tf_ssb_on_rpc(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) +{ + tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); + JSContext* context = tf_ssb_get_context(ssb); + JSValue callback = JS_MKPTR(JS_TAG_OBJECT, user_data); + JSValue object = JS_NewObject(context); + printf("sending object = %d\n", JS_IsObject(tf_ssb_connection_get_object(connection))); + JSValue connection_object = JS_DupValue(context, tf_ssb_connection_get_object(connection)); + JS_SetPropertyStr(context, object, "connection", connection_object); + JS_SetPropertyStr(context, object, "flags", JS_NewUint32(context, flags)); + JS_SetPropertyStr(context, object, "request_number", JS_NewInt32(context, request_number)); + JS_SetPropertyStr(context, object, "args", JS_GetPropertyStr(context, args, "args")); + JS_SetPropertyStr(context, object, "message", message && size ? JS_NewArrayBufferCopy(context, message, size) : args); + JS_SetPropertyStr(context, object, "send_json", JS_NewCFunction(context, _tf_ssb_rpc_send_json, "send_json", 1)); + JS_SetPropertyStr(context, object, "send_binary", JS_NewCFunction(context, _tf_ssb_rpc_send_binary, "send_binary", 1)); + + JSValue result = JS_Call(context, callback, JS_UNDEFINED, 1, &object); + _check_call(context, result); + JS_FreeValue(context, result); + JS_FreeValue(context, object); +} + +static void _tf_ssb_rpc_js_value_cleanup(tf_ssb_t* ssb, void* user_data) +{ + JSValue callback = JS_MKPTR(JS_TAG_OBJECT, user_data); + JS_FreeValue(tf_ssb_get_context(ssb), callback); +} + +static JSValue _tf_ssb_register_rpc(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) +{ + tf_ssb_t* ssb = JS_GetOpaque(this_val, _tf_ssb_classId); + if (!JS_IsArray(context, argv[0])) + { + return JS_ThrowTypeError(context, "Expected argument 1 to be an array of strings."); + } + if (!JS_IsFunction(context, argv[1])) + { + return JS_ThrowTypeError(context, "Expected argument 2 to be a function."); + } + + JSValue length_val = JS_GetPropertyStr(context, argv[0], "length"); + int length = 0; + JS_ToInt32(context, &length, length_val); + + enum { k_max_name_parts = 16 }; + const char* name[k_max_name_parts + 1] = { 0 }; + + if (length >= k_max_name_parts) + { + return JS_ThrowInternalError(context, "Too many parts to RPC name."); + } + + for (int i = 0; i < length; i++) + { + JSValue value = JS_GetPropertyUint32(context, argv[0], i); + name[i] = JS_ToCString(context, value); + JS_FreeValue(context, value); + } + + tf_ssb_register_rpc(ssb, name, _tf_ssb_on_rpc, _tf_ssb_rpc_js_value_cleanup, JS_VALUE_GET_PTR(JS_DupValue(context, argv[1]))); + + for (int i = 0; i < length; i++) + { + JS_FreeCString(context, name[i]); + } + + return JS_UNDEFINED; +} + +static void _tf_ssb_on_blob_want_added(tf_ssb_t* ssb, const char* id, void* user_data) +{ + JSContext* context = tf_ssb_get_context(ssb); + JSValue callback = JS_MKPTR(JS_TAG_OBJECT, user_data); + JSValue string = JS_NewString(context, id); + JSValue response = JS_Call(context, callback, JS_UNDEFINED, 1, &string); + _check_call(context, response); + JS_FreeValue(context, response); + JS_FreeValue(context, string); +} + +static void _tf_ssb_cleanup_value(tf_ssb_t* ssb, void* user_data) +{ + JSValue callback = JS_MKPTR(JS_TAG_OBJECT, user_data); + printf("CLEANUP %p\n", user_data); + JS_FreeValue(tf_ssb_get_context(ssb), callback); +} + +static JSValue _tf_ssb_register_blob_want_added(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) +{ + tf_ssb_t* ssb = JS_GetOpaque(this_val, _tf_ssb_classId); + if (!JS_IsFunction(context, argv[0])) + { + return JS_ThrowTypeError(context, "Expected argument 1 to be a function."); + } + printf("registering %p\n", JS_VALUE_GET_PTR(argv[0])); + tf_ssb_register_blob_want_added(ssb, _tf_ssb_on_blob_want_added, _tf_ssb_cleanup_value, JS_VALUE_GET_PTR(JS_DupValue(context, argv[0]))); + return JS_UNDEFINED; +} + +static void _tf_ssb_rpc_on_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_change_t change, tf_ssb_connection_t* connection, void* user_data) +{ + JSContext* context = tf_ssb_get_context(ssb); + JSValue callback = JS_MKPTR(JS_TAG_OBJECT, user_data); + JSValue response = JS_UNDEFINED; + switch (change) + { + case k_tf_ssb_change_create: + break; + case k_tf_ssb_change_connect: + { + JSValue object = /*JS_DupValue(context,*/ tf_ssb_connection_get_object(connection);//); + JSValue args[] = + { + JS_NewString(context, "add"), + object, + }; + printf("calling function for ptr %p IsFunction=%d\n", user_data, JS_IsFunction(context, callback)); + response = JS_Call(context, callback, JS_UNDEFINED, 2, args); + _check_call(context, response); + //JS_FreeValue(context, object); + } + break; + case k_tf_ssb_change_remove: + { + printf("CHANGE_REMOVE\n"); + JSValue object = /*JS_DupValue(context,*/ tf_ssb_connection_get_object(connection);//); + JSValue args[] = + { + JS_NewString(context, "remove"), + object, + }; + response = JS_Call(context, callback, JS_UNDEFINED, 2, args); + _check_call(context, response); + //JS_FreeValue(context, object); + } + break; + } + JS_FreeValue(context, response); +} + +static JSValue _tf_ssb_register_connections_changed(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) +{ + printf("register connections changed\n"); + tf_ssb_t* ssb = JS_GetOpaque(this_val, _tf_ssb_classId); + if (!JS_IsFunction(context, argv[0])) + { + return JS_ThrowTypeError(context, "Expected argument 1 to be a function."); + } + void* ptr = JS_VALUE_GET_PTR(JS_DupValue(context, argv[0])); + printf("registering %p TAG=%d\n", ptr, JS_VALUE_GET_TAG(argv[0])); + tf_ssb_add_connections_changed_callback(ssb, _tf_ssb_rpc_on_connections_changed_callback, _tf_ssb_rpc_js_value_cleanup, ptr); + return JS_UNDEFINED; +} + +void tf_ssb_run_file(JSContext* context, const char* file_name) +{ + FILE* file = fopen(file_name, "rb"); + if (!file) + { + printf("Unable to open %s: %s.", file_name, strerror(errno)); + return; + } + + char* source = NULL; + fseek(file, 0, SEEK_END); + long file_size = ftell(file); + fseek(file, 0, SEEK_SET); + source = malloc(file_size + 1); + fread(source, 1, file_size, file); + source[file_size] = '\0'; + fclose(file); + + JSValue result = JS_Eval(context, source, file_size, file_name, 0); + if (JS_IsError(context, result)) + { + printf("Error running %s.\n", file_name); + const char* value = JS_ToCString(context, result); + printf("ERROR: %s\n", value); + JS_FreeCString(context, value); + JSValue stack = JS_GetPropertyStr(context, result, "stack"); + if (!JS_IsUndefined(stack)) { + const char* stack_str = JS_ToCString(context, stack); + printf("%s\n", stack_str); + JS_FreeCString(context, stack_str); + } + JS_FreeValue(context, stack); + } + else if (JS_IsException(result)) + { + printf("Exception running %s.\n", file_name); + JSValue error = JS_GetException(context); + const char* value = JS_ToCString(context, error); + printf("Exception: %s\n", value); + JS_FreeCString(context, value); + JS_FreeValue(context, error); + } + JS_FreeValue(context, result); + free(source); +} + +JSValue _print(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) { + for (int i = 0; i < argc; ++i) { + if (JS_IsNull(argv[i])) { + printf(" null"); + } else { + const char* value = JS_ToCString(context, argv[i]); + printf(" %s", value); + JS_FreeCString(context, value); + } + } + printf("\n"); + return JS_NULL; +} + +static JSValue _utf8Decode(JSContext* context, uint8_t* data, size_t length) { + return JS_NewStringLen(context, (const char*)data, length); +} + +static JSValue _utf8_decode(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) { + JSValue result = JS_NULL; + size_t length; + if (JS_IsString(argv[0])) { + result = JS_DupValue(context, argv[0]); + } else { + uint8_t* array = tf_try_get_array_buffer(context, &length, argv[0]); + if (array) { + result = _utf8Decode(context, array, length); + } else { + size_t offset; + size_t element_size; + JSValue buffer = tf_try_get_typed_array_buffer(context, argv[0], &offset, &length, &element_size); + size_t size; + if (!JS_IsException(buffer)) { + array = tf_try_get_array_buffer(context, &size, buffer); + if (array) { + result = _utf8Decode(context, array, size); + } + } + JS_FreeValue(context, buffer); + } + } + return result; +} + void tf_ssb_init(JSContext* context, tf_ssb_t* ssb) { JS_NewClassID(&_tf_ssb_classId); @@ -292,7 +633,7 @@ void tf_ssb_init(JSContext* context, tf_ssb_t* ssb) } tf_ssb_set_broadcasts_changed_callback(ssb, _tf_ssb_broadcasts_changed, NULL); - tf_ssb_add_connections_changed_callback(ssb, _tf_ssb_connections_changed, NULL); + tf_ssb_add_connections_changed_callback(ssb, _tf_ssb_connections_changed, NULL, NULL); JSValue global = JS_GetGlobalObject(context); JSValue object = JS_NewObjectClass(context, _tf_ssb_classId); @@ -308,7 +649,17 @@ void tf_ssb_init(JSContext* context, tf_ssb_t* ssb) JS_SetPropertyStr(context, object, "sqlStream", JS_NewCFunction(context, _tf_ssb_sqlStream, "sqlStream", 3)); JS_SetPropertyStr(context, object, "post", JS_NewCFunction(context, _tf_ssb_post, "post", 1)); JS_SetPropertyStr(context, object, "appendMessage", JS_NewCFunction(context, _tf_ssb_appendMessage, "appendMessage", 1)); + JS_SetPropertyStr(context, object, "storeMessage", JS_NewCFunction(context, _tf_ssb_storeMessage, "storeMessage", 1)); JS_SetPropertyStr(context, object, "getBroadcasts", JS_NewCFunction(context, _tf_ssb_getBroadcasts, "getBroadcasts", 0)); JS_SetPropertyStr(context, object, "connect", JS_NewCFunction(context, _tf_ssb_connect, "connect", 1)); + JS_SetPropertyStr(context, object, "registerRpc", JS_NewCFunction(context, _tf_ssb_register_rpc, "registerRpc", 2)); + JS_SetPropertyStr(context, object, "registerBlobWantAdded", JS_NewCFunction(context, _tf_ssb_register_blob_want_added, "registerBlobWantAdded", 1)); + JS_SetPropertyStr(context, object, "registerConnectionsChanged", JS_NewCFunction(context, _tf_ssb_register_connections_changed, "registerConnectionsChanged", 1)); + + JS_SetPropertyStr(context, global, "debug_print", JS_NewCFunction(context, _print, "debug_print", 2)); + JS_SetPropertyStr(context, global, "debug_utf8Decode", JS_NewCFunction(context, _utf8_decode, "debug_utf8Decode", 1)); + JS_FreeValue(context, global); + + tf_ssb_run_file(context, "core/ssb.js"); } diff --git a/src/ssb.rpc.c b/src/ssb.rpc.c index cfee6eee..cbf362b6 100644 --- a/src/ssb.rpc.c +++ b/src/ssb.rpc.c @@ -32,7 +32,7 @@ typedef struct _tf_ssb_rpc_t const char** tf_ssb_get_following_deep(tf_ssb_t* ssb, const char** ids, int depth); -static void _tf_ssb_rpc_blob_has(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) +/*static void _tf_ssb_rpc_blob_has(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) { JSContext* context = tf_ssb_connection_get_context(connection); sqlite3* db = tf_ssb_connection_get_db(connection); @@ -59,9 +59,9 @@ static void _tf_ssb_rpc_blob_has(tf_ssb_connection_t* connection, uint8_t flags, k_ssb_rpc_flag_end_error; const char* result = have ? "true" : "false"; tf_ssb_connection_rpc_send(connection, send_flags, -request_number, (const uint8_t*)result, strlen(result), NULL, NULL); -} +}*/ -static void _tf_ssb_rpc_blob_get(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) +/*static void _tf_ssb_rpc_blob_get(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) { tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); JSContext* context = tf_ssb_connection_get_context(connection); @@ -89,7 +89,7 @@ static void _tf_ssb_rpc_blob_get(tf_ssb_connection_t* connection, uint8_t flags, JS_FreeValue(context, blob_id_value); } JS_FreeValue(context, blob_ids); -} +}*/ typedef struct _tf_ssb_connection_blobs_get_t { @@ -291,7 +291,7 @@ static void _tf_ssb_blob_wants_update(tf_ssb_blob_wants_t* wants) } } -static void _tf_ssb_rpc_blobs_createWants(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) +/*static void _tf_ssb_rpc_blobs_createWants(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) { tf_ssb_rpc_t* rpc = user_data; tf_ssb_blob_wants_t* wants = malloc(sizeof(tf_ssb_blob_wants_t)); @@ -304,9 +304,9 @@ static void _tf_ssb_rpc_blobs_createWants(tf_ssb_connection_t* connection, uint8 }; rpc->wants = wants; _tf_ssb_blob_wants_update(wants); -} +}*/ -static void _tf_ssb_rpc_createHistoryStream(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) +/*static void _tf_ssb_rpc_createHistoryStream(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) { JSContext* context = tf_ssb_connection_get_context(connection); sqlite3* db = tf_ssb_connection_get_db(connection); @@ -358,7 +358,7 @@ static void _tf_ssb_rpc_createHistoryStream(tf_ssb_connection_t* connection, uin JS_FreeCString(context, author); JS_FreeValue(context, idval); JS_FreeValue(context, streamArgs); -} +}*/ static void _tf_ssb_connection_on_rpc_createHistoryStream_response(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue val, const uint8_t* message, size_t size, void* user_data) { @@ -653,23 +653,27 @@ tf_ssb_rpc_t* tf_ssb_rpc_create(tf_ssb_t* ssb) *rpc = (tf_ssb_rpc_t) { .wants_async.data = rpc, }; - tf_ssb_add_connections_changed_callback(ssb, _tf_ssb_rpc_connections_changed_callback, rpc); - tf_ssb_add_connections_changed_callback(ssb, _tf_ssb_rpc_on_connections_changed, NULL); - tf_ssb_register_rpc(ssb, (const char*[]) { "blobs", "has", NULL }, _tf_ssb_rpc_blob_has, NULL); - tf_ssb_register_rpc(ssb, (const char*[]) { "blobs", "get", NULL }, _tf_ssb_rpc_blob_get, NULL); - tf_ssb_register_rpc(ssb, (const char*[]) { "blobs", "createWants", NULL }, _tf_ssb_rpc_blobs_createWants, rpc); - tf_ssb_register_rpc(ssb, (const char*[]) { "createHistoryStream", NULL }, _tf_ssb_rpc_createHistoryStream, NULL); - uv_async_init(tf_ssb_get_loop(ssb), &rpc->wants_async, _tf_ssb_rpc_wants_async); - uv_unref((uv_handle_t*)&rpc->wants_async); + //tf_ssb_add_connections_changed_callback(ssb, _tf_ssb_rpc_connections_changed_callback, rpc); + //tf_ssb_add_connections_changed_callback(ssb, _tf_ssb_rpc_on_connections_changed, NULL); + (void)_tf_ssb_rpc_connections_changed_callback; + (void)_tf_ssb_rpc_on_connections_changed; + //tf_ssb_register_rpc(ssb, (const char*[]) { "blobs", "has", NULL }, _tf_ssb_rpc_blob_has, NULL); + //tf_ssb_register_rpc(ssb, (const char*[]) { "blobs", "get", NULL }, _tf_ssb_rpc_blob_get, NULL); + //tf_ssb_register_rpc(ssb, (const char*[]) { "blobs", "createWants", NULL }, _tf_ssb_rpc_blobs_createWants, rpc); + //tf_ssb_register_rpc(ssb, (const char*[]) { "createHistoryStream", NULL }, _tf_ssb_rpc_createHistoryStream, NULL); + //uv_async_init(tf_ssb_get_loop(ssb), &rpc->wants_async, _tf_ssb_rpc_wants_async); + (void)_tf_ssb_rpc_wants_async; + //uv_unref((uv_handle_t*)&rpc->wants_async); return rpc; } -static void _tf_ssb_rpc_handle_closed(uv_handle_t* handle) +/*static void _tf_ssb_rpc_handle_closed(uv_handle_t* handle) { free(handle->data); -} +}*/ void tf_ssb_rpc_destroy(tf_ssb_rpc_t* rpc) { - uv_close((uv_handle_t*)&rpc->wants_async, _tf_ssb_rpc_handle_closed); + //uv_close((uv_handle_t*)&rpc->wants_async, _tf_ssb_rpc_handle_closed); + free(rpc); } diff --git a/src/ssb.tests.c b/src/ssb.tests.c index f16d5652..1f23fba1 100644 --- a/src/ssb.tests.c +++ b/src/ssb.tests.c @@ -1,6 +1,7 @@ #include "ssb.h" #include "ssb.db.h" +#include "ssb.qjs.h" #include #include @@ -76,15 +77,17 @@ static void _tf_ssb_test_ssb() uv_loop_init(&loop); tf_ssb_t* ssb0 = tf_ssb_create(&loop, NULL, db0, NULL); + tf_ssb_init(tf_ssb_get_context(ssb0), ssb0); tf_ssb_t* ssb1 = tf_ssb_create(&loop, NULL, db1, NULL); + tf_ssb_init(tf_ssb_get_context(ssb1), ssb1); test_t test = { .ssb0 = ssb0, .ssb1 = ssb1, }; - tf_ssb_add_connections_changed_callback(ssb0, _ssb_test_connections_changed, &test); - tf_ssb_add_connections_changed_callback(ssb1, _ssb_test_connections_changed, &test); + tf_ssb_add_connections_changed_callback(ssb0, _ssb_test_connections_changed, NULL, &test); + tf_ssb_add_connections_changed_callback(ssb1, _ssb_test_connections_changed, NULL, &test); tf_ssb_generate_keys(ssb0); tf_ssb_generate_keys(ssb1); diff --git a/src/tests.c b/src/tests.c index 8ae9006f..4e3b44d7 100644 --- a/src/tests.c +++ b/src/tests.c @@ -433,7 +433,7 @@ static void _test_socket(const char* exe_path) " print('connected', s.isConnected);\n" " print(s.peerName);\n" " s.read(function(data) {\n" - " print('read', data.length);\n" + " print('read', data ? data.length : null);\n" " });\n" " s.write('GET / HTTP/1.0\\r\\n\\r\\n');\n" "}).then(function() {\n" @@ -452,7 +452,7 @@ static void _test_socket(const char* exe_path) "s2.connect('www.unprompted.com', 443).then(function() {\n" " print('connected');\n" " s2.read(function(data) {\n" - " print('read', data.length);\n" + " print('read', data ? data.length : null);\n" " });\n" " return s2.startTls();\n" "}).then(function() {\n"