diff --git a/core/ssb.js b/core/ssb.js index 3bc8631c..fb2be7e2 100644 --- a/core/ssb.js +++ b/core/ssb.js @@ -62,12 +62,17 @@ function get_latest_sequence_for_author(author) { return sequence; } +function storeMessage(message) { + var payload = message.message.value ? message.message.value : message.message; + if (typeof(payload) == 'object') { + ssb.storeMessage(payload); + } +} + ssb.addEventListener('connections', function(change, connection) { if (change == 'add') { var sequence = get_latest_sequence_for_author(connection.id); - connection.send_json({'name': ['createHistoryStream'], 'type': 'source', 'args': [{'id': connection.id, 'seq': sequence, 'live': true, keys: false}]}, function(message) { - ssb.storeMessage(message.message.value ? message.message.value : message.message); - }); + connection.send_json({'name': ['createHistoryStream'], 'type': 'source', 'args': [{'id': connection.id, 'seq': sequence, 'live': true, 'keys': false}]}, storeMessage); connection.send_json({'name': ['blobs', 'createWants'], 'type': 'source', 'args': []}, function(message) { Object.keys(message.message).forEach(function(id) { if (message.message[id] < 0) { @@ -92,12 +97,14 @@ ssb.addEventListener('connections', function(change, connection) { } }); }); - followingDeep(g_database, [ssb.whoami()], 2).then(function(ids) { + var me = ssb.whoami(); + followingDeep(g_database, [me], 2).then(function(ids) { for (let id of ids) { + if (id == me) { + continue; + } var sequence = get_latest_sequence_for_author(id); - connection.send_json({'name': ['createHistoryStream'], 'type': 'source', 'args': [{'id': id, 'seq': sequence, 'live': true, 'keys': false}]}, function(message) { - ssb.storeMessage(message.message.value ? message.message.value : message.message); - }); + connection.send_json({'name': ['createHistoryStream'], 'type': 'source', 'args': [{'id': id, 'seq': sequence, 'live': true, 'keys': false}]}, storeMessage); } }); } else if (change == 'remove') { @@ -140,6 +147,12 @@ ssb.addRpc(['blobs', 'get'], function(request) { } }); +ssb.addRpc(['gossip', 'ping'], function(request) { + request.more(function(message) { + message.send_json(message.message); + }); +}); + ssb.addRpc(['createHistoryStream'], function(request) { var id = request.args[0].id; var seq = request.args[0].seq; diff --git a/src/ssb.c b/src/ssb.c index f5f51e0f..5e7c4d79 100644 --- a/src/ssb.c +++ b/src/ssb.c @@ -404,7 +404,7 @@ static bool _tf_ssb_connection_get_request_callback(tf_ssb_connection_t* connect return found; } -static void _tf_ssb_connection_add_request(tf_ssb_connection_t* connection, int32_t request_number, tf_ssb_rpc_callback_t* callback, void* user_data) +void tf_ssb_connection_add_request(tf_ssb_connection_t* connection, int32_t request_number, tf_ssb_rpc_callback_t* callback, void* user_data) { if (_tf_ssb_connection_get_request_callback(connection, request_number, NULL, NULL)) { @@ -443,7 +443,7 @@ void tf_ssb_connection_rpc_send(tf_ssb_connection_t* connection, uint8_t flags, { if (request_number > 0) { - _tf_ssb_connection_add_request(connection, request_number, callback, user_data); + tf_ssb_connection_add_request(connection, request_number, callback, user_data); } uint8_t* combined = malloc(9 + size); *combined = flags; @@ -939,16 +939,19 @@ static void _tf_ssb_connection_rpc_recv(tf_ssb_connection_t* connection, uint8_t JSContext* context = connection->ssb->context; JSValue val = JS_ParseJSON(context, (const char*)message, size, NULL); - if (JS_IsObject(val)) + if (!JS_IsUndefined(val)) { bool found = false; - for (tf_ssb_rpc_callback_node_t* it = connection->ssb->rpc; it; it = it->next) + if (JS_IsObject(val)) { - if (_tf_ssb_name_equals(context, val, it->name)) + for (tf_ssb_rpc_callback_node_t* it = connection->ssb->rpc; it; it = it->next) { - it->callback(connection, flags, request_number, JS_DupValue(context, val), NULL, 0, it->user_data); - found = true; - break; + if (_tf_ssb_name_equals(context, val, it->name)) + { + it->callback(connection, flags, request_number, JS_DupValue(context, val), NULL, 0, it->user_data); + found = true; + break; + } } } if (!found) @@ -970,6 +973,10 @@ static void _tf_ssb_connection_rpc_recv(tf_ssb_connection_t* connection, uint8_t } } } + else + { + printf("Failed to parse %.*s\n", (int)size, message); + } JS_FreeValue(context, val); } diff --git a/src/ssb.h b/src/ssb.h index 45f14c57..ae025da9 100644 --- a/src/ssb.h +++ b/src/ssb.h @@ -111,5 +111,6 @@ void tf_ssb_add_rpc_callback(tf_ssb_t* ssb, const char** name, tf_ssb_rpc_callba void tf_ssb_remove_rpc_callback(tf_ssb_t* ssb, const char** name, tf_ssb_rpc_callback_t* callback, void* user_data); void tf_ssb_connection_rpc_send(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, const uint8_t* message, size_t size, tf_ssb_rpc_callback_t* callback, void* user_data); +void tf_ssb_connection_add_request(tf_ssb_connection_t* connection, int32_t request_number, tf_ssb_rpc_callback_t* callback, void* user_data); JSClassID tf_ssb_get_connection_class_id(); diff --git a/src/ssb.js.c b/src/ssb.js.c index 383a60a0..1099b467 100644 --- a/src/ssb.js.c +++ b/src/ssb.js.c @@ -16,6 +16,8 @@ static JSClassID _tf_ssb_classId; +void _tf_ssb_on_rpc(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data); + static JSValue _tf_ssb_whoami(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) { tf_ssb_t* ssb = JS_GetOpaque(this_val, _tf_ssb_classId); @@ -368,6 +370,21 @@ static JSValue _tf_ssb_rpc_send_json(JSContext* context, JSValueConst this_val, return JS_UNDEFINED; } +static JSValue _tf_ssb_rpc_more(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) +{ + JSValue connection_val = JS_GetPropertyStr(context, this_val, "connection"); + tf_ssb_connection_t* connection = JS_GetOpaque(connection_val, tf_ssb_get_connection_class_id()); + JSValue request_val = JS_GetPropertyStr(context, this_val, "request_number"); + int32_t request_number; + JS_ToInt32(context, &request_number, request_val); + JS_FreeValue(context, request_val); + + tf_ssb_connection_add_request(connection, -request_number, _tf_ssb_on_rpc, JS_VALUE_GET_PTR(JS_DupValue(context, argv[0]))); + + JS_FreeValue(context, connection_val); + return JS_UNDEFINED; +} + static JSValue _tf_ssb_rpc_send_binary(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) { JSValue connection_val = JS_GetPropertyStr(context, this_val, "connection"); @@ -408,6 +425,7 @@ void _tf_ssb_on_rpc(tf_ssb_connection_t* connection, uint8_t flags, int32_t requ JS_SetPropertyStr(context, object, "message", message && size ? JS_NewArrayBufferCopy(context, message, size) : args); JS_SetPropertyStr(context, object, "send_json", JS_NewCFunction(context, _tf_ssb_rpc_send_json, "send_json", 1)); JS_SetPropertyStr(context, object, "send_binary", JS_NewCFunction(context, _tf_ssb_rpc_send_binary, "send_binary", 1)); + JS_SetPropertyStr(context, object, "more", JS_NewCFunction(context, _tf_ssb_rpc_more, "more", 1)); JSValue result = JS_Call(context, callback, JS_UNDEFINED, 1, &object); tf_util_report_error(context, result); @@ -501,7 +519,6 @@ static void _tf_ssb_on_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_change JS_NewString(context, "add"), object, }; - printf("calling function for ptr %p IsFunction=%d\n", user_data, JS_IsFunction(context, callback)); response = JS_Call(context, callback, JS_UNDEFINED, 2, args); tf_util_report_error(context, response); JS_FreeValue(context, args[0]);