From d8530f228e4b394bc302e96f7e034eacff51bab2 Mon Sep 17 00:00:00 2001 From: Cory McWilliams Date: Sun, 8 Oct 2023 16:17:56 +0000 Subject: [PATCH] EBT replicate to the worker threads. Almost there. git-svn-id: https://www.unprompted.com/svn/projects/tildefriends/trunk@4501 ed5197a5-7fde-0310-b194-c3ffbd925b24 --- src/ssb.rpc.c | 102 +++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 80 insertions(+), 22 deletions(-) diff --git a/src/ssb.rpc.c b/src/ssb.rpc.c index 2088f617..c38abdf8 100644 --- a/src/ssb.rpc.c +++ b/src/ssb.rpc.c @@ -958,10 +958,31 @@ static void _tf_ssb_rpc_createHistoryStream(tf_ssb_connection_t* connection, uin JS_FreeValue(context, arg_array); } -static void _tf_ssb_rpc_ebt_replicate_send_clock(tf_ssb_connection_t* connection, int32_t request_number, JSValue message) +typedef struct _ebt_clock_row_t { + char id[k_id_base64_len]; + int64_t value; +} ebt_clock_row_t; + +typedef struct _ebt_replicate_send_clock_t +{ + int64_t request_number; + ebt_clock_row_t* clock; + int clock_count; + + char* out_clock; +} ebt_replicate_send_clock_t; + +static void _tf_ssb_rpc_ebt_replicate_send_clock_work(tf_ssb_connection_t* connection, void* user_data) +{ + ebt_replicate_send_clock_t* work = user_data; + + JSMallocFunctions funcs = { 0 }; + tf_get_js_malloc_functions(&funcs); + JSRuntime* runtime = JS_NewRuntime2(&funcs, NULL); + JSContext* context = JS_NewContext(runtime); + tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); - JSContext* context = tf_ssb_get_context(ssb); JSValue full_clock = JS_NewObject(context); /* Ask for every identity we know is being followed from local accounts. */ @@ -989,37 +1010,74 @@ static void _tf_ssb_rpc_ebt_replicate_send_clock(tf_ssb_connection_t* connection } /* Also respond with what we know about all requested identities. */ + for (int i = 0; i < work->clock_count; i++) + { + JSValue in_clock = JS_GetPropertyStr(context, full_clock, work->clock[i].id); + if (JS_IsUndefined(in_clock)) + { + int64_t sequence = -1; + tf_ssb_db_get_latest_message_by_author(ssb, work->clock[i].id, &sequence, NULL, 0); + JS_SetPropertyStr(context, full_clock, work->clock[i].id, JS_NewInt64(context, sequence == -1 ? -1 : (sequence << 1))); + } + } + + JSValue json = JS_JSONStringify(context, full_clock, JS_NULL, JS_NULL); + size_t size = 0; + const char* string = JS_ToCStringLen(context, &size, json); + char* copy = tf_malloc(size + 1); + memcpy(copy, string, size + 1); + work->out_clock = copy; + JS_FreeCString(context, string); + JS_FreeValue(context, json); + JS_FreeValue(context, full_clock); + + JS_FreeContext(context); + JS_FreeRuntime(runtime); +} + +static void _tf_ssb_rpc_ebt_replicate_send_clock_after_work(tf_ssb_connection_t* connection, int result, void* user_data) +{ + ebt_replicate_send_clock_t* work = user_data; + tf_free(work->clock); + if (work->out_clock) + { + tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_json, -work->request_number, (const uint8_t*)work->out_clock, strlen(work->out_clock), NULL, NULL, NULL); + tf_free(work->out_clock); + } + tf_free(work); +} + +static void _tf_ssb_rpc_ebt_replicate_send_clock(tf_ssb_connection_t* connection, int32_t request_number, JSValue message) +{ + ebt_replicate_send_clock_t* work = tf_malloc(sizeof(ebt_replicate_send_clock_t)); + *work = (ebt_replicate_send_clock_t) + { + .request_number = request_number, + }; + JSContext* context = tf_ssb_connection_get_context(connection); + if (!JS_IsUndefined(message)) { JSPropertyEnum* ptab; - uint32_t plen; + uint32_t plen = 0; JS_GetOwnPropertyNames(context, &ptab, &plen, message, JS_GPN_STRING_MASK); + work->clock_count = (int)plen; + work->clock = tf_malloc(sizeof(ebt_clock_row_t) * plen); for (uint32_t i = 0; i < plen; ++i) { - JSValue in_clock = JS_GetProperty(context, full_clock, ptab[i].atom); - if (JS_IsUndefined(in_clock)) - { - JSValue key = JS_AtomToString(context, ptab[i].atom); - const char* key_string = JS_ToCString(context, key); - if (key_string) - { - int64_t sequence = -1; - tf_ssb_db_get_latest_message_by_author(ssb, key_string, &sequence, NULL, 0); - JS_SetPropertyStr(context, full_clock, key_string, JS_NewInt64(context, sequence == -1 ? -1 : (sequence << 1))); - } - JS_FreeCString(context, key_string); - JS_FreeValue(context, key); - } - } - for (uint32_t i = 0; i < plen; ++i) - { + const char* id = JS_AtomToCString(context, ptab[i].atom); + snprintf(work->clock[i].id, sizeof(work->clock[i].id), "%s", id); + JS_FreeCString(context, id); + + JSValue value = JS_GetProperty(context, message, ptab[i].atom); + JS_ToInt64(context, &work->clock[i].value, value); + JS_FreeValue(context, value); JS_FreeAtom(context, ptab[i].atom); } js_free(context, ptab); } - tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream, -request_number, full_clock, NULL, NULL, NULL); - JS_FreeValue(context, full_clock); + tf_ssb_connection_run_work(connection, _tf_ssb_rpc_ebt_replicate_send_clock_work, _tf_ssb_rpc_ebt_replicate_send_clock_after_work, work); } static void _tf_ssb_rpc_ebt_replicate_send_messages(tf_ssb_connection_t* connection, JSValue message)