ssb.js is now entirely in C. Usual disclaimers about it not being amazingly well tested.

git-svn-id: https://www.unprompted.com/svn/projects/tildefriends/trunk@4111 ed5197a5-7fde-0310-b194-c3ffbd925b24
This commit is contained in:
Cory McWilliams 2023-01-08 20:01:35 +00:00
parent 53e4f4341c
commit 69253432b8
9 changed files with 359 additions and 460 deletions

View File

@ -342,7 +342,6 @@ async function getProcessBlob(blobId, key, options) {
});
}
};
delete imports.ssb.addRpc;
if (process.credentials &&
process.credentials.session &&

View File

@ -1,164 +0,0 @@
"use strict";
var g_database = new Database('core');
const k_use_create_history_stream = false;
function get_latest_sequence_for_author(author) {
var sequence = 0;
ssb.sqlStream(
'SELECT MAX(sequence) AS sequence FROM messages WHERE author = ?1',
[author],
function(row) {
if (row.sequence) {
sequence = row.sequence;
}
});
return sequence;
}
function storeMessage(message) {
var payload = message.message.value ? message.message.value : message.message;
if (typeof(payload) == 'object') {
ssb.storeMessage(payload);
}
}
ssb.addEventListener('connections', function on_connections_changed(change, connection) {
if (change == 'add') {
var sequence = get_latest_sequence_for_author(connection.id);
if (k_use_create_history_stream) {
connection.send_json({'name': ['createHistoryStream'], 'type': 'source', 'args': [{'id': connection.id, 'seq': sequence, 'live': true, 'keys': false}]}, storeMessage);
var identities = ssb.getAllIdentities();
let ids = ssb.followingDeep(identities, 2);
for (let id of ids) {
if (identities.indexOf(id) != -1) {
continue;
}
var sequence = get_latest_sequence_for_author(id);
connection.send_json({'name': ['createHistoryStream'], 'type': 'source', 'args': [{'id': id, 'seq': sequence, 'live': true, 'keys': false}]}, storeMessage);
}
} else {
if (connection.is_client) {
connection.send_json({"name": ["ebt", "replicate"], "args": [{"version": 3, "format": "classic"}], "type": "duplex"}, ebtReplicateClient);
}
}
}
});
function ebtReplicateSendClock(request, have) {
var identities = ssb.getAllIdentities();
var message = {};
var last_sent = request.connection.sent_clock || {};
var ids = ssb.followingDeep(identities, 2).concat([request.connection.id]);
if (!Object.keys(last_sent).length) {
for (let id of ids) {
message[id] = get_latest_sequence_for_author(id);
}
}
for (let id of Object.keys(have)) {
if (message[id] === undefined) {
var sequence = get_latest_sequence_for_author(id);
message[id] = sequence ? sequence : -1;
}
}
var to_send = {}
var offset = Math.floor(Math.random() * ids.length);
for (var i = 0; i < ids.length; i++) {
var id = ids[(i + offset) % ids.length];
if (last_sent[id] === undefined || message[id] > last_sent[id]) {
last_sent[id] = to_send[id] = message[id] === -1 ? -1 : message[id] << 1;
}
if (Object.keys(to_send).length >= 32) {
request.send_json(to_send);
to_send = {};
}
}
request.connection.sent_clock = last_sent;
if (Object.keys(to_send).length) {
request.send_json(to_send);
}
}
function formatMessage(row) {
if (row.sequence_before_author) {
return {
previous: row.previous,
sequence: row.sequence,
author: row.author,
timestamp: row.timestamp,
hash: row.hash,
content: JSON.parse(row.content),
signature: row.signature,
};
} else {
return {
previous: row.previous,
author: row.author,
sequence: row.sequence,
timestamp: row.timestamp,
hash: row.hash,
content: JSON.parse(row.content),
signature: row.signature,
};
}
}
function ebtReplicateRegisterMessageCallback(request) {
ssb.addEventListener('message', function(message_id) {
if (request.connection.send_clock) {
ssb.sqlStream(
'SELECT previous, author, id, sequence, timestamp, hash, content, signature FROM messages WHERE id = ?1',
[message_id],
function (row) {
if (request.connection.send_clock[row.author] < row.sequence) {
request.send_json(formatMessage(row));
}
});
}
});
}
function ebtReplicateCommon(request) {
if (request.message.author) {
storeMessage(request);
} else {
ebtReplicateSendClock(request, request.message);
if (!request.connection.send_clock) {
request.connection.send_clock = {};
}
for (let id of Object.keys(request.message)) {
if (request.message[id] >= 0 && (request.message[id] & 1) == 0) {
request.connection.send_clock[id] = request.message[id] >> 1;
ssb.sqlStream(
'SELECT previous, author, id, sequence, timestamp, hash, content, signature FROM messages WHERE author = ?1 AND sequence >= ?2 ORDER BY sequence',
[id, request.message[id] >> 1],
function (row) {
request.send_json(formatMessage(row));
request.connection.send_clock[id] = row.sequence;
});
} else {
delete request.connection.send_clock[id];
}
}
}
}
function ebtReplicateClient(request) {
if (request.message?.name !== 'Error') {
if (!request.connection.message_registered) {
ebtReplicateRegisterMessageCallback(request);
request.connection.message_registered = true;
}
ebtReplicateCommon(request);
}
}
function ebtReplicateServer(request) {
ebtReplicateRegisterMessageCallback(request);
ebtReplicateSendClock(request, {});
request.more(ebtReplicateCommon);
}
ssb.addRpc(['ebt', 'replicate'], ebtReplicateServer);

135
src/ssb.c
View File

@ -206,6 +206,9 @@ typedef struct _tf_ssb_connection_t
int32_t tunnel_request_number;
tf_ssb_blob_wants_t blob_wants;
bool sent_clock;
int32_t ebt_request_number;
JSValue ebt_send_clock;
JSValue object;
@ -521,6 +524,8 @@ void tf_ssb_connection_add_new_message_request(tf_ssb_connection_t* connection,
int index = tf_util_insert_index(author, connection->message_requests, connection->message_requests_count, sizeof(tf_ssb_connection_message_request_t), _message_request_compare);
if (index < connection->message_requests_count && strcmp(author, connection->message_requests[index].author) == 0)
{
connection->message_requests[index].request_number = request_number;
connection->message_requests[index].keys = keys;
return;
}
connection->message_requests = tf_resize_vec(connection->message_requests, sizeof(tf_ssb_connection_message_request_t) * (connection->message_requests_count + 1));
@ -537,6 +542,16 @@ void tf_ssb_connection_add_new_message_request(tf_ssb_connection_t* connection,
connection->message_requests_count++;
}
void tf_ssb_connection_remove_new_message_request(tf_ssb_connection_t* connection, const char* author)
{
int index = tf_util_insert_index(author, connection->message_requests, connection->message_requests_count, sizeof(tf_ssb_connection_message_request_t), _message_request_compare);
if (index < connection->message_requests_count && strcmp(author, connection->message_requests[index].author) == 0)
{
memmove(connection->message_requests + index, connection->message_requests + index + 1, sizeof(tf_ssb_connection_message_request_t) * (connection->message_requests_count - index));
connection->message_requests_count--;
}
}
static void _tf_ssb_connection_remove_request(tf_ssb_connection_t* connection, int32_t request_number)
{
tf_ssb_request_t* request = bsearch(&request_number, connection->requests, connection->requests_count, sizeof(tf_ssb_request_t), _request_compare);
@ -1593,6 +1608,8 @@ void _tf_ssb_connection_destroy(tf_ssb_connection_t* connection, const char* rea
!connection->tcp.data &&
!connection->connect.data)
{
JS_FreeValue(ssb->context, connection->ebt_send_clock);
connection->ebt_send_clock = JS_UNDEFINED;
tf_free(connection->message_requests);
connection->message_requests = NULL;
connection->message_requests_count = 0;
@ -2079,64 +2096,6 @@ static void _tf_ssb_connection_finalizer(JSRuntime* runtime, JSValue value)
}
}
static void _tf_ssb_connection_send_json_response(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
if (!user_data)
{
return;
}
void _tf_ssb_on_rpc(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data);
_tf_ssb_on_rpc(connection, flags, request_number, args, message, size, user_data);
}
static void _tf_ssb_connection_cleanup_value(tf_ssb_t* ssb, void* user_data)
{
if (user_data)
{
JSValue callback = JS_MKPTR(JS_TAG_OBJECT, user_data);
JS_FreeValue(tf_ssb_get_context(ssb), callback);
}
}
static JSValue _tf_ssb_connection_send_json_internal(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv, int flags)
{
tf_ssb_connection_t* connection = JS_GetOpaque(this_val, _connection_class_id);
if (!connection)
{
return JS_UNDEFINED;
}
uint32_t request_number = tf_ssb_connection_next_request_number(connection);
JSValue message_val = JS_JSONStringify(context, argv[0], JS_NULL, JS_NULL);
size_t size;
const char* message = JS_ToCStringLen(context, &size, message_val);
JS_FreeValue(context, message_val);
tf_ssb_connection_rpc_send(
connection,
flags,
request_number,
(const uint8_t*)message,
size,
_tf_ssb_connection_send_json_response,
_tf_ssb_connection_cleanup_value,
JS_IsFunction(context, argv[1]) ? JS_VALUE_GET_PTR(JS_DupValue(context, argv[1])) : NULL);
JS_FreeCString(context, message);
return JS_NewInt32(context, request_number);
}
static JSValue _tf_ssb_connection_send_json(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv)
{
return _tf_ssb_connection_send_json_internal(context, this_val, argc, argv, k_ssb_rpc_flag_json | k_ssb_rpc_flag_stream);
}
static JSValue _tf_ssb_connection_send_json_async(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv)
{
return _tf_ssb_connection_send_json_internal(context, this_val, argc, argv, k_ssb_rpc_flag_json);
}
static void _tf_ssb_connection_process_message_async(uv_async_t* async)
{
tf_ssb_connection_t* connection = async->data;
@ -2181,8 +2140,6 @@ tf_ssb_connection_t* tf_ssb_connection_create(tf_ssb_t* ssb, const char* host, c
connection->object = JS_NewObjectClass(ssb->context, _connection_class_id);
JS_SetOpaque(connection->object, connection);
JS_SetPropertyStr(context, connection->object, "send_json", JS_NewCFunction(context, _tf_ssb_connection_send_json, "send_json", 2));
JS_SetPropertyStr(context, connection->object, "send_json_async", JS_NewCFunction(context, _tf_ssb_connection_send_json_async, "send_json_async", 2));
char public_key_str[k_id_base64_len] = { 0 };
if (tf_ssb_id_bin_to_str(public_key_str, sizeof(public_key_str), public_key))
{
@ -2240,8 +2197,6 @@ tf_ssb_connection_t* tf_ssb_connection_tunnel_create(tf_ssb_t* ssb, const char*
tunnel->object = JS_NewObjectClass(ssb->context, _connection_class_id);
JS_SetOpaque(tunnel->object, tunnel);
JS_SetPropertyStr(context, tunnel->object, "send_json", JS_NewCFunction(context, _tf_ssb_connection_send_json, "send_json", 2));
JS_SetPropertyStr(context, tunnel->object, "send_json_async", JS_NewCFunction(context, _tf_ssb_connection_send_json_async, "send_json_async", 2));
JS_SetPropertyStr(context, tunnel->object, "id", JS_NewString(context, target_id));
JS_SetPropertyStr(context, tunnel->object, "is_client", JS_TRUE);
@ -2340,8 +2295,6 @@ static void _tf_ssb_on_connection(uv_stream_t* stream, int status)
uv_async_init(ssb->loop, &connection->async, _tf_ssb_connection_process_message_async);
connection->object = JS_NewObjectClass(ssb->context, _connection_class_id);
JS_SetPropertyStr(ssb->context, connection->object, "send_json", JS_NewCFunction(ssb->context, _tf_ssb_connection_send_json, "send_json", 2));
JS_SetPropertyStr(ssb->context, connection->object, "send_json_async", JS_NewCFunction(ssb->context, _tf_ssb_connection_send_json_async, "send_json_async", 2));
JS_SetOpaque(connection->object, connection);
if (uv_tcp_init(ssb->loop, &connection->tcp) != 0)
@ -3118,3 +3071,57 @@ tf_ssb_blob_wants_t* tf_ssb_connection_get_blob_wants_state(tf_ssb_connection_t*
{
return connection ? &connection->blob_wants : NULL;
}
bool tf_ssb_verify_strip_and_store_message(tf_ssb_t* ssb, JSValue value)
{
JSContext* context = tf_ssb_get_context(ssb);
char signature[crypto_sign_BYTES + 128];
char id[crypto_hash_sha256_BYTES * 2 + 1];
bool sequence_before_author = false;
if (tf_ssb_verify_and_strip_signature(context, value, id, sizeof(id), signature, sizeof(signature), &sequence_before_author))
{
if (tf_ssb_db_store_message(ssb, context, id, value, signature, sequence_before_author))
{
tf_ssb_notify_message_added(ssb, id);
return true;
}
}
else
{
printf("failed to verify message\n");
}
return false;
}
bool tf_ssb_connection_get_sent_clock(tf_ssb_connection_t* connection)
{
return connection->sent_clock;
}
void tf_ssb_connection_set_sent_clock(tf_ssb_connection_t* connection, bool sent_clock)
{
connection->sent_clock = sent_clock;
}
int32_t tf_ssb_connection_get_ebt_request_number(tf_ssb_connection_t* connection)
{
return connection->ebt_request_number;
}
void tf_ssb_connection_set_ebt_request_number(tf_ssb_connection_t* connection, int32_t request_number)
{
connection->ebt_request_number = request_number;
}
JSValue tf_ssb_connection_get_ebt_send_clock(tf_ssb_connection_t* connection)
{
JSContext* context = connection->ssb->context;
return JS_DupValue(context, connection->ebt_send_clock);
}
void tf_ssb_connection_set_ebt_send_clock(tf_ssb_connection_t* connection, JSValue send_clock)
{
JSContext* context = connection->ssb->context;
JS_FreeValue(context, connection->ebt_send_clock);
connection->ebt_send_clock = JS_DupValue(context, send_clock);
}

View File

@ -1102,6 +1102,34 @@ const char** tf_ssb_db_following_deep(tf_ssb_t* ssb, const char** ids, int count
return (const char**)result;
}
typedef struct _identities_t
{
const char** ids;
int count;
} identities_t;
static void _add_identity(const char* identity, void* user_data)
{
identities_t* identities = user_data;
char full_id[k_id_base64_len];
snprintf(full_id, sizeof(full_id), "@%s", identity);
identities->ids = tf_resize_vec(identities->ids, sizeof(const char*) * (identities->count + 1));
identities->ids[identities->count++] = tf_strdup(full_id);
}
const char** tf_ssb_db_get_all_visible_identities(tf_ssb_t* ssb, int depth)
{
identities_t identities = { 0 };
tf_ssb_db_identity_visit_all(ssb, _add_identity, &identities);
const char** following = tf_ssb_db_following_deep(ssb, identities.ids, identities.count, depth);
for (int i = 0; i < identities.count; i++)
{
tf_free((void*)identities.ids[i]);
}
tf_free(identities.ids);
return following;
}
static void _test_private(sqlite3* db, const uint8_t* private_key)
{
sqlite3_stmt* statement = NULL;

View File

@ -38,5 +38,6 @@ JSValue tf_ssb_format_message(
const char* signature,
bool sequence_before_author);
const char** tf_ssb_db_following_deep(tf_ssb_t* ssb, const char** ids, int count, int depth);
const char** tf_ssb_db_get_all_visible_identities(tf_ssb_t* ssb, int depth);
void tf_ssb_db_private(sqlite3* db);

View File

@ -106,6 +106,7 @@ bool tf_ssb_id_bin_to_str(char* str, size_t str_size, const uint8_t* bin);
bool tf_ssb_verify_and_strip_signature(JSContext* context, JSValue val, char* out_id, size_t out_id_size, char* out_signature, size_t out_signature_size, bool* out_sequence_before_author);
void tf_ssb_calculate_message_id(JSContext* context, JSValue message, char* out_id, size_t out_id_size);
bool tf_ssb_verify_strip_and_store_message(tf_ssb_t* ssb, JSValue value);
bool tf_ssb_connection_is_client(tf_ssb_connection_t* connection);
const char* tf_ssb_connection_get_host(tf_ssb_connection_t* connection);
@ -152,6 +153,7 @@ void tf_ssb_connection_rpc_send_error_method_not_allowed(tf_ssb_connection_t* co
void tf_ssb_connection_add_request(tf_ssb_connection_t* connection, int32_t request_number, tf_ssb_rpc_callback_t* callback, tf_ssb_callback_cleanup_t* cleanup, void* user_data, tf_ssb_connection_t* dependent_connection);
void tf_ssb_connection_add_new_message_request(tf_ssb_connection_t* connection, const char* author, int32_t request_number, bool keys);
void tf_ssb_connection_remove_new_message_request(tf_ssb_connection_t* connection, const char* author);
bool tf_ssb_connection_is_attendant(tf_ssb_connection_t* connection);
int32_t tf_ssb_connection_get_attendant_request_number(tf_ssb_connection_t* connection);
@ -162,6 +164,13 @@ void tf_ssb_connection_remove_room_attendant(tf_ssb_connection_t* connection, co
tf_ssb_connection_t* tf_ssb_connection_tunnel_create(tf_ssb_t* ssb, const char* portal_id, int32_t request_number, const char* target_id);
int32_t tf_ssb_connection_get_ebt_request_number(tf_ssb_connection_t* connection);
void tf_ssb_connection_set_ebt_request_number(tf_ssb_connection_t* connection, int32_t request_number);
JSValue tf_ssb_connection_get_ebt_send_clock(tf_ssb_connection_t* connection);
void tf_ssb_connection_set_ebt_send_clock(tf_ssb_connection_t* connection, JSValue send_clock);
bool tf_ssb_connection_get_sent_clock(tf_ssb_connection_t* connection);
void tf_ssb_connection_set_sent_clock(tf_ssb_connection_t* connection, bool sent_clock);
JSClassID tf_ssb_get_connection_class_id();
void tf_ssb_get_stats(tf_ssb_t* ssb, tf_ssb_stats_t* out_stats);

View File

@ -313,20 +313,7 @@ static JSValue _tf_ssb_sqlStream(JSContext* context, JSValueConst this_val, int
static JSValue _tf_ssb_storeMessage(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv)
{
tf_ssb_t* ssb = JS_GetOpaque(this_val, _tf_ssb_classId);
char signature[crypto_sign_BYTES + 128];
char id[crypto_hash_sha256_BYTES * 2 + 1];
bool sequence_before_author = false;
if (tf_ssb_verify_and_strip_signature(context, argv[0], id, sizeof(id), signature, sizeof(signature), &sequence_before_author))
{
if (tf_ssb_db_store_message(ssb, context, id, argv[0], signature, sequence_before_author))
{
tf_ssb_notify_message_added(ssb, id);
}
}
else
{
printf("failed to verify message\n");
}
tf_ssb_verify_strip_and_store_message(ssb, argv[0]);
return JS_UNDEFINED;
}
@ -416,205 +403,12 @@ static JSValue _tf_ssb_connect(JSContext* context, JSValueConst this_val, int ar
return JS_UNDEFINED;
}
static JSValue _tf_ssb_rpc_send_json(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv)
{
JSValue connection_val = JS_GetPropertyStr(context, this_val, "connection");
tf_ssb_connection_t* connection = JS_GetOpaque(connection_val, tf_ssb_get_connection_class_id());
JSValue request_val = JS_GetPropertyStr(context, this_val, "request_number");
int32_t request_number;
JS_ToInt32(context, &request_number, request_val);
JS_FreeValue(context, request_val);
JSValue flags_val = JS_GetPropertyStr(context, this_val, "flags");
int32_t flags_number;
JS_ToInt32(context, &flags_number, flags_val);
JS_FreeValue(context, flags_val);
JSValue message_val = JS_JSONStringify(context, argv[0], JS_NULL, JS_NULL);
size_t size;
const char* message = JS_ToCStringLen(context, &size, message_val);
tf_ssb_connection_rpc_send(
connection,
k_ssb_rpc_flag_json | (flags_number & ~k_ssb_rpc_mask_type),
-request_number,
(const uint8_t*)message,
size,
NULL,
NULL,
NULL);
JS_FreeValue(context, connection_val);
JS_FreeCString(context, message);
JS_FreeValue(context, message_val);
return JS_NewInt32(context, -request_number);
}
static JSValue _tf_ssb_rpc_send_json_end(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv)
{
JSValue connection_val = JS_GetPropertyStr(context, this_val, "connection");
tf_ssb_connection_t* connection = JS_GetOpaque(connection_val, tf_ssb_get_connection_class_id());
JSValue request_val = JS_GetPropertyStr(context, this_val, "request_number");
int32_t request_number;
JS_ToInt32(context, &request_number, request_val);
JS_FreeValue(context, request_val);
JSValue flags_val = JS_GetPropertyStr(context, this_val, "flags");
int32_t flags_number;
JS_ToInt32(context, &flags_number, flags_val);
JS_FreeValue(context, flags_val);
JSValue message_val = JS_JSONStringify(context, argv[0], JS_NULL, JS_NULL);
size_t size;
const char* message = JS_ToCStringLen(context, &size, message_val);
tf_ssb_connection_rpc_send(
connection,
k_ssb_rpc_flag_json | (flags_number & ~k_ssb_rpc_mask_type) | k_ssb_rpc_flag_end_error,
-request_number,
(const uint8_t*)message,
size,
NULL,
NULL,
NULL);
JS_FreeValue(context, connection_val);
JS_FreeCString(context, message);
JS_FreeValue(context, message_val);
return JS_UNDEFINED;
}
static void _tf_ssb_cleanup_value(tf_ssb_t* ssb, void* user_data)
{
JSValue callback = JS_MKPTR(JS_TAG_OBJECT, user_data);
JS_FreeValue(tf_ssb_get_context(ssb), callback);
}
static JSValue _tf_ssb_rpc_more(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv)
{
JSValue connection_val = JS_GetPropertyStr(context, this_val, "connection");
tf_ssb_connection_t* connection = JS_GetOpaque(connection_val, tf_ssb_get_connection_class_id());
JSValue request_val = JS_GetPropertyStr(context, this_val, "request_number");
int32_t request_number;
JS_ToInt32(context, &request_number, request_val);
JS_FreeValue(context, request_val);
tf_ssb_connection_add_request(connection, -request_number, _tf_ssb_on_rpc, _tf_ssb_cleanup_value, JS_VALUE_GET_PTR(JS_DupValue(context, argv[0])), NULL);
JS_FreeValue(context, connection_val);
return JS_UNDEFINED;
}
static JSValue _tf_ssb_rpc_send_binary(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv)
{
JSValue connection_val = JS_GetPropertyStr(context, this_val, "connection");
tf_ssb_connection_t* connection = JS_GetOpaque(connection_val, tf_ssb_get_connection_class_id());
JSValue request_val = JS_GetPropertyStr(context, this_val, "request_number");
int32_t request_number;
JS_ToInt32(context, &request_number, request_val);
JS_FreeValue(context, request_val);
size_t size;
uint8_t* message = tf_util_try_get_array_buffer(context, &size, argv[0]);
if (message)
{
tf_ssb_connection_rpc_send(
connection,
k_ssb_rpc_flag_binary | k_ssb_rpc_flag_stream,
-request_number,
(const uint8_t*)message,
size,
NULL,
NULL,
NULL);
}
else
{
size_t offset;
size_t element_size;
JSValue buffer = tf_util_try_get_typed_array_buffer(context, argv[0], &offset, &size, &element_size);
if (!JS_IsException(buffer))
{
size_t total_size;
message = tf_util_try_get_array_buffer(context, &total_size, buffer);
if (message)
{
tf_ssb_connection_rpc_send(
connection,
k_ssb_rpc_flag_binary | k_ssb_rpc_flag_stream,
-request_number,
(const uint8_t*)message + offset,
size,
NULL,
NULL,
NULL);
}
}
JS_FreeValue(context, buffer);
}
JS_FreeValue(context, connection_val);
return JS_UNDEFINED;
}
void _tf_ssb_on_rpc(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
JSContext* context = tf_ssb_get_context(ssb);
JSValue callback = JS_MKPTR(JS_TAG_OBJECT, user_data);
JSValue object = JS_NewObject(context);
JSValue connection_object = JS_DupValue(context, tf_ssb_connection_get_object(connection));
JS_SetPropertyStr(context, object, "connection", connection_object);
JS_SetPropertyStr(context, object, "flags", JS_NewUint32(context, flags));
JS_SetPropertyStr(context, object, "request_number", JS_NewInt32(context, request_number));
JS_SetPropertyStr(context, object, "args", JS_GetPropertyStr(context, args, "args"));
JS_SetPropertyStr(context, object, "message", message && size ? JS_NewArrayBufferCopy(context, message, size) : JS_DupValue(context, args));
JS_SetPropertyStr(context, object, "send_json", JS_NewCFunction(context, _tf_ssb_rpc_send_json, "send_json", 1));
JS_SetPropertyStr(context, object, "send_binary", JS_NewCFunction(context, _tf_ssb_rpc_send_binary, "send_binary", 1));
JS_SetPropertyStr(context, object, "send_json_end", JS_NewCFunction(context, _tf_ssb_rpc_send_json_end, "send_json_end", 1));
JS_SetPropertyStr(context, object, "more", JS_NewCFunction(context, _tf_ssb_rpc_more, "more", 1));
JSValue result = JS_Call(context, callback, JS_UNDEFINED, 1, &object);
tf_util_report_error(context, result);
JS_FreeValue(context, result);
JS_FreeValue(context, object);
}
static JSValue _tf_ssb_add_rpc(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv)
{
tf_ssb_t* ssb = JS_GetOpaque(this_val, _tf_ssb_classId);
if (!JS_IsArray(context, argv[0]))
{
return JS_ThrowTypeError(context, "Expected argument 1 to be an array of strings.");
}
if (!JS_IsFunction(context, argv[1]))
{
return JS_ThrowTypeError(context, "Expected argument 2 to be a function.");
}
enum { k_max_name_parts = 16 };
const char* name[k_max_name_parts + 1] = { 0 };
int length = tf_util_get_length(context, argv[0]);
if (length >= k_max_name_parts)
{
return JS_ThrowInternalError(context, "Too many parts to RPC name.");
}
for (int i = 0; i < length; i++)
{
JSValue value = JS_GetPropertyUint32(context, argv[0], i);
name[i] = JS_ToCString(context, value);
JS_FreeValue(context, value);
}
tf_ssb_add_rpc_callback(ssb, name, _tf_ssb_on_rpc, _tf_ssb_cleanup_value, JS_VALUE_GET_PTR(JS_DupValue(context, argv[1])));
for (int i = 0; i < length; i++)
{
JS_FreeCString(context, name[i]);
}
return JS_UNDEFINED;
}
static void _tf_ssb_on_message_added_callback(tf_ssb_t* ssb, const char* id, void* user_data)
{
JSContext* context = tf_ssb_get_context(ssb);
@ -1023,13 +817,8 @@ void tf_ssb_register(JSContext* context, tf_ssb_t* ssb)
JS_SetPropertyStr(context, object, "followingDeep", JS_NewCFunction(context, _tf_ssb_followingDeep, "followingDeep", 2));
/* Should be trusted only. */
JS_SetPropertyStr(context, object, "addRpc", JS_NewCFunction(context, _tf_ssb_add_rpc, "addRpc", 2));
JS_SetPropertyStr(context, object, "addEventListener", JS_NewCFunction(context, _tf_ssb_add_event_listener, "addEventListener", 2));
JS_SetPropertyStr(context, object, "removeEventListener", JS_NewCFunction(context, _tf_ssb_remove_event_listener, "removeEventListener", 2));
JS_FreeValue(context, global);
tf_util_register(context);
tf_database_register(context, tf_ssb_get_db(ssb));
tf_ssb_run_file(context, "core/ssb.js");
}

View File

@ -678,26 +678,10 @@ static void _tf_ssb_rpc_connection_tunnel_isRoom_callback(tf_ssb_connection_t* c
}
}
static void _tf_ssb_rpc_createHistoryStream(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
static void _tf_ssb_connection_send_history_stream(tf_ssb_connection_t* connection, int32_t request_number, const char* author, int64_t sequence, bool keys)
{
tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
JSContext* context = tf_ssb_get_context(ssb);
JSValue arg_array = JS_GetPropertyStr(context, args, "args");
JSValue arg = JS_GetPropertyUint32(context, arg_array, 0);
if (JS_IsUndefined(arg))
{
tf_ssb_connection_rpc_send_error(connection, flags, -request_number, "Missing request.args in createHistoryStream.");
}
JSValue id = JS_GetPropertyStr(context, arg, "id");
JSValue seq = JS_GetPropertyStr(context, arg, "seq");
JSValue keys = JS_GetPropertyStr(context, arg, "keys");
JSValue live = JS_GetPropertyStr(context, arg, "live");
bool is_keys = JS_IsUndefined(keys) || JS_ToBool(context, keys) > 0;
bool is_live = JS_ToBool(context, live) > 0;
int64_t sequence = 0;
JS_ToInt64(context, &sequence, seq);
const char* author = JS_ToCString(context, id);
sqlite3* db = tf_ssb_get_db(ssb);
sqlite3_stmt* statement;
if (sqlite3_prepare(db, "SELECT previous, author, id, sequence, timestamp, hash, content, signature, sequence_before_author FROM messages WHERE author = ?1 AND sequence >= ?2 ORDER BY sequence", -1, &statement, NULL) == SQLITE_OK)
@ -719,7 +703,7 @@ static void _tf_ssb_rpc_createHistoryStream(tf_ssb_connection_t* connection, uin
(const char*)sqlite3_column_text(statement, 6),
(const char*)sqlite3_column_text(statement, 7),
sqlite3_column_int(statement, 8));
if (is_keys)
if (keys)
{
message = JS_NewObject(context);
JS_SetPropertyStr(context, message, "key", JS_NewString(context, (const char*)sqlite3_column_text(statement, 2)));
@ -730,12 +714,35 @@ static void _tf_ssb_rpc_createHistoryStream(tf_ssb_connection_t* connection, uin
{
message = formatted;
}
tf_ssb_connection_rpc_send_json(connection, flags, -request_number, message, NULL, NULL, NULL);
tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream, request_number, message, NULL, NULL, NULL);
JS_FreeValue(context, message);
}
}
sqlite3_finalize(statement);
}
}
static void _tf_ssb_rpc_createHistoryStream(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
JSContext* context = tf_ssb_get_context(ssb);
JSValue arg_array = JS_GetPropertyStr(context, args, "args");
JSValue arg = JS_GetPropertyUint32(context, arg_array, 0);
if (JS_IsUndefined(arg))
{
tf_ssb_connection_rpc_send_error(connection, flags, -request_number, "Missing request.args in createHistoryStream.");
}
JSValue id = JS_GetPropertyStr(context, arg, "id");
JSValue seq = JS_GetPropertyStr(context, arg, "seq");
JSValue keys = JS_GetPropertyStr(context, arg, "keys");
JSValue live = JS_GetPropertyStr(context, arg, "live");
bool is_keys = JS_IsUndefined(keys) || JS_ToBool(context, keys) > 0;
bool is_live = JS_ToBool(context, live) > 0;
int64_t sequence = 0;
JS_ToInt64(context, &sequence, seq);
const char* author = JS_ToCString(context, id);
_tf_ssb_connection_send_history_stream(connection, -request_number, author, sequence, is_keys);
if (is_live)
{
@ -750,6 +757,225 @@ static void _tf_ssb_rpc_createHistoryStream(tf_ssb_connection_t* connection, uin
JS_FreeValue(context, arg_array);
}
static bool _is_error(JSContext* context, JSValue message)
{
JSValue name = JS_GetPropertyStr(context, message, "name");
const char* name_string = JS_ToCString(context, name);
bool is_error = false;
if (name_string && strcmp(name_string, "Error") == 0)
{
is_error = true;
}
JS_FreeCString(context, name_string);
JS_FreeValue(context, name);
return is_error;
}
static void _tf_ssb_rpc_ebt_replicate_send_clock(tf_ssb_connection_t* connection, int32_t request_number, JSValue message)
{
tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
JSContext* context = tf_ssb_get_context(ssb);
JSValue full_clock = JS_NewObject(context);
/* Ask for every identity we know is being followed from local accounts. */
const char** visible = tf_ssb_db_get_all_visible_identities(ssb, 2);
for (int i = 0; visible[i]; i++)
{
int64_t sequence = 0;
tf_ssb_db_get_latest_message_by_author(ssb, visible[i], &sequence, NULL, 0);
JS_SetPropertyStr(context, full_clock, visible[i], JS_NewInt64(context, sequence));
}
/* Ask about the incoming connection, too. */
char id[k_id_base64_len] = "";
if (tf_ssb_connection_get_id(connection, id, sizeof(id)))
{
JSValue in_clock = JS_GetPropertyStr(context, full_clock, id);
if (JS_IsUndefined(in_clock))
{
int64_t sequence = 0;
tf_ssb_db_get_latest_message_by_author(ssb, id, &sequence, NULL, 0);
JS_SetPropertyStr(context, full_clock, id, JS_NewInt64(context, sequence));
}
JS_FreeValue(context, in_clock);
}
/* Also respond with what we know about all requested identities. */
if (!JS_IsUndefined(message))
{
JSPropertyEnum* ptab;
uint32_t plen;
JS_GetOwnPropertyNames(context, &ptab, &plen, message, JS_GPN_STRING_MASK);
for (uint32_t i = 0; i < plen; ++i)
{
JSValue in_clock = JS_GetProperty(context, full_clock, ptab[i].atom);
if (JS_IsUndefined(in_clock))
{
JSValue key = JS_AtomToString(context, ptab[i].atom);
const char* key_string = JS_ToCString(context, key);
if (key_string)
{
int64_t sequence = -1;
tf_ssb_db_get_latest_message_by_author(ssb, key_string, &sequence, NULL, 0);
JS_SetPropertyStr(context, full_clock, key_string, JS_NewInt64(context, sequence));
}
JS_FreeCString(context, key_string);
JS_FreeValue(context, key);
}
}
for (uint32_t i = 0; i < plen; ++i)
{
JS_FreeAtom(context, ptab[i].atom);
}
js_free(context, ptab);
}
tf_free(visible);
/* TODO: Send it in bite-size chunks. */
tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream, -request_number, full_clock, NULL, NULL, NULL);
JS_FreeValue(context, full_clock);
}
static void _tf_ssb_rpc_ebt_replicate_send_messages(tf_ssb_connection_t* connection, JSValue message)
{
if (JS_IsUndefined(message))
{
return;
}
tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
JSContext* context = tf_ssb_get_context(ssb);
JSPropertyEnum* ptab = NULL;
uint32_t plen = 0;
JS_GetOwnPropertyNames(context, &ptab, &plen, message, JS_GPN_STRING_MASK);
for (uint32_t i = 0; i < plen; ++i)
{
JSValue in_clock = JS_GetProperty(context, message, ptab[i].atom);
if (!JS_IsUndefined(in_clock))
{
JSValue key = JS_AtomToString(context, ptab[i].atom);
int64_t sequence = -1;
JS_ToInt64(context, &sequence, in_clock);
const char* author = JS_ToCString(context, key);
if (sequence >= 0 && (sequence & 1) == 0)
{
int32_t request_number = -tf_ssb_connection_get_ebt_request_number(connection);
_tf_ssb_connection_send_history_stream(connection, request_number, author, sequence, false);
tf_ssb_connection_add_new_message_request(connection, author, request_number, false);
}
else
{
tf_ssb_connection_remove_new_message_request(connection, author);
}
JS_FreeCString(context, author);
JS_FreeValue(context, key);
}
JS_FreeValue(context, in_clock);
}
for (uint32_t i = 0; i < plen; ++i)
{
JS_FreeAtom(context, ptab[i].atom);
}
js_free(context, ptab);
}
static void _tf_ssb_rpc_ebt_replicate(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
JSContext* context = tf_ssb_get_context(ssb);
if (_is_error(context, args))
{
/* TODO: Send createHistoryStream. */
return;
}
JSValue author = JS_GetPropertyStr(context, args, "author");
JSValue name = JS_GetPropertyStr(context, args, "name");
JSValue in_clock = JS_IsUndefined(name) ? args : JS_UNDEFINED;
if (!JS_IsUndefined(author))
{
/* Looks like a message. */
tf_ssb_verify_strip_and_store_message(ssb, args);
}
else
{
/* EBT clock. */
tf_ssb_connection_set_ebt_send_clock(connection, args);
if (!tf_ssb_connection_get_sent_clock(connection))
{
_tf_ssb_rpc_ebt_replicate_send_clock(connection, request_number, in_clock);
tf_ssb_connection_set_sent_clock(connection, true);
}
_tf_ssb_rpc_ebt_replicate_send_messages(connection, in_clock);
}
JS_FreeValue(context, name);
JS_FreeValue(context, author);
}
static void _tf_ssb_rpc_ebt_replicate_client(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
_tf_ssb_rpc_ebt_replicate(connection, flags, request_number, args, message, size, user_data);
}
static void _tf_ssb_rpc_send_ebt_replicate(tf_ssb_connection_t* connection)
{
tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
JSContext* context = tf_ssb_get_context(ssb);
JSValue message = JS_NewObject(context);
JSValue name = JS_NewArray(context);
JS_SetPropertyUint32(context, name, 0, JS_NewString(context, "ebt"));
JS_SetPropertyUint32(context, name, 1, JS_NewString(context, "replicate"));
JS_SetPropertyStr(context, message, "name", name);
JSValue arg = JS_NewObject(context);
JS_SetPropertyStr(context, arg, "version", JS_NewInt32(context, 3));
JS_SetPropertyStr(context, arg, "format", JS_NewString(context, "classic"));
JSValue args = JS_NewArray(context);
JS_SetPropertyUint32(context, args, 0, arg);
JS_SetPropertyStr(context, message, "args", args);
JS_SetPropertyStr(context, message, "type", JS_NewString(context, "duplex"));
tf_ssb_connection_rpc_send_json(
connection,
0,
tf_ssb_connection_next_request_number(connection),
message,
_tf_ssb_rpc_ebt_replicate_client,
NULL,
NULL);
JS_FreeValue(context, message);
}
static void _tf_ssb_rpc_ebt_replicate_server(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
JSContext* context = tf_ssb_get_context(ssb);
if (_is_error(context, args))
{
/* TODO: Send createHistoryStream. */
return;
}
if (!tf_ssb_connection_get_ebt_request_number(connection))
{
tf_ssb_connection_set_ebt_request_number(connection, request_number);
}
JSValue in_name = JS_GetPropertyStr(context, args, "name");
if (!JS_IsUndefined(in_name))
{
/* This is the server receiving the initial ebt.replicate message. Respond. */
if (!tf_ssb_connection_is_client(connection))
{
_tf_ssb_rpc_send_ebt_replicate(connection);
}
}
JS_FreeValue(context, in_name);
_tf_ssb_rpc_ebt_replicate(connection, flags, request_number, args, message, size, user_data);
}
static void _tf_ssb_rpc_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_change_t change, tf_ssb_connection_t* connection, void* user_data)
{
JSContext* context = tf_ssb_get_context(ssb);
@ -789,6 +1015,8 @@ static void _tf_ssb_rpc_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_chang
NULL,
NULL);
JS_FreeValue(context, message);
_tf_ssb_rpc_send_ebt_replicate(connection);
}
}
else if (change == k_tf_ssb_change_remove)
@ -833,4 +1061,5 @@ void tf_ssb_rpc_register(tf_ssb_t* ssb)
tf_ssb_add_rpc_callback(ssb, (const char*[]) { "tunnel", "isRoom", NULL }, _tf_ssb_rpc_tunnel_is_room, NULL, NULL);
tf_ssb_add_rpc_callback(ssb, (const char*[]) { "room", "attendants", NULL }, _tf_ssb_rpc_room_attendants, NULL, NULL);
tf_ssb_add_rpc_callback(ssb, (const char*[]) { "createHistoryStream", NULL }, _tf_ssb_rpc_createHistoryStream, NULL, NULL);
tf_ssb_add_rpc_callback(ssb, (const char*[]) { "ebt", "replicate", NULL }, _tf_ssb_rpc_ebt_replicate_server, NULL, NULL);
}

View File

@ -1555,6 +1555,7 @@ void tf_task_activate(tf_task_t* task)
JS_SetPropertyStr(context, global, "Socket", tf_socket_register(context));
JS_SetPropertyStr(context, global, "TlsContext", tf_tls_context_register(context));
tf_file_register(context);
tf_database_register(context, task->_db);
task->_ssb = tf_ssb_create(&task->_loop, task->_context, task->_db);
tf_ssb_set_trace(task->_ssb, task->_trace);