EBT replicate to the worker threads. Almost there.

git-svn-id: https://www.unprompted.com/svn/projects/tildefriends/trunk@4501 ed5197a5-7fde-0310-b194-c3ffbd925b24
This commit is contained in:
Cory McWilliams 2023-10-08 16:17:56 +00:00
parent 575f6c2f0a
commit d8530f228e

View File

@ -958,10 +958,31 @@ static void _tf_ssb_rpc_createHistoryStream(tf_ssb_connection_t* connection, uin
JS_FreeValue(context, arg_array); JS_FreeValue(context, arg_array);
} }
static void _tf_ssb_rpc_ebt_replicate_send_clock(tf_ssb_connection_t* connection, int32_t request_number, JSValue message) typedef struct _ebt_clock_row_t
{ {
char id[k_id_base64_len];
int64_t value;
} ebt_clock_row_t;
typedef struct _ebt_replicate_send_clock_t
{
int64_t request_number;
ebt_clock_row_t* clock;
int clock_count;
char* out_clock;
} ebt_replicate_send_clock_t;
static void _tf_ssb_rpc_ebt_replicate_send_clock_work(tf_ssb_connection_t* connection, void* user_data)
{
ebt_replicate_send_clock_t* work = user_data;
JSMallocFunctions funcs = { 0 };
tf_get_js_malloc_functions(&funcs);
JSRuntime* runtime = JS_NewRuntime2(&funcs, NULL);
JSContext* context = JS_NewContext(runtime);
tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
JSContext* context = tf_ssb_get_context(ssb);
JSValue full_clock = JS_NewObject(context); JSValue full_clock = JS_NewObject(context);
/* Ask for every identity we know is being followed from local accounts. */ /* Ask for every identity we know is being followed from local accounts. */
@ -989,37 +1010,74 @@ static void _tf_ssb_rpc_ebt_replicate_send_clock(tf_ssb_connection_t* connection
} }
/* Also respond with what we know about all requested identities. */ /* Also respond with what we know about all requested identities. */
for (int i = 0; i < work->clock_count; i++)
{
JSValue in_clock = JS_GetPropertyStr(context, full_clock, work->clock[i].id);
if (JS_IsUndefined(in_clock))
{
int64_t sequence = -1;
tf_ssb_db_get_latest_message_by_author(ssb, work->clock[i].id, &sequence, NULL, 0);
JS_SetPropertyStr(context, full_clock, work->clock[i].id, JS_NewInt64(context, sequence == -1 ? -1 : (sequence << 1)));
}
}
JSValue json = JS_JSONStringify(context, full_clock, JS_NULL, JS_NULL);
size_t size = 0;
const char* string = JS_ToCStringLen(context, &size, json);
char* copy = tf_malloc(size + 1);
memcpy(copy, string, size + 1);
work->out_clock = copy;
JS_FreeCString(context, string);
JS_FreeValue(context, json);
JS_FreeValue(context, full_clock);
JS_FreeContext(context);
JS_FreeRuntime(runtime);
}
static void _tf_ssb_rpc_ebt_replicate_send_clock_after_work(tf_ssb_connection_t* connection, int result, void* user_data)
{
ebt_replicate_send_clock_t* work = user_data;
tf_free(work->clock);
if (work->out_clock)
{
tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_json, -work->request_number, (const uint8_t*)work->out_clock, strlen(work->out_clock), NULL, NULL, NULL);
tf_free(work->out_clock);
}
tf_free(work);
}
static void _tf_ssb_rpc_ebt_replicate_send_clock(tf_ssb_connection_t* connection, int32_t request_number, JSValue message)
{
ebt_replicate_send_clock_t* work = tf_malloc(sizeof(ebt_replicate_send_clock_t));
*work = (ebt_replicate_send_clock_t)
{
.request_number = request_number,
};
JSContext* context = tf_ssb_connection_get_context(connection);
if (!JS_IsUndefined(message)) if (!JS_IsUndefined(message))
{ {
JSPropertyEnum* ptab; JSPropertyEnum* ptab;
uint32_t plen; uint32_t plen = 0;
JS_GetOwnPropertyNames(context, &ptab, &plen, message, JS_GPN_STRING_MASK); JS_GetOwnPropertyNames(context, &ptab, &plen, message, JS_GPN_STRING_MASK);
work->clock_count = (int)plen;
work->clock = tf_malloc(sizeof(ebt_clock_row_t) * plen);
for (uint32_t i = 0; i < plen; ++i) for (uint32_t i = 0; i < plen; ++i)
{ {
JSValue in_clock = JS_GetProperty(context, full_clock, ptab[i].atom); const char* id = JS_AtomToCString(context, ptab[i].atom);
if (JS_IsUndefined(in_clock)) snprintf(work->clock[i].id, sizeof(work->clock[i].id), "%s", id);
{ JS_FreeCString(context, id);
JSValue key = JS_AtomToString(context, ptab[i].atom);
const char* key_string = JS_ToCString(context, key); JSValue value = JS_GetProperty(context, message, ptab[i].atom);
if (key_string) JS_ToInt64(context, &work->clock[i].value, value);
{ JS_FreeValue(context, value);
int64_t sequence = -1;
tf_ssb_db_get_latest_message_by_author(ssb, key_string, &sequence, NULL, 0);
JS_SetPropertyStr(context, full_clock, key_string, JS_NewInt64(context, sequence == -1 ? -1 : (sequence << 1)));
}
JS_FreeCString(context, key_string);
JS_FreeValue(context, key);
}
}
for (uint32_t i = 0; i < plen; ++i)
{
JS_FreeAtom(context, ptab[i].atom); JS_FreeAtom(context, ptab[i].atom);
} }
js_free(context, ptab); js_free(context, ptab);
} }
tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream, -request_number, full_clock, NULL, NULL, NULL); tf_ssb_connection_run_work(connection, _tf_ssb_rpc_ebt_replicate_send_clock_work, _tf_ssb_rpc_ebt_replicate_send_clock_after_work, work);
JS_FreeValue(context, full_clock);
} }
static void _tf_ssb_rpc_ebt_replicate_send_messages(tf_ssb_connection_t* connection, JSValue message) static void _tf_ssb_rpc_ebt_replicate_send_messages(tf_ssb_connection_t* connection, JSValue message)