Work in progress moving SSB RPC handlers into javascript.

git-svn-id: https://www.unprompted.com/svn/projects/tildefriends/trunk@3657 ed5197a5-7fde-0310-b194-c3ffbd925b24
This commit is contained in:
Cory McWilliams 2021-09-06 17:50:38 +00:00
parent cfd5341a6b
commit cadcb236ee
9 changed files with 676 additions and 58 deletions

85
core/ssb.js Normal file
View File

@ -0,0 +1,85 @@
var g_wants_requests = {};
ssb.registerConnectionsChanged(function(change, connection) {
if (change == 'add') {
connection.send_json({'name': ['createHistoryStream'], 'type': 'source', 'args': [{'id': connection.id, 'seq': 0}]}, function(message) {
ssb.storeMessage(message.message);
});
connection.send_json({'name': ['blobs', 'createWants'], 'type': 'source', 'args': []}, function(message) {
Object.keys(message.message).forEach(function(id) {
if (message.message[id] < 0) {
var blob = ssb.blobGet(id);
if (blob) {
var out_message = {};
out_message[id] = blob.byteLength;
g_wants_requests[connection.id].send_json(out_message);
//connection.wants_request.send_json(out_message);
}
} else {
debug_print("blobs.get", id);
connection.send_json({'name': ['blobs', 'get'], 'type': 'source', 'args': [{'id': id}]}, function(message) {
debug_print(id, '=>', debug_utf8Decode(message.message));
ssb.blobStore(message.message);
});
}
});
});
} else if (change == 'remove') {
debug_print('REMOVE', connection.id);
delete g_wants_requests[connection.id];
} else {
debug_print('CHANGE', change);
}
});
ssb.registerRpc(['blobs', 'createWants'], function(request) {
g_wants_requests[request.connection.id] = request;
function blob_want_discovered(id) {
debug_print('discovered', id);
var message = {};
message[id] = -1;
request.send_json(message);
}
ssb.registerBlobWantAdded(blob_want_discovered);
ssb.sqlStream(
'SELECT id FROM blob_wants',
[],
row => blob_want_discovered(row.id));
});
ssb.registerRpc(['blobs', 'has'], function(request) {
var found = false;
ssb.sqlStream(
'SELECT 1 FROM blobs where id = ?1',
[request.args[0]],
function(row) {
found = true;
});
request.send_json(found);
});
ssb.registerRpc(['blobs', 'get'], function(request) {
var blob = ssb.blobGet(request.args[0].id);
request.send_binary(blob);
});
ssb.registerRpc(['createHistoryStream'], function(request) {
var id = request.args[0].id;
var seq = request.args[0].seq;
ssb.sqlStream(
'SELECT previous, sequence, timestamp, hash, content, signature FROM messages WHERE author = ?1 AND sequence >= ?2 ORDER BY sequence',
[id, seq ?? 0],
function(row) {
var message = {
'previous': row.previous,
'author': id,
'sequence': row.sequence,
'timestamp': row.timestamp,
'hash': row.hash,
'content': JSON.parse(row.content),
'signature': row.signature,
};
debug_print('sending1', JSON.stringify(message));
request.send_json(message);
});
});

172
src/ssb.c
View File

