Continuing to try to get this thing talking to other clients.

git-svn-id: https://www.unprompted.com/svn/projects/tildefriends/trunk@3679 ed5197a5-7fde-0310-b194-c3ffbd925b24
This commit is contained in:
Cory McWilliams 2021-10-31 19:39:16 +00:00
parent 0a01332d1f
commit 08cd0ec878
3 changed files with 38 additions and 11 deletions

View File

@ -62,24 +62,25 @@ async function test_following() {
test_following(); test_following();
async function get_latest_sequence_for_author(author) { function get_latest_sequence_for_author(author) {
var sequence = 0; var sequence = 0;
await ssb.sqlStream( ssb.sqlStream(
'SELECT MAX(sequence) AS sequence FROM messages WHERE author = ?1', 'SELECT MAX(sequence) AS sequence FROM messages WHERE author = ?1',
[author], [author],
function(row) { function(row) {
if (row.sequence) {
sequence = row.sequence + 1; sequence = row.sequence + 1;
}
}); });
return sequence; return sequence;
} }
ssb.registerConnectionsChanged(function(change, connection) { ssb.registerConnectionsChanged(function(change, connection) {
if (change == 'add') { if (change == 'add') {
get_latest_sequence_for_author(connection.id).then(function(sequence) { var sequence = get_latest_sequence_for_author(connection.id);
connection.send_json({'name': ['createHistoryStream'], 'type': 'source', 'args': [{'id': connection.id, 'seq': sequence}]}, function(message) { connection.send_json({'name': ['createHistoryStream'], 'type': 'source', 'args': [{'id': connection.id, 'seq': sequence}]}, function(message) {
ssb.storeMessage(message.message.value); ssb.storeMessage(message.message.value);
}); });
});
connection.send_json({'name': ['blobs', 'createWants'], 'type': 'source', 'args': []}, function(message) { connection.send_json({'name': ['blobs', 'createWants'], 'type': 'source', 'args': []}, function(message) {
Object.keys(message.message).forEach(function(id) { Object.keys(message.message).forEach(function(id) {
if (message.message[id] < 0) { if (message.message[id] < 0) {
@ -133,8 +134,10 @@ ssb.registerRpc(['blobs', 'has'], function(request) {
}); });
ssb.registerRpc(['blobs', 'get'], function(request) { ssb.registerRpc(['blobs', 'get'], function(request) {
var blob = ssb.blobGet(request.args[0].id); for (let id of request.args) {
var blob = ssb.blobGet(id);
request.send_binary(blob); request.send_binary(blob);
}
}); });
ssb.registerRpc(['createHistoryStream'], function(request) { ssb.registerRpc(['createHistoryStream'], function(request) {

View File

@ -180,11 +180,33 @@ static void _tf_ssb_sqlStream_callback(JSValue row, void* user_data)
sqlStream_callback_t* info = user_data; sqlStream_callback_t* info = user_data;
JSValue response = JS_Call(info->context, info->callback, JS_UNDEFINED, 1, &row); JSValue response = JS_Call(info->context, info->callback, JS_UNDEFINED, 1, &row);
_check_call(info->context, response); _check_call(info->context, response);
JS_FreeValue(info->context, response);
if (tf_task_get(info->context)) if (tf_task_get(info->context))
{ {
tf_task_run_jobs(tf_task_get(info->context)); tf_task_run_jobs(tf_task_get(info->context));
} }
JS_FreeValue(info->context, response); else
{
JSRuntime* runtime = JS_GetRuntime(info->context);
while (JS_IsJobPending(runtime))
{
JSContext* context = NULL;
int r = JS_ExecutePendingJob(runtime, &context);
if (context)
{
JSValue result = JS_GetException(context);
_check_call(context, result);
}
if (r < 0)
{
js_std_dump_error(context);
}
else if (r == 0)
{
break;
}
}
}
} }
static JSValue _tf_ssb_sqlStream(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) static JSValue _tf_ssb_sqlStream(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv)

View File

@ -134,6 +134,7 @@ void tf_ssb_test_ssb(const tf_test_options_t* options)
tf_ssb_id_str_to_bin(id0bin, id0); tf_ssb_id_str_to_bin(id0bin, id0);
tf_ssb_connect(ssb1, "127.0.0.1", 12347, id0bin); tf_ssb_connect(ssb1, "127.0.0.1", 12347, id0bin);
printf("Waiting for connection.\n");
while (test.connection_count0 != 1 || while (test.connection_count0 != 1 ||
test.connection_count1 != 1) test.connection_count1 != 1)
{ {
@ -141,12 +142,13 @@ void tf_ssb_test_ssb(const tf_test_options_t* options)
} }
tf_ssb_server_close(ssb0); tf_ssb_server_close(ssb0);
printf("Waiting for messages.\n");
while (_ssb_test_count_messages(ssb1) < 3) while (_ssb_test_count_messages(ssb1) < 3)
{ {
uv_run(&loop, UV_RUN_ONCE); uv_run(&loop, UV_RUN_ONCE);
} }
printf("waiting for blob\n"); printf("Waiting for blob.\n");
while (!tf_ssb_db_blob_get(ssb1, blob_id, NULL, NULL)) while (!tf_ssb_db_blob_get(ssb1, blob_id, NULL, NULL))
{ {
uv_run(&loop, UV_RUN_ONCE); uv_run(&loop, UV_RUN_ONCE);