ssb: Refactoring EBT implementation. I think this works not worse than before and will let me schedule message replication better in a future change. #93

This commit is contained in:
2025-01-03 13:59:25 -05:00
parent 3f343b283b
commit 44d9f69434
6 changed files with 459 additions and 196 deletions

View File

@ -3,6 +3,7 @@
#include "log.h"
#include "mem.h"
#include "ssb.db.h"
#include "ssb.ebt.h"
#include "ssb.h"
#include "util.js.h"
@ -855,6 +856,7 @@ static void _tf_ssb_connection_send_history_stream_after_work(tf_ssb_connection_
break;
}
}
tf_ssb_ebt_set_messages_sent(tf_ssb_connection_get_ebt(connection), request->author, request->out_max_sequence_seen);
if (!request->out_finished)
{
_tf_ssb_connection_send_history_stream(
@ -943,197 +945,24 @@ static void _tf_ssb_rpc_createHistoryStream(
JS_FreeValue(context, arg_array);
}
typedef struct _ebt_clock_row_t
static void _tf_ssb_rpc_ebt_replicate_send_messages(tf_ssb_connection_t* connection)
{
char id[k_id_base64_len];
int64_t value;
} ebt_clock_row_t;
typedef struct _ebt_replicate_send_clock_t
{
int64_t request_number;
ebt_clock_row_t* clock;
int clock_count;
char* out_clock;
} ebt_replicate_send_clock_t;
static void _tf_ssb_rpc_ebt_replicate_send_clock_work(tf_ssb_connection_t* connection, void* user_data)
{
ebt_replicate_send_clock_t* work = user_data;
JSMallocFunctions funcs = { 0 };
tf_get_js_malloc_functions(&funcs);
JSRuntime* runtime = JS_NewRuntime2(&funcs, NULL);
JSContext* context = JS_NewContext(runtime);
tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
JSValue full_clock = JS_NewObject(context);
int64_t depth = 2;
sqlite3* db = tf_ssb_acquire_db_reader(ssb);
tf_ssb_db_get_global_setting_int64(db, "replication_hops", &depth);
tf_ssb_release_db_reader(ssb, db);
/* Ask for every identity we know is being followed from local accounts. */
const char** visible = tf_ssb_db_get_all_visible_identities(ssb, depth);
for (int i = 0; visible[i]; i++)
tf_ssb_ebt_t* ebt = tf_ssb_connection_get_ebt(connection);
tf_ssb_ebt_clock_t* clock = tf_ssb_ebt_get_messages_to_send(ebt);
if (clock)
{
int64_t sequence = 0;
tf_ssb_db_get_latest_message_by_author(ssb, visible[i], &sequence, NULL, 0);
JS_SetPropertyStr(context, full_clock, visible[i], JS_NewInt64(context, sequence == -1 ? -1 : (sequence << 1)));
}
tf_free(visible);
/* Ask about the incoming connection, too. */
char id[k_id_base64_len] = "";
if (tf_ssb_connection_get_id(connection, id, sizeof(id)))
{
JSValue in_clock = JS_GetPropertyStr(context, full_clock, id);
if (JS_IsUndefined(in_clock))
for (int i = 0; i < clock->count; i++)
{
int64_t sequence = 0;
tf_ssb_db_get_latest_message_by_author(ssb, id, &sequence, NULL, 0);
JS_SetPropertyStr(context, full_clock, id, JS_NewInt64(context, sequence == -1 ? -1 : (sequence << 1)));
}
JS_FreeValue(context, in_clock);
}
/* Also respond with what we know about all requested identities. */
for (int i = 0; i < work->clock_count; i++)
{
JSValue in_clock = JS_GetPropertyStr(context, full_clock, work->clock[i].id);
if (JS_IsUndefined(in_clock))
{
int64_t sequence = -1;
tf_ssb_db_get_latest_message_by_author(ssb, work->clock[i].id, &sequence, NULL, 0);
JS_SetPropertyStr(context, full_clock, work->clock[i].id, JS_NewInt64(context, sequence == -1 ? -1 : (sequence << 1)));
}
JS_FreeValue(context, in_clock);
}
JSValue json = JS_JSONStringify(context, full_clock, JS_NULL, JS_NULL);
size_t size = 0;
const char* string = JS_ToCStringLen(context, &size, json);
char* copy = tf_malloc(size + 1);
memcpy(copy, string, size + 1);
work->out_clock = copy;
JS_FreeCString(context, string);
JS_FreeValue(context, json);
JS_FreeValue(context, full_clock);
JS_FreeContext(context);
JS_FreeRuntime(runtime);
}
static void _tf_ssb_rpc_ebt_replicate_send_clock_after_work(tf_ssb_connection_t* connection, int result, void* user_data)
{
ebt_replicate_send_clock_t* work = user_data;
tf_free(work->clock);
if (work->out_clock)
{
tf_ssb_connection_rpc_send(
connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_json, -work->request_number, NULL, (const uint8_t*)work->out_clock, strlen(work->out_clock), NULL, NULL, NULL);
tf_free(work->out_clock);
}
tf_free(work);
}
static void _tf_ssb_rpc_ebt_replicate_send_clock(tf_ssb_connection_t* connection, int32_t request_number, JSValue message)
{
ebt_replicate_send_clock_t* work = tf_malloc(sizeof(ebt_replicate_send_clock_t));
*work = (ebt_replicate_send_clock_t) {
.request_number = request_number,
};
JSContext* context = tf_ssb_connection_get_context(connection);
if (!JS_IsUndefined(message))
{
JSPropertyEnum* ptab = NULL;
uint32_t plen = 0;
if (JS_GetOwnPropertyNames(context, &ptab, &plen, message, JS_GPN_STRING_MASK) == 0)
{
work->clock_count = (int)plen;
work->clock = tf_malloc(sizeof(ebt_clock_row_t) * plen);
memset(work->clock, 0, sizeof(ebt_clock_row_t) * plen);
for (uint32_t i = 0; i < plen; ++i)
tf_ssb_ebt_clock_entry_t* entry = &clock->entries[i];
int32_t request_number = tf_ssb_connection_get_ebt_request_number(connection);
bool live = (tf_ssb_connection_get_flags(connection) & k_tf_ssb_connect_flag_one_shot) == 0;
_tf_ssb_connection_send_history_stream(connection, request_number, entry->id, entry->value, false, live, false);
if (live)
{
const char* id = JS_AtomToCString(context, ptab[i].atom);
snprintf(work->clock[i].id, sizeof(work->clock[i].id), "%s", id);
JS_FreeCString(context, id);
JSPropertyDescriptor desc = { 0 };
JSValue key_value = JS_UNDEFINED;
if (JS_GetOwnProperty(context, &desc, message, ptab[i].atom) == 1)
{
key_value = desc.value;
JS_FreeValue(context, desc.setter);
JS_FreeValue(context, desc.getter);
}
JS_ToInt64(context, &work->clock[i].value, key_value);
JS_FreeValue(context, key_value);
JS_FreeAtom(context, ptab[i].atom);
tf_ssb_connection_add_new_message_request(connection, entry->id, request_number, false);
}
js_free(context, ptab);
}
}
tf_ssb_connection_run_work(connection, _tf_ssb_rpc_ebt_replicate_send_clock_work, _tf_ssb_rpc_ebt_replicate_send_clock_after_work, work);
}
static void _tf_ssb_rpc_ebt_replicate_send_messages(tf_ssb_connection_t* connection, JSValue message)
{
if (JS_IsUndefined(message))
{
return;
}
tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
JSContext* context = tf_ssb_get_context(ssb);
JSPropertyEnum* ptab = NULL;
uint32_t plen = 0;
if (JS_GetOwnPropertyNames(context, &ptab, &plen, message, JS_GPN_STRING_MASK) == 0)
{
for (uint32_t i = 0; i < plen; ++i)
{
JSValue in_clock = JS_UNDEFINED;
JSPropertyDescriptor desc = { 0 };
if (JS_GetOwnProperty(context, &desc, message, ptab[i].atom) == 1)
{
in_clock = desc.value;
JS_FreeValue(context, desc.setter);
JS_FreeValue(context, desc.getter);
}
if (!JS_IsUndefined(in_clock))
{
JSValue key = JS_AtomToString(context, ptab[i].atom);
int64_t sequence = -1;
JS_ToInt64(context, &sequence, in_clock);
const char* author = JS_ToCString(context, key);
if (sequence >= 0 && (sequence & 1) == 0)
{
int32_t request_number = tf_ssb_connection_get_ebt_request_number(connection);
bool live = (tf_ssb_connection_get_flags(connection) & k_tf_ssb_connect_flag_one_shot) == 0;
_tf_ssb_connection_send_history_stream(connection, request_number, author, sequence >> 1, false, live, false);
if (live)
{
tf_ssb_connection_add_new_message_request(connection, author, request_number, false);
}
}
else
{
tf_ssb_connection_remove_new_message_request(connection, author);
}
JS_FreeCString(context, author);
JS_FreeValue(context, key);
}
JS_FreeValue(context, in_clock);
}
for (uint32_t i = 0; i < plen; ++i)
{
JS_FreeAtom(context, ptab[i].atom);
}
js_free(context, ptab);
tf_free(clock);
}
}
@ -1143,6 +972,25 @@ static void _tf_ssb_rpc_ebt_replicate_store_callback(const char* id, bool verifi
tf_ssb_connection_adjust_read_backpressure(connection, -1);
}
static void _tf_ssb_rpc_ebt_send_clock_callback(const tf_ssb_ebt_clock_t* clock, int32_t request_number, void* user_data)
{
tf_ssb_connection_t* connection = user_data;
if (clock && clock->count)
{
JSContext* context = tf_ssb_connection_get_context(connection);
JSValue message = JS_NewObject(context);
for (int i = 0; i < clock->count; i++)
{
JS_SetPropertyStr(context, message, clock->entries[i].id, JS_NewInt64(context, clock->entries[i].value));
}
tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_json, -request_number, NULL, message, NULL, NULL, NULL);
JS_FreeValue(context, message);
}
_tf_ssb_rpc_ebt_replicate_send_messages(connection);
}
typedef struct _resend_clock_t
{
tf_ssb_connection_t* connection;
@ -1154,8 +1002,8 @@ static void _tf_ssb_rpc_ebt_replicate_resend_clock(tf_ssb_connection_t* connecti
resend_clock_t* resend = user_data;
if (!skip)
{
_tf_ssb_rpc_ebt_replicate_send_clock(resend->connection, resend->request_number, JS_UNDEFINED);
tf_ssb_connection_set_sent_clock(resend->connection, true);
tf_ssb_ebt_t* ebt = tf_ssb_connection_get_ebt(connection);
tf_ssb_ebt_get_send_clock(ebt, resend->request_number, _tf_ssb_rpc_ebt_send_clock_callback, connection);
}
tf_free(user_data);
}
@ -1186,9 +1034,8 @@ static void _tf_ssb_rpc_ebt_replicate(tf_ssb_connection_t* connection, uint8_t f
tf_ssb_connection_adjust_read_backpressure(connection, 1);
tf_ssb_verify_strip_and_store_message(ssb, args, _tf_ssb_rpc_ebt_replicate_store_callback, connection);
if (tf_ssb_connection_get_sent_clock(connection) && !tf_ssb_is_shutting_down(ssb) && !tf_ssb_connection_is_closing(connection))
if (!tf_ssb_is_shutting_down(ssb) && !tf_ssb_connection_is_closing(connection))
{
tf_ssb_connection_set_sent_clock(connection, false);
resend_clock_t* resend = tf_malloc(sizeof(resend_clock_t));
*resend = (resend_clock_t) {
.connection = connection,
@ -1199,13 +1046,9 @@ static void _tf_ssb_rpc_ebt_replicate(tf_ssb_connection_t* connection, uint8_t f
}
else
{
/* EBT clock. */
if (!tf_ssb_connection_get_sent_clock(connection))
{
_tf_ssb_rpc_ebt_replicate_send_clock(connection, request_number, in_clock);
tf_ssb_connection_set_sent_clock(connection, true);
}
_tf_ssb_rpc_ebt_replicate_send_messages(connection, in_clock);
tf_ssb_ebt_t* ebt = tf_ssb_connection_get_ebt(connection);
tf_ssb_ebt_receive_clock(ebt, context, in_clock);
tf_ssb_ebt_get_send_clock(ebt, request_number, _tf_ssb_rpc_ebt_send_clock_callback, connection);
}
JS_FreeValue(context, name);
JS_FreeValue(context, author);