@ -54,7 +54,7 @@ typedef enum {
} tf_ssb_state_t; } tf_ssb_state_t;
enum { enum {
k_connections_changed_callbacks_max = 4, k_connections_changed_callbacks_max = 8,
k_tf_ssb_rpc_message_body_length_max = 8192, k_tf_ssb_rpc_message_body_length_max = 8192,
}; };
@ -82,10 +82,19 @@ typedef struct _tf_ssb_rpc_callback_node_t tf_ssb_rpc_callback_node_t;
typedef struct _tf_ssb_rpc_callback_node_t { typedef struct _tf_ssb_rpc_callback_node_t {
const char** name; const char** name;
tf_ssb_rpc_callback_t* callback; tf_ssb_rpc_callback_t* callback;
tf_ssb_rpc_cleanup_t* cleanup;
void* user_data; void* user_data;
tf_ssb_rpc_callback_node_t* next; tf_ssb_rpc_callback_node_t* next;
} tf_ssb_rpc_callback_node_t; } tf_ssb_rpc_callback_node_t;
typedef struct _tf_ssb_blob_want_added_callback_node_t tf_ssb_blob_want_added_callback_node_t;
typedef struct _tf_ssb_blob_want_added_callback_node_t {
void (*callback)(tf_ssb_t* ssb, const char* id, void* user_data);
void (*cleanup)(tf_ssb_t* ssb, void* user_data);
void* user_data;
tf_ssb_blob_want_added_callback_node_t* next;
} tf_ssb_blob_want_added_callback_node_t;
typedef struct _tf_ssb_t { typedef struct _tf_ssb_t {
bool own_context; bool own_context;
JSRuntime* runtime; JSRuntime* runtime;
@ -109,6 +118,7 @@ typedef struct _tf_ssb_t {
uint8_t priv[crypto_sign_SECRETKEYBYTES]; uint8_t priv[crypto_sign_SECRETKEYBYTES];
tf_ssb_connections_changed_callback_t* connections_changed[k_connections_changed_callbacks_max]; tf_ssb_connections_changed_callback_t* connections_changed[k_connections_changed_callbacks_max];
tf_ssb_rpc_cleanup_t* connections_changed_cleanup[k_connections_changed_callbacks_max];
void* connections_changed_user_data[k_connections_changed_callbacks_max]; void* connections_changed_user_data[k_connections_changed_callbacks_max];
int connections_changed_count; int connections_changed_count;
@ -122,6 +132,8 @@ typedef struct _tf_ssb_t {
tf_ssb_broadcast_t* broadcasts; tf_ssb_broadcast_t* broadcasts;
tf_ssb_rpc_callback_node_t* rpc; tf_ssb_rpc_callback_node_t* rpc;
tf_ssb_blob_want_added_callback_node_t* blob_want_added;
} tf_ssb_t; } tf_ssb_t;
typedef struct _tf_ssb_connection_t { typedef struct _tf_ssb_connection_t {
@ -129,6 +141,8 @@ typedef struct _tf_ssb_connection_t {
uv_tcp_t tcp; uv_tcp_t tcp;
uv_connect_t connect; uv_connect_t connect;
JSValue object;
char host[256]; char host[256];
int port; int port;
@ -164,11 +178,14 @@ typedef struct _tf_ssb_connection_t {
tf_ssb_request_t* requests; tf_ssb_request_t* requests;
} tf_ssb_connection_t; } tf_ssb_connection_t;
static JSClassID _connection_class_id;
static void _tf_ssb_connection_client_send_hello(uv_stream_t* stream); static void _tf_ssb_connection_client_send_hello(uv_stream_t* stream);
static void _tf_ssb_connection_on_close(uv_handle_t* handle); static void _tf_ssb_connection_on_close(uv_handle_t* handle);
static void _tf_ssb_connection_close(tf_ssb_connection_t* connection, const char* reason); static void _tf_ssb_connection_close(tf_ssb_connection_t* connection, const char* reason);
static void _tf_ssb_nonce_inc(uint8_t* nonce); static void _tf_ssb_nonce_inc(uint8_t* nonce);
static void _tf_ssb_write(tf_ssb_connection_t* connection, void* data, size_t size); static void _tf_ssb_write(tf_ssb_connection_t* connection, void* data, size_t size);
static void _tf_ssb_connection_finalizer(JSRuntime* runtime, JSValue value);
static void _tf_ssb_connection_send_close(tf_ssb_connection_t* connection) static void _tf_ssb_connection_send_close(tf_ssb_connection_t* connection)
{ {
@ -362,6 +379,9 @@ void tf_ssb_connection_remove_request(tf_ssb_connection_t* connection, int32_t r
for (tf_ssb_request_t** it = &connection->requests; *it; it = &(*it)->next) { for (tf_ssb_request_t** it = &connection->requests; *it; it = &(*it)->next) {
if ((*it)->request_number == request_number) { if ((*it)->request_number == request_number) {
tf_ssb_request_t* found = *it; tf_ssb_request_t* found = *it;
if (found->user_data) {
JS_FreeValue(tf_ssb_connection_get_context(connection), JS_MKPTR(JS_TAG_OBJECT, found->user_data));
}
*it = found->next; *it = found->next;
free(found); free(found);
break; break;
@ -821,7 +841,7 @@ static void _tf_ssb_connection_rpc_recv(tf_ssb_connection_t* connection, uint8_t
bool found = false; bool found = false;
for (tf_ssb_rpc_callback_node_t* it = connection->ssb->rpc; it; it = it->next) { for (tf_ssb_rpc_callback_node_t* it = connection->ssb->rpc; it; it = it->next) {
if (_tf_ssb_name_equals(context, val, it->name)) { if (_tf_ssb_name_equals(context, val, it->name)) {
it->callback(connection, flags, request_number, val, message, size, it->user_data); it->callback(connection, flags, request_number, JS_DupValue(context, val), NULL, 0, it->user_data);
found = true; found = true;
break; break;
} }
@ -831,7 +851,7 @@ static void _tf_ssb_connection_rpc_recv(tf_ssb_connection_t* connection, uint8_t
void* user_data = NULL; void* user_data = NULL;
if (_tf_ssb_connection_get_request_callback(connection, -request_number, &callback, &user_data)) { if (_tf_ssb_connection_get_request_callback(connection, -request_number, &callback, &user_data)) {
if (callback) { if (callback) {
callback(connection, flags, request_number, val, NULL, 0, user_data); callback(connection, flags, request_number, JS_DupValue(context, val), NULL, 0, user_data);
} }
} else { } else {
const char* k_unsupported = "{\"message\": \"unsupported message\", \"name\": \"Error\", \"stack\": \"none\", \"args\": []}"; const char* k_unsupported = "{\"message\": \"unsupported message\", \"name\": \"Error\", \"stack\": \"none\", \"args\": []}";
@ -1011,6 +1031,14 @@ void tf_ssb_append_message(tf_ssb_t* ssb, JSValue message)
void tf_ssb_connection_destroy(tf_ssb_connection_t* connection) void tf_ssb_connection_destroy(tf_ssb_connection_t* connection)
{ {
free(connection);
}
static void _tf_ssb_connection_on_close(uv_handle_t* handle)
{
tf_ssb_connection_t* connection = handle->data;
handle->data = NULL;
tf_ssb_t* ssb = connection->ssb; tf_ssb_t* ssb = connection->ssb;
for (tf_ssb_connection_t** it = &connection->ssb->connections; *it; it = &(*it)->next) { for (tf_ssb_connection_t** it = &connection->ssb->connections; *it; it = &(*it)->next) {
if (*it == connection) { if (*it == connection) {
@ -1025,15 +1053,7 @@ void tf_ssb_connection_destroy(tf_ssb_connection_t* connection)
for (int i = 0; i < ssb->connections_changed_count; i++) { for (int i = 0; i < ssb->connections_changed_count; i++) {
ssb->connections_changed[i](ssb, k_tf_ssb_change_remove, connection, ssb->connections_changed_user_data[i]); ssb->connections_changed[i](ssb, k_tf_ssb_change_remove, connection, ssb->connections_changed_user_data[i]);
} }
free(connection); JS_FreeValue(connection->ssb->context, connection->object);
}
static void _tf_ssb_connection_on_close(uv_handle_t* handle)
{
printf("destroy connection\n");
tf_ssb_connection_t* connection = handle->data;
handle->data = NULL;
tf_ssb_connection_destroy(connection);
} }
static void _tf_ssb_connection_on_tcp_recv(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) static void _tf_ssb_connection_on_tcp_recv(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf)
@ -1286,6 +1306,14 @@ tf_ssb_t* tf_ssb_create(uv_loop_t* loop, JSContext* context, sqlite3* db, const
ssb->context = JS_NewContext(ssb->runtime); ssb->context = JS_NewContext(ssb->runtime);
} }
JS_NewClassID(&_connection_class_id);
JSClassDef def =
{
.class_name = "connection",
.finalizer = _tf_ssb_connection_finalizer,
};
JS_NewClass(JS_GetRuntime(ssb->context), _connection_class_id, &def);
if (db) { if (db) {
ssb->db = db; ssb->db = db;
} else { } else {
@ -1373,6 +1401,12 @@ void tf_ssb_destroy(tf_ssb_t* ssb)
tf_ssb_connections_destroy(ssb->connections_tracker); tf_ssb_connections_destroy(ssb->connections_tracker);
ssb->connections_tracker = NULL; ssb->connections_tracker = NULL;
for (int i = 0; i < ssb->connections_changed_count; i++) {
if (ssb->connections_changed_cleanup[i]) {
ssb->connections_changed_cleanup[i](ssb, ssb->connections_changed_user_data[i]);
}
}
tf_ssb_rpc_destroy(ssb->rpc_state); tf_ssb_rpc_destroy(ssb->rpc_state);
ssb->rpc_state = NULL; ssb->rpc_state = NULL;
@ -1402,6 +1436,23 @@ void tf_ssb_destroy(tf_ssb_t* ssb)
if (ssb->loop == &ssb->own_loop) { if (ssb->loop == &ssb->own_loop) {
uv_loop_close(ssb->loop); uv_loop_close(ssb->loop);
} }
while (ssb->rpc) {
tf_ssb_rpc_callback_node_t* node = ssb->rpc;
ssb->rpc = node->next;
if (node->cleanup) {
node->cleanup(ssb, node->user_data);
node->cleanup = NULL;
}
free(node);
}
while (ssb->blob_want_added) {
tf_ssb_blob_want_added_callback_node_t* node = ssb->blob_want_added;
ssb->blob_want_added = node->next;
if (node->cleanup) {
node->cleanup(ssb, node->user_data);
}
free(node);
}
if (ssb->own_context) { if (ssb->own_context) {
JS_FreeContext(ssb->context); JS_FreeContext(ssb->context);
JS_FreeRuntime(ssb->runtime); JS_FreeRuntime(ssb->runtime);
@ -1414,11 +1465,6 @@ void tf_ssb_destroy(tf_ssb_t* ssb)
ssb->broadcasts = broadcast->next; ssb->broadcasts = broadcast->next;
free(broadcast); free(broadcast);
} }
while (ssb->rpc) {
tf_ssb_rpc_callback_node_t* node = ssb->rpc;
ssb->rpc = node->next;
free(node);
}
free(ssb); free(ssb);
} }
@ -1427,8 +1473,50 @@ void tf_ssb_run(tf_ssb_t* ssb)
uv_run(ssb->loop, UV_RUN_DEFAULT); uv_run(ssb->loop, UV_RUN_DEFAULT);
} }
static void _tf_ssb_connection_finalizer(JSRuntime* runtime, JSValue value)
{
tf_ssb_connection_t* connection = JS_GetOpaque(value, _connection_class_id);
tf_ssb_connection_destroy(connection);
}
static void _tf_ssb_connection_send_json_response(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
if (!user_data) {
return;
}
void _tf_ssb_on_rpc(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data);
_tf_ssb_on_rpc(connection, flags, request_number, args, message, size, user_data);
}
static JSValue _tf_ssb_connection_send_json(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv)
{
tf_ssb_connection_t* connection = JS_GetOpaque(this_val, _connection_class_id);
if (!connection) {
return JS_UNDEFINED;
}
uint32_t request_number = tf_ssb_connection_next_request_number(connection);
JSValue message_val = JS_JSONStringify(context, argv[0], JS_NULL, JS_NULL);
size_t size;
const char* message = JS_ToCStringLen(context, &size, message_val);
JS_FreeValue(context, message_val);
tf_ssb_connection_rpc_send(
connection,
k_ssb_rpc_flag_json | k_ssb_rpc_flag_stream,
request_number,
(const uint8_t*)message,
size,
_tf_ssb_connection_send_json_response,
JS_IsFunction(context, argv[1]) ? JS_VALUE_GET_PTR(JS_DupValue(context, argv[1])) : NULL);
return JS_UNDEFINED;
}
tf_ssb_connection_t* tf_ssb_connection_create(tf_ssb_t* ssb, const char* host, const struct sockaddr_in* addr, const uint8_t* public_key) tf_ssb_connection_t* tf_ssb_connection_create(tf_ssb_t* ssb, const char* host, const struct sockaddr_in* addr, const uint8_t* public_key)
{ {
JSContext* context = ssb->context;
tf_ssb_connection_t* connection = malloc(sizeof(tf_ssb_connection_t)); tf_ssb_connection_t* connection = malloc(sizeof(tf_ssb_connection_t));
memset(connection, 0, sizeof(*connection)); memset(connection, 0, sizeof(*connection));
connection->ssb = ssb; connection->ssb = ssb;
@ -1438,6 +1526,15 @@ tf_ssb_connection_t* tf_ssb_connection_create(tf_ssb_t* ssb, const char* host, c
snprintf(connection->host, sizeof(connection->host), "%s", host); snprintf(connection->host, sizeof(connection->host), "%s", host);
connection->port = ntohs(addr->sin_port); connection->port = ntohs(addr->sin_port);
connection->object = JS_NewObjectClass(ssb->context, _connection_class_id);
JS_SetPropertyStr(context, connection->object, "send_json", JS_NewCFunction(context, _tf_ssb_connection_send_json, "send_json", 2));
char public_key_str[k_id_base64_len] = { 0 };
if (tf_ssb_id_bin_to_str(public_key_str, sizeof(public_key_str), public_key))
{
JS_SetPropertyStr(context, connection->object, "id", JS_NewString(context, public_key_str));
}
JS_SetOpaque(connection->object, connection);
memcpy(connection->serverpub, public_key, sizeof(connection->serverpub)); memcpy(connection->serverpub, public_key, sizeof(connection->serverpub));
uv_tcp_init(ssb->loop, &connection->tcp); uv_tcp_init(ssb->loop, &connection->tcp);
@ -1514,6 +1611,10 @@ static void _tf_ssb_on_connection(uv_stream_t* stream, int status) {
connection->tcp.data = connection; connection->tcp.data = connection;
connection->send_request_number = 1; connection->send_request_number = 1;
connection->object = JS_NewObjectClass(ssb->context, _connection_class_id);
JS_SetPropertyStr(ssb->context, connection->object, "send_json", JS_NewCFunction(ssb->context, _tf_ssb_connection_send_json, "send_json", 2));
JS_SetOpaque(connection->object, connection);
if (uv_tcp_init(ssb->loop, &connection->tcp) != 0) { if (uv_tcp_init(ssb->loop, &connection->tcp) != 0) {
printf("uv_tcp_init failed\n"); printf("uv_tcp_init failed\n");
free(connection); free(connection);
@ -1811,15 +1912,16 @@ void tf_ssb_set_broadcasts_changed_callback(tf_ssb_t* ssb, void (*callback)(tf_s
ssb->broadcasts_changed_user_data = user_data; ssb->broadcasts_changed_user_data = user_data;
} }
void tf_ssb_add_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_connections_changed_callback_t* callback, void* user_data) void tf_ssb_add_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_connections_changed_callback_t* callback, tf_ssb_rpc_cleanup_t* cleanup, void* user_data)
{ {
assert(ssb->connections_changed_count < k_connections_changed_callbacks_max); assert(ssb->connections_changed_count < k_connections_changed_callbacks_max);
ssb->connections_changed[ssb->connections_changed_count] = callback; ssb->connections_changed[ssb->connections_changed_count] = callback;
ssb->connections_changed_cleanup[ssb->connections_changed_count] = cleanup;
ssb->connections_changed_user_data[ssb->connections_changed_count] = user_data; ssb->connections_changed_user_data[ssb->connections_changed_count] = user_data;
ssb->connections_changed_count++; ssb->connections_changed_count++;
} }
void tf_ssb_register_rpc(tf_ssb_t* ssb, const char** name, tf_ssb_rpc_callback_t* callback, void* user_data) void tf_ssb_register_rpc(tf_ssb_t* ssb, const char** name, tf_ssb_rpc_callback_t* callback, tf_ssb_rpc_cleanup_t* cleanup, void* user_data)
{ {
size_t name_len = 0; size_t name_len = 0;
int name_count = 0; int name_count = 0;
@ -1831,6 +1933,7 @@ void tf_ssb_register_rpc(tf_ssb_t* ssb, const char** name, tf_ssb_rpc_callback_t
*node = (tf_ssb_rpc_callback_node_t) { *node = (tf_ssb_rpc_callback_node_t) {
.name = (const char**)(node + 1), .name = (const char**)(node + 1),
.callback = callback, .callback = callback,
.cleanup = cleanup,
.user_data = user_data, .user_data = user_data,
.next = ssb->rpc, .next = ssb->rpc,
}; };
@ -1864,3 +1967,34 @@ int32_t tf_ssb_connection_next_request_number(tf_ssb_connection_t* connection)
{ {
return connection->send_request_number++; return connection->send_request_number++;
} }
JSClassID tf_ssb_get_connection_class_id()
{
return _connection_class_id;
}
JSValue tf_ssb_connection_get_object(tf_ssb_connection_t* connection)
{
return connection ? connection->object : JS_UNDEFINED;
}
void tf_ssb_register_blob_want_added(tf_ssb_t* ssb, void (*callback)(tf_ssb_t* ssb, const char* id, void* user_data), void (*cleanup)(tf_ssb_t* ssb, void* user_data), void* user_data)
{
tf_ssb_blob_want_added_callback_node_t* node = malloc(sizeof(tf_ssb_blob_want_added_callback_node_t));
*node = (tf_ssb_blob_want_added_callback_node_t)
{
.callback = callback,
.cleanup = cleanup,
.user_data = user_data,
.next = ssb->blob_want_added,
};
ssb->blob_want_added = node;
}
void tf_ssb_notify_blob_want_added(tf_ssb_t* ssb, const char* id)
{
for (tf_ssb_blob_want_added_callback_node_t* node = ssb->blob_want_added; node; node = node->next)
{
node->callback(ssb, id, node->user_data);
}
}

View File

@ -92,7 +92,7 @@ tf_ssb_connections_t* tf_ssb_connections_create(tf_ssb_t* ssb)
connections->ssb = ssb; connections->ssb = ssb;
connections->db = tf_ssb_get_db(ssb); connections->db = tf_ssb_get_db(ssb);
tf_ssb_add_connections_changed_callback(ssb, _tf_ssb_connections_changed_callback, connections); tf_ssb_add_connections_changed_callback(ssb, _tf_ssb_connections_changed_callback, NULL, connections);
uv_loop_t* loop = tf_ssb_get_loop(ssb); uv_loop_t* loop = tf_ssb_get_loop(ssb);
connections->timer.data = connections; connections->timer.data = connections;

View File

@ -35,6 +35,11 @@ void tf_ssb_db_init(tf_ssb_t* ssb)
" created INTEGER" " created INTEGER"
")", ")",
NULL, NULL, NULL); NULL, NULL, NULL);
sqlite3_exec(db,
"CREATE TABLE IF NOT EXISTS blob_wants ("
" id TEXT PRIMARY KEY"
")",
NULL, NULL, NULL);
sqlite3_exec(db, sqlite3_exec(db,
"CREATE TABLE IF NOT EXISTS properties (" "CREATE TABLE IF NOT EXISTS properties ("
" id TEXT," " id TEXT,"
@ -75,6 +80,7 @@ bool tf_ssb_db_store_message(tf_ssb_t* ssb, JSContext* context, const char* id,
sqlite3* db = tf_ssb_get_db(ssb); sqlite3* db = tf_ssb_get_db(ssb);
sqlite3_stmt* statement; sqlite3_stmt* statement;
int64_t last_row_id = -1;
const char* query = "INSERT INTO messages (id, previous, author, sequence, timestamp, content, hash, signature) VALUES (?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT DO NOTHING"; const char* query = "INSERT INTO messages (id, previous, author, sequence, timestamp, content, hash, signature) VALUES (?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT DO NOTHING";
if (sqlite3_prepare(db, query, -1, &statement, NULL) == SQLITE_OK) { if (sqlite3_prepare(db, query, -1, &statement, NULL) == SQLITE_OK) {
if (sqlite3_bind_text(statement, 1, id, -1, NULL) == SQLITE_OK && if (sqlite3_bind_text(statement, 1, id, -1, NULL) == SQLITE_OK &&
@ -90,12 +96,37 @@ bool tf_ssb_db_store_message(tf_ssb_t* ssb, JSContext* context, const char* id,
printf("%s\n", sqlite3_errmsg(db)); printf("%s\n", sqlite3_errmsg(db));
} }
stored = r == SQLITE_DONE && sqlite3_changes(db) != 0; stored = r == SQLITE_DONE && sqlite3_changes(db) != 0;
if (stored)
{
last_row_id = sqlite3_last_insert_rowid(db);
}
} }
sqlite3_finalize(statement); sqlite3_finalize(statement);
} else { } else {
printf("prepare failed: %s\n", sqlite3_errmsg(db)); printf("prepare failed: %s\n", sqlite3_errmsg(db));
} }
if (last_row_id != -1)
{
query = "INSERT INTO blob_wants (id) SELECT DISTINCT json.value FROM messages, json_tree(messages.content) AS json LEFT OUTER JOIN blobs ON json.value = blobs.id WHERE messages.rowid = ?1 AND json.value LIKE '&%%.sha256' AND length(json.value) = ?2 AND blobs.content IS NULL ON CONFLICT DO NOTHING RETURNING id";
if (sqlite3_prepare(db, query, -1, &statement, NULL) == SQLITE_OK) {
if (sqlite3_bind_int64(statement, 1, last_row_id) == SQLITE_OK &&
sqlite3_bind_int(statement, 2, BLOB_ID_LEN - 1) == SQLITE_OK) {
int r = SQLITE_OK;
while ((r = sqlite3_step(statement)) == SQLITE_ROW)
{
tf_ssb_notify_blob_want_added(ssb, (const char*)sqlite3_column_text(statement, 0));
}
if (r != SQLITE_DONE) {
printf("%s\n", sqlite3_errmsg(db));
}
}
sqlite3_finalize(statement);
} else {
printf("prepare failed: %s\n", sqlite3_errmsg(db));
}
}
JS_FreeValue(context, previousval); JS_FreeValue(context, previousval);
JS_FreeCString(context, author); JS_FreeCString(context, author);
JS_FreeValue(context, authorval); JS_FreeValue(context, authorval);
@ -319,7 +350,10 @@ static int _tf_ssb_sqlite_authorizer(void* user_data, int action_code, const cha
case SQLITE_FUNCTION: case SQLITE_FUNCTION:
return SQLITE_OK; return SQLITE_OK;
case SQLITE_READ: case SQLITE_READ:
return strcmp(arg0, "messages") == 0 ? SQLITE_OK : SQLITE_DENY; return
(strcmp(arg0, "messages") == 0 ||
strcmp(arg0, "blob_wants") == 0)
? SQLITE_OK : SQLITE_DENY;
break; break;
} }
return SQLITE_DENY; return SQLITE_DENY;

View File

@ -65,8 +65,9 @@ bool tf_ssb_whoami(tf_ssb_t* ssb, char* out_id, size_t out_id_size);
void tf_ssb_set_broadcasts_changed_callback(tf_ssb_t* ssb, void (*callback)(tf_ssb_t* ssb, void* user_data), void* user_data); void tf_ssb_set_broadcasts_changed_callback(tf_ssb_t* ssb, void (*callback)(tf_ssb_t* ssb, void* user_data), void* user_data);
void tf_ssb_visit_broadcasts(tf_ssb_t* ssb, void (*callback)(const struct sockaddr_in* addr, const uint8_t* pub, void* user_data), void* user_data); void tf_ssb_visit_broadcasts(tf_ssb_t* ssb, void (*callback)(const struct sockaddr_in* addr, const uint8_t* pub, void* user_data), void* user_data);
typedef void (tf_ssb_rpc_cleanup_t)(tf_ssb_t* ssb, void* user_data);
typedef void (tf_ssb_connections_changed_callback_t)(tf_ssb_t* ssb, tf_ssb_change_t change, tf_ssb_connection_t* connection, void* user_data); typedef void (tf_ssb_connections_changed_callback_t)(tf_ssb_t* ssb, tf_ssb_change_t change, tf_ssb_connection_t* connection, void* user_data);
void tf_ssb_add_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_connections_changed_callback_t callback, void* user_data); void tf_ssb_add_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_connections_changed_callback_t callback, tf_ssb_rpc_cleanup_t* cleanup, void* user_data);
const char** tf_ssb_get_connection_ids(tf_ssb_t* ssb); const char** tf_ssb_get_connection_ids(tf_ssb_t* ssb);
int tf_ssb_get_connections(tf_ssb_t* ssb, tf_ssb_connection_t** out_connections, int out_connections_count); int tf_ssb_get_connections(tf_ssb_t* ssb, tf_ssb_connection_t** out_connections, int out_connections_count);
void tf_ssb_connect(tf_ssb_t* ssb, const char* host, int port, const uint8_t* key); void tf_ssb_connect(tf_ssb_t* ssb, const char* host, int port, const uint8_t* key);
@ -84,7 +85,7 @@ bool tf_ssb_id_bin_to_str(char* str, size_t str_size, const uint8_t* bin);
void tf_ssb_test(); void tf_ssb_test();
typedef void (tf_ssb_rpc_callback_t)(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data); typedef void (tf_ssb_rpc_callback_t)(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data);
void tf_ssb_register_rpc(tf_ssb_t* ssb, const char** name, tf_ssb_rpc_callback_t* callback, void* user_data); void tf_ssb_register_rpc(tf_ssb_t* ssb, const char** name, tf_ssb_rpc_callback_t* callback, tf_ssb_rpc_cleanup_t* cleanup, void* user_data);
bool tf_ssb_verify_and_strip_signature(JSContext* context, JSValue val, char* out_signature, size_t out_signature_size); bool tf_ssb_verify_and_strip_signature(JSContext* context, JSValue val, char* out_signature, size_t out_signature_size);
void tf_ssb_calculate_message_id(JSContext* context, JSValue message, char* out_id, size_t out_id_size); void tf_ssb_calculate_message_id(JSContext* context, JSValue message, char* out_id, size_t out_id_size);
@ -101,3 +102,9 @@ int32_t tf_ssb_connection_next_request_number(tf_ssb_connection_t* connection);
bool tf_ssb_connection_get_id(tf_ssb_connection_t* connection, char* out_id, size_t out_id_size); bool tf_ssb_connection_get_id(tf_ssb_connection_t* connection, char* out_id, size_t out_id_size);
void tf_ssb_connection_add_request(tf_ssb_connection_t* connection, int32_t request_number, tf_ssb_rpc_callback_t* callback, void* user_data); void tf_ssb_connection_add_request(tf_ssb_connection_t* connection, int32_t request_number, tf_ssb_rpc_callback_t* callback, void* user_data);
void tf_ssb_connection_remove_request(tf_ssb_connection_t* connection, int32_t request_number); void tf_ssb_connection_remove_request(tf_ssb_connection_t* connection, int32_t request_number);
JSValue tf_ssb_connection_get_object(tf_ssb_connection_t* connection);
void tf_ssb_register_blob_want_added(tf_ssb_t* ssb, void (*callback)(tf_ssb_t* ssb, const char* id, void* user_data), void (*cleanup)(tf_ssb_t* ssb, void* user_data), void* user_data);
void tf_ssb_notify_blob_want_added(tf_ssb_t* ssb, const char* id);
JSClassID tf_ssb_get_connection_class_id();

View File

@ -5,6 +5,9 @@
#include "task.h" #include "task.h"
#include <malloc.h> #include <malloc.h>
#include <sodium/crypto_hash_sha256.h>
#include <sodium/crypto_sign.h>
#include <string.h>
#include <uv.h> #include <uv.h>
#include "quickjs-libc.h" #include "quickjs-libc.h"
@ -128,6 +131,33 @@ static JSValue _tf_ssb_createHistoryStream(JSContext* context, JSValueConst this
return JS_NULL; return JS_NULL;
} }
static void _check_call(JSContext* context, JSValue result)
{
if (JS_IsError(context, result))
{
const char* value = JS_ToCString(context, result);
printf("ERROR: %s\n", value);
JS_FreeCString(context, value);
JSValue stack = JS_GetPropertyStr(context, result, "stack");
if (!JS_IsUndefined(stack)) {
const char* stack_str = JS_ToCString(context, stack);
printf("%s\n", stack_str);
JS_FreeCString(context, stack_str);
}
JS_FreeValue(context, stack);
}
else if (JS_IsException(result))
{
js_std_dump_error(context);
JSValue error = JS_GetException(context);
const char* value = JS_ToCString(context, error);
printf("Exception: %s\n", value);
JS_FreeCString(context, value);
JS_FreeValue(context, error);
abort();
}
}
typedef struct _sqlStream_callback_t typedef struct _sqlStream_callback_t
{ {
JSContext* context; JSContext* context;
@ -137,11 +167,11 @@ typedef struct _sqlStream_callback_t
static void _tf_ssb_sqlStream_callback(JSValue row, void* user_data) { static void _tf_ssb_sqlStream_callback(JSValue row, void* user_data) {
sqlStream_callback_t* info = user_data; sqlStream_callback_t* info = user_data;
JSValue response = JS_Call(info->context, info->callback, JS_UNDEFINED, 1, &row); JSValue response = JS_Call(info->context, info->callback, JS_UNDEFINED, 1, &row);
if (JS_IsException(response)) { _check_call(info->context, response);
printf("Error on SQL callback.\n"); if (tf_task_get(info->context))
js_std_dump_error(info->context); {
tf_task_run_jobs(tf_task_get(info->context));
} }
tf_task_run_jobs(tf_task_get(info->context));
JS_FreeValue(info->context, response); JS_FreeValue(info->context, response);
} }
@ -184,6 +214,20 @@ static JSValue _tf_ssb_appendMessage(JSContext* context, JSValueConst this_val,
return JS_NULL; return JS_NULL;
} }
static JSValue _tf_ssb_storeMessage(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv)
{
tf_ssb_t* ssb = JS_GetOpaque(this_val, _tf_ssb_classId);
char signature[crypto_sign_BYTES + 128];
char id[crypto_hash_sha256_BYTES * 2 + 1];
tf_ssb_calculate_message_id(context, argv[0], id, sizeof(id));
if (tf_ssb_verify_and_strip_signature(context, argv[0], signature, sizeof(signature))) {
tf_ssb_db_store_message(ssb, context, id, argv[0], signature);
} else {
printf("failed to verify message\n");
}
return JS_UNDEFINED;
}
typedef struct _broadcasts_t typedef struct _broadcasts_t
{ {
JSContext* context; JSContext* context;
@ -259,14 +303,16 @@ static void _tf_ssb_call_callback(tf_ssb_t* ssb, const char* name, void* user_da
JSValue global = JS_GetGlobalObject(context); JSValue global = JS_GetGlobalObject(context);
JSValue ssbo = JS_GetPropertyStr(context, global, "ssb"); JSValue ssbo = JS_GetPropertyStr(context, global, "ssb");
JSValue callback = JS_GetPropertyStr(context, ssbo, name); JSValue callback = JS_GetPropertyStr(context, ssbo, name);
JSValue args = JS_UNDEFINED; if (JS_IsFunction(context, callback)) {
JSValue response = JS_Call(context, callback, JS_UNDEFINED, 0, &args); JSValue args = JS_UNDEFINED;
if (JS_IsException(response)) { JSValue response = JS_Call(context, callback, JS_UNDEFINED, 0, &args);
printf("Error on callback: %s.\n", name); _check_call(context, response);
js_std_dump_error(context); if (tf_task_get(context))
{
tf_task_run_jobs(tf_task_get(context));
}
JS_FreeValue(context, response);
} }
tf_task_run_jobs(tf_task_get(context));
JS_FreeValue(context, response);
JS_FreeValue(context, ssbo); JS_FreeValue(context, ssbo);
JS_FreeValue(context, global); JS_FreeValue(context, global);
} }
@ -281,6 +327,301 @@ static void _tf_ssb_connections_changed(tf_ssb_t* ssb, tf_ssb_change_t change, t
_tf_ssb_call_callback(ssb, "onConnectionsChanged", user_data); _tf_ssb_call_callback(ssb, "onConnectionsChanged", user_data);
} }
static JSValue _tf_ssb_rpc_send_json(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv)
{
JSValue connection_val = JS_GetPropertyStr(context, this_val, "connection");
tf_ssb_connection_t* connection = JS_GetOpaque(connection_val, tf_ssb_get_connection_class_id());
JSValue request_val = JS_GetPropertyStr(context, this_val, "request_number");
int32_t request_number;
JS_ToInt32(context, &request_number, request_val);
JS_FreeValue(context, request_val);
JSValue message_val = JS_JSONStringify(context, argv[0], JS_NULL, JS_NULL);
size_t size;
const char* message = JS_ToCStringLen(context, &size, message_val);
JS_FreeValue(context, message_val);
tf_ssb_connection_rpc_send(
connection,
k_ssb_rpc_flag_json | k_ssb_rpc_flag_stream,
-request_number,
(const uint8_t*)message,
size,
NULL,
NULL);
JS_FreeValue(context, connection_val);
return JS_UNDEFINED;
}
static JSValue _tf_ssb_rpc_send_binary(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv)
{
JSValue connection_val = JS_GetPropertyStr(context, this_val, "connection");
tf_ssb_connection_t* connection = JS_GetOpaque(connection_val, tf_ssb_get_connection_class_id());
JSValue request_val = JS_GetPropertyStr(context, this_val, "request_number");
int32_t request_number;
JS_ToInt32(context, &request_number, request_val);
JS_FreeValue(context, request_val);
size_t size;
uint8_t* message = tf_try_get_array_buffer(context, &size, argv[0]);
if (message) {
tf_ssb_connection_rpc_send(
connection,
k_ssb_rpc_flag_binary | k_ssb_rpc_flag_stream,
-request_number,
(const uint8_t*)message,
size,
NULL,
NULL);
}
JS_FreeValue(context, connection_val);
return JS_UNDEFINED;
}
void _tf_ssb_on_rpc(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
JSContext* context = tf_ssb_get_context(ssb);
JSValue callback = JS_MKPTR(JS_TAG_OBJECT, user_data);
JSValue object = JS_NewObject(context);
printf("sending object = %d\n", JS_IsObject(tf_ssb_connection_get_object(connection)));
JSValue connection_object = JS_DupValue(context, tf_ssb_connection_get_object(connection));
JS_SetPropertyStr(context, object, "connection", connection_object);
JS_SetPropertyStr(context, object, "flags", JS_NewUint32(context, flags));
JS_SetPropertyStr(context, object, "request_number", JS_NewInt32(context, request_number));
JS_SetPropertyStr(context, object, "args", JS_GetPropertyStr(context, args, "args"));
JS_SetPropertyStr(context, object, "message", message && size ? JS_NewArrayBufferCopy(context, message, size) : args);
JS_SetPropertyStr(context, object, "send_json", JS_NewCFunction(context, _tf_ssb_rpc_send_json, "send_json", 1));
JS_SetPropertyStr(context, object, "send_binary", JS_NewCFunction(context, _tf_ssb_rpc_send_binary, "send_binary", 1));
JSValue result = JS_Call(context, callback, JS_UNDEFINED, 1, &object);
_check_call(context, result);
JS_FreeValue(context, result);
JS_FreeValue(context, object);
}
static void _tf_ssb_rpc_js_value_cleanup(tf_ssb_t* ssb, void* user_data)
{
JSValue callback = JS_MKPTR(JS_TAG_OBJECT, user_data);
JS_FreeValue(tf_ssb_get_context(ssb), callback);
}
static JSValue _tf_ssb_register_rpc(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv)
{
tf_ssb_t* ssb = JS_GetOpaque(this_val, _tf_ssb_classId);
if (!JS_IsArray(context, argv[0]))
{
return JS_ThrowTypeError(context, "Expected argument 1 to be an array of strings.");
}
if (!JS_IsFunction(context, argv[1]))
{
return JS_ThrowTypeError(context, "Expected argument 2 to be a function.");
}
JSValue length_val = JS_GetPropertyStr(context, argv[0], "length");
int length = 0;
JS_ToInt32(context, &length, length_val);
enum { k_max_name_parts = 16 };
const char* name[k_max_name_parts + 1] = { 0 };
if (length >= k_max_name_parts)
{
return JS_ThrowInternalError(context, "Too many parts to RPC name.");
}
for (int i = 0; i < length; i++)
{
JSValue value = JS_GetPropertyUint32(context, argv[0], i);
name[i] = JS_ToCString(context, value);
JS_FreeValue(context, value);
}
tf_ssb_register_rpc(ssb, name, _tf_ssb_on_rpc, _tf_ssb_rpc_js_value_cleanup, JS_VALUE_GET_PTR(JS_DupValue(context, argv[1])));
for (int i = 0; i < length; i++)
{
JS_FreeCString(context, name[i]);
}
return JS_UNDEFINED;
}
static void _tf_ssb_on_blob_want_added(tf_ssb_t* ssb, const char* id, void* user_data)
{
JSContext* context = tf_ssb_get_context(ssb);
JSValue callback = JS_MKPTR(JS_TAG_OBJECT, user_data);
JSValue string = JS_NewString(context, id);
JSValue response = JS_Call(context, callback, JS_UNDEFINED, 1, &string);
_check_call(context, response);
JS_FreeValue(context, response);
JS_FreeValue(context, string);
}
static void _tf_ssb_cleanup_value(tf_ssb_t* ssb, void* user_data)
{
JSValue callback = JS_MKPTR(JS_TAG_OBJECT, user_data);
printf("CLEANUP %p\n", user_data);
JS_FreeValue(tf_ssb_get_context(ssb), callback);
}
static JSValue _tf_ssb_register_blob_want_added(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv)
{
tf_ssb_t* ssb = JS_GetOpaque(this_val, _tf_ssb_classId);
if (!JS_IsFunction(context, argv[0]))
{
return JS_ThrowTypeError(context, "Expected argument 1 to be a function.");
}
printf("registering %p\n", JS_VALUE_GET_PTR(argv[0]));
tf_ssb_register_blob_want_added(ssb, _tf_ssb_on_blob_want_added, _tf_ssb_cleanup_value, JS_VALUE_GET_PTR(JS_DupValue(context, argv[0])));
return JS_UNDEFINED;
}
static void _tf_ssb_rpc_on_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_change_t change, tf_ssb_connection_t* connection, void* user_data)
{
JSContext* context = tf_ssb_get_context(ssb);
JSValue callback = JS_MKPTR(JS_TAG_OBJECT, user_data);
JSValue response = JS_UNDEFINED;
switch (change)
{
case k_tf_ssb_change_create:
break;
case k_tf_ssb_change_connect:
{
JSValue object = /*JS_DupValue(context,*/ tf_ssb_connection_get_object(connection);//);
JSValue args[] =
{
JS_NewString(context, "add"),
object,
};
printf("calling function for ptr %p IsFunction=%d\n", user_data, JS_IsFunction(context, callback));
response = JS_Call(context, callback, JS_UNDEFINED, 2, args);
_check_call(context, response);
//JS_FreeValue(context, object);
}
break;
case k_tf_ssb_change_remove:
{
printf("CHANGE_REMOVE\n");
JSValue object = /*JS_DupValue(context,*/ tf_ssb_connection_get_object(connection);//);
JSValue args[] =
{
JS_NewString(context, "remove"),
object,
};
response = JS_Call(context, callback, JS_UNDEFINED, 2, args);
_check_call(context, response);
//JS_FreeValue(context, object);
}
break;
}
JS_FreeValue(context, response);
}
static JSValue _tf_ssb_register_connections_changed(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv)
{
printf("register connections changed\n");
tf_ssb_t* ssb = JS_GetOpaque(this_val, _tf_ssb_classId);
if (!JS_IsFunction(context, argv[0]))
{
return JS_ThrowTypeError(context, "Expected argument 1 to be a function.");
}
void* ptr = JS_VALUE_GET_PTR(JS_DupValue(context, argv[0]));
printf("registering %p TAG=%d\n", ptr, JS_VALUE_GET_TAG(argv[0]));
tf_ssb_add_connections_changed_callback(ssb, _tf_ssb_rpc_on_connections_changed_callback, _tf_ssb_rpc_js_value_cleanup, ptr);
return JS_UNDEFINED;
}
void tf_ssb_run_file(JSContext* context, const char* file_name)
{
FILE* file = fopen(file_name, "rb");
if (!file)
{
printf("Unable to open %s: %s.", file_name, strerror(errno));
return;
}
char* source = NULL;
fseek(file, 0, SEEK_END);
long file_size = ftell(file);
fseek(file, 0, SEEK_SET);
source = malloc(file_size + 1);
fread(source, 1, file_size, file);
source[file_size] = '\0';
fclose(file);
JSValue result = JS_Eval(context, source, file_size, file_name, 0);
if (JS_IsError(context, result))
{
printf("Error running %s.\n", file_name);
const char* value = JS_ToCString(context, result);
printf("ERROR: %s\n", value);
JS_FreeCString(context, value);
JSValue stack = JS_GetPropertyStr(context, result, "stack");
if (!JS_IsUndefined(stack)) {
const char* stack_str = JS_ToCString(context, stack);
printf("%s\n", stack_str);
JS_FreeCString(context, stack_str);
}
JS_FreeValue(context, stack);
}
else if (JS_IsException(result))
{
printf("Exception running %s.\n", file_name);
JSValue error = JS_GetException(context);
const char* value = JS_ToCString(context, error);
printf("Exception: %s\n", value);
JS_FreeCString(context, value);
JS_FreeValue(context, error);
}
JS_FreeValue(context, result);
free(source);
}
JSValue _print(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) {
for (int i = 0; i < argc; ++i) {
if (JS_IsNull(argv[i])) {
printf(" null");
} else {
const char* value = JS_ToCString(context, argv[i]);
printf(" %s", value);
JS_FreeCString(context, value);
}
}
printf("\n");
return JS_NULL;
}
static JSValue _utf8Decode(JSContext* context, uint8_t* data, size_t length) {
return JS_NewStringLen(context, (const char*)data, length);
}
static JSValue _utf8_decode(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) {
JSValue result = JS_NULL;
size_t length;
if (JS_IsString(argv[0])) {
result = JS_DupValue(context, argv[0]);
} else {
uint8_t* array = tf_try_get_array_buffer(context, &length, argv[0]);
if (array) {
result = _utf8Decode(context, array, length);
} else {
size_t offset;
size_t element_size;
JSValue buffer = tf_try_get_typed_array_buffer(context, argv[0], &offset, &length, &element_size);
size_t size;
if (!JS_IsException(buffer)) {
array = tf_try_get_array_buffer(context, &size, buffer);
if (array) {
result = _utf8Decode(context, array, size);
}
}
JS_FreeValue(context, buffer);
}
}
return result;
}
void tf_ssb_init(JSContext* context, tf_ssb_t* ssb) void tf_ssb_init(JSContext* context, tf_ssb_t* ssb)
{ {
JS_NewClassID(&_tf_ssb_classId); JS_NewClassID(&_tf_ssb_classId);
@ -292,7 +633,7 @@ void tf_ssb_init(JSContext* context, tf_ssb_t* ssb)
} }
tf_ssb_set_broadcasts_changed_callback(ssb, _tf_ssb_broadcasts_changed, NULL); tf_ssb_set_broadcasts_changed_callback(ssb, _tf_ssb_broadcasts_changed, NULL);
tf_ssb_add_connections_changed_callback(ssb, _tf_ssb_connections_changed, NULL); tf_ssb_add_connections_changed_callback(ssb, _tf_ssb_connections_changed, NULL, NULL);
JSValue global = JS_GetGlobalObject(context); JSValue global = JS_GetGlobalObject(context);
JSValue object = JS_NewObjectClass(context, _tf_ssb_classId); JSValue object = JS_NewObjectClass(context, _tf_ssb_classId);
@ -308,7 +649,17 @@ void tf_ssb_init(JSContext* context, tf_ssb_t* ssb)
JS_SetPropertyStr(context, object, "sqlStream", JS_NewCFunction(context, _tf_ssb_sqlStream, "sqlStream", 3)); JS_SetPropertyStr(context, object, "sqlStream", JS_NewCFunction(context, _tf_ssb_sqlStream, "sqlStream", 3));
JS_SetPropertyStr(context, object, "post", JS_NewCFunction(context, _tf_ssb_post, "post", 1)); JS_SetPropertyStr(context, object, "post", JS_NewCFunction(context, _tf_ssb_post, "post", 1));
JS_SetPropertyStr(context, object, "appendMessage", JS_NewCFunction(context, _tf_ssb_appendMessage, "appendMessage", 1)); JS_SetPropertyStr(context, object, "appendMessage", JS_NewCFunction(context, _tf_ssb_appendMessage, "appendMessage", 1));
JS_SetPropertyStr(context, object, "storeMessage", JS_NewCFunction(context, _tf_ssb_storeMessage, "storeMessage", 1));
JS_SetPropertyStr(context, object, "getBroadcasts", JS_NewCFunction(context, _tf_ssb_getBroadcasts, "getBroadcasts", 0)); JS_SetPropertyStr(context, object, "getBroadcasts", JS_NewCFunction(context, _tf_ssb_getBroadcasts, "getBroadcasts", 0));
JS_SetPropertyStr(context, object, "connect", JS_NewCFunction(context, _tf_ssb_connect, "connect", 1)); JS_SetPropertyStr(context, object, "connect", JS_NewCFunction(context, _tf_ssb_connect, "connect", 1));
JS_SetPropertyStr(context, object, "registerRpc", JS_NewCFunction(context, _tf_ssb_register_rpc, "registerRpc", 2));
JS_SetPropertyStr(context, object, "registerBlobWantAdded", JS_NewCFunction(context, _tf_ssb_register_blob_want_added, "registerBlobWantAdded", 1));
JS_SetPropertyStr(context, object, "registerConnectionsChanged", JS_NewCFunction(context, _tf_ssb_register_connections_changed, "registerConnectionsChanged", 1));
JS_SetPropertyStr(context, global, "debug_print", JS_NewCFunction(context, _print, "debug_print", 2));
JS_SetPropertyStr(context, global, "debug_utf8Decode", JS_NewCFunction(context, _utf8_decode, "debug_utf8Decode", 1));
JS_FreeValue(context, global); JS_FreeValue(context, global);
tf_ssb_run_file(context, "core/ssb.js");
} }

View File

@ -32,7 +32,7 @@ typedef struct _tf_ssb_rpc_t
const char** tf_ssb_get_following_deep(tf_ssb_t* ssb, const char** ids, int depth); const char** tf_ssb_get_following_deep(tf_ssb_t* ssb, const char** ids, int depth);
static void _tf_ssb_rpc_blob_has(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) /*static void _tf_ssb_rpc_blob_has(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{ {
JSContext* context = tf_ssb_connection_get_context(connection); JSContext* context = tf_ssb_connection_get_context(connection);
sqlite3* db = tf_ssb_connection_get_db(connection); sqlite3* db = tf_ssb_connection_get_db(connection);
@ -59,9 +59,9 @@ static void _tf_ssb_rpc_blob_has(tf_ssb_connection_t* connection, uint8_t flags,
k_ssb_rpc_flag_end_error; k_ssb_rpc_flag_end_error;
const char* result = have ? "true" : "false"; const char* result = have ? "true" : "false";
tf_ssb_connection_rpc_send(connection, send_flags, -request_number, (const uint8_t*)result, strlen(result), NULL, NULL); tf_ssb_connection_rpc_send(connection, send_flags, -request_number, (const uint8_t*)result, strlen(result), NULL, NULL);
} }*/
static void _tf_ssb_rpc_blob_get(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) /*static void _tf_ssb_rpc_blob_get(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{ {
tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
JSContext* context = tf_ssb_connection_get_context(connection); JSContext* context = tf_ssb_connection_get_context(connection);
@ -89,7 +89,7 @@ static void _tf_ssb_rpc_blob_get(tf_ssb_connection_t* connection, uint8_t flags,
JS_FreeValue(context, blob_id_value); JS_FreeValue(context, blob_id_value);
} }
JS_FreeValue(context, blob_ids); JS_FreeValue(context, blob_ids);
} }*/
typedef struct _tf_ssb_connection_blobs_get_t typedef struct _tf_ssb_connection_blobs_get_t
{ {
@ -291,7 +291,7 @@ static void _tf_ssb_blob_wants_update(tf_ssb_blob_wants_t* wants)
} }
} }
static void _tf_ssb_rpc_blobs_createWants(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) /*static void _tf_ssb_rpc_blobs_createWants(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{ {
tf_ssb_rpc_t* rpc = user_data; tf_ssb_rpc_t* rpc = user_data;
tf_ssb_blob_wants_t* wants = malloc(sizeof(tf_ssb_blob_wants_t)); tf_ssb_blob_wants_t* wants = malloc(sizeof(tf_ssb_blob_wants_t));
@ -304,9 +304,9 @@ static void _tf_ssb_rpc_blobs_createWants(tf_ssb_connection_t* connection, uint8
}; };
rpc->wants = wants; rpc->wants = wants;
_tf_ssb_blob_wants_update(wants); _tf_ssb_blob_wants_update(wants);
} }*/
static void _tf_ssb_rpc_createHistoryStream(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) /*static void _tf_ssb_rpc_createHistoryStream(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{ {
JSContext* context = tf_ssb_connection_get_context(connection); JSContext* context = tf_ssb_connection_get_context(connection);
sqlite3* db = tf_ssb_connection_get_db(connection); sqlite3* db = tf_ssb_connection_get_db(connection);
@ -358,7 +358,7 @@ static void _tf_ssb_rpc_createHistoryStream(tf_ssb_connection_t* connection, uin
JS_FreeCString(context, author); JS_FreeCString(context, author);
JS_FreeValue(context, idval); JS_FreeValue(context, idval);
JS_FreeValue(context, streamArgs); JS_FreeValue(context, streamArgs);
} }*/
static void _tf_ssb_connection_on_rpc_createHistoryStream_response(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue val, const uint8_t* message, size_t size, void* user_data) static void _tf_ssb_connection_on_rpc_createHistoryStream_response(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue val, const uint8_t* message, size_t size, void* user_data)
{ {
@ -653,23 +653,27 @@ tf_ssb_rpc_t* tf_ssb_rpc_create(tf_ssb_t* ssb)
*rpc = (tf_ssb_rpc_t) { *rpc = (tf_ssb_rpc_t) {
.wants_async.data = rpc, .wants_async.data = rpc,
}; };
tf_ssb_add_connections_changed_callback(ssb, _tf_ssb_rpc_connections_changed_callback, rpc); //tf_ssb_add_connections_changed_callback(ssb, _tf_ssb_rpc_connections_changed_callback, rpc);
tf_ssb_add_connections_changed_callback(ssb, _tf_ssb_rpc_on_connections_changed, NULL); //tf_ssb_add_connections_changed_callback(ssb, _tf_ssb_rpc_on_connections_changed, NULL);
tf_ssb_register_rpc(ssb, (const char*[]) { "blobs", "has", NULL }, _tf_ssb_rpc_blob_has, NULL); (void)_tf_ssb_rpc_connections_changed_callback;
tf_ssb_register_rpc(ssb, (const char*[]) { "blobs", "get", NULL }, _tf_ssb_rpc_blob_get, NULL); (void)_tf_ssb_rpc_on_connections_changed;
tf_ssb_register_rpc(ssb, (const char*[]) { "blobs", "createWants", NULL }, _tf_ssb_rpc_blobs_createWants, rpc); //tf_ssb_register_rpc(ssb, (const char*[]) { "blobs", "has", NULL }, _tf_ssb_rpc_blob_has, NULL);
tf_ssb_register_rpc(ssb, (const char*[]) { "createHistoryStream", NULL }, _tf_ssb_rpc_createHistoryStream, NULL); //tf_ssb_register_rpc(ssb, (const char*[]) { "blobs", "get", NULL }, _tf_ssb_rpc_blob_get, NULL);
uv_async_init(tf_ssb_get_loop(ssb), &rpc->wants_async, _tf_ssb_rpc_wants_async); //tf_ssb_register_rpc(ssb, (const char*[]) { "blobs", "createWants", NULL }, _tf_ssb_rpc_blobs_createWants, rpc);
uv_unref((uv_handle_t*)&rpc->wants_async); //tf_ssb_register_rpc(ssb, (const char*[]) { "createHistoryStream", NULL }, _tf_ssb_rpc_createHistoryStream, NULL);
//uv_async_init(tf_ssb_get_loop(ssb), &rpc->wants_async, _tf_ssb_rpc_wants_async);
(void)_tf_ssb_rpc_wants_async;
//uv_unref((uv_handle_t*)&rpc->wants_async);
return rpc; return rpc;
} }
static void _tf_ssb_rpc_handle_closed(uv_handle_t* handle) /*static void _tf_ssb_rpc_handle_closed(uv_handle_t* handle)
{ {
free(handle->data); free(handle->data);
} }*/
void tf_ssb_rpc_destroy(tf_ssb_rpc_t* rpc) void tf_ssb_rpc_destroy(tf_ssb_rpc_t* rpc)
{ {
uv_close((uv_handle_t*)&rpc->wants_async, _tf_ssb_rpc_handle_closed); //uv_close((uv_handle_t*)&rpc->wants_async, _tf_ssb_rpc_handle_closed);
free(rpc);
} }

View File

@ -1,6 +1,7 @@
#include "ssb.h" #include "ssb.h"
#include "ssb.db.h" #include "ssb.db.h"
#include "ssb.qjs.h"
#include <assert.h> #include <assert.h>
#include <stdlib.h> #include <stdlib.h>
@ -76,15 +77,17 @@ static void _tf_ssb_test_ssb()
uv_loop_init(&loop); uv_loop_init(&loop);
tf_ssb_t* ssb0 = tf_ssb_create(&loop, NULL, db0, NULL); tf_ssb_t* ssb0 = tf_ssb_create(&loop, NULL, db0, NULL);
tf_ssb_init(tf_ssb_get_context(ssb0), ssb0);
tf_ssb_t* ssb1 = tf_ssb_create(&loop, NULL, db1, NULL); tf_ssb_t* ssb1 = tf_ssb_create(&loop, NULL, db1, NULL);
tf_ssb_init(tf_ssb_get_context(ssb1), ssb1);
test_t test = { test_t test = {
.ssb0 = ssb0, .ssb0 = ssb0,
.ssb1 = ssb1, .ssb1 = ssb1,
}; };
tf_ssb_add_connections_changed_callback(ssb0, _ssb_test_connections_changed, &test); tf_ssb_add_connections_changed_callback(ssb0, _ssb_test_connections_changed, NULL, &test);
tf_ssb_add_connections_changed_callback(ssb1, _ssb_test_connections_changed, &test); tf_ssb_add_connections_changed_callback(ssb1, _ssb_test_connections_changed, NULL, &test);
tf_ssb_generate_keys(ssb0); tf_ssb_generate_keys(ssb0);
tf_ssb_generate_keys(ssb1); tf_ssb_generate_keys(ssb1);

View File

@ -433,7 +433,7 @@ static void _test_socket(const char* exe_path)
" print('connected', s.isConnected);\n" " print('connected', s.isConnected);\n"
" print(s.peerName);\n" " print(s.peerName);\n"
" s.read(function(data) {\n" " s.read(function(data) {\n"
" print('read', data.length);\n" " print('read', data ? data.length : null);\n"
" });\n" " });\n"
" s.write('GET / HTTP/1.0\\r\\n\\r\\n');\n" " s.write('GET / HTTP/1.0\\r\\n\\r\\n');\n"
"}).then(function() {\n" "}).then(function() {\n"
@ -452,7 +452,7 @@ static void _test_socket(const char* exe_path)
"s2.connect('www.unprompted.com', 443).then(function() {\n" "s2.connect('www.unprompted.com', 443).then(function() {\n"
" print('connected');\n" " print('connected');\n"
" s2.read(function(data) {\n" " s2.read(function(data) {\n"
" print('read', data.length);\n" " print('read', data ? data.length : null);\n"
" });\n" " });\n"
" return s2.startTls();\n" " return s2.startTls();\n"
"}).then(function() {\n" "}).then(function() {\n"