#include "ssb.ebt.h"

#include "mem.h"
#include "ssb.db.h"
#include "ssb.h"
#include "util.js.h"

#include "uv.h"

#include <stdio.h> /* snprintf */
#include <string.h> /* strcmp, memmove */

/**
 * Replication state tracked for one feed on one connection.
 *
 * The "in" fields are decoded from clocks received from the peer; the "out"
 * fields record what has been placed into clocks built for sending.
 */
typedef struct _ebt_entry_t
{
	/** Feed identity.  The owning array is kept sorted by this field. */
	char id[k_id_base64_len];
	/** Sequence most recently placed in an outgoing clock, or -1 if none yet. */
	int64_t out;
	/** Sequence reported by the peer (or raised by tf_ssb_ebt_set_messages_sent), or -1. */
	int64_t in;
	/** Whether an outgoing clock has requested replication of this feed. */
	bool out_replicate;
	/** Whether an outgoing clock has requested to receive this feed's messages. */
	bool out_receive;
	/** Whether the peer's clock carried a non-negative value for this feed. */
	bool in_replicate;
	/** Whether the peer's clock had the low bit clear for this feed. */
	bool in_receive;
} ebt_entry_t;

/**
 * Per-connection EBT state: a sorted, growable array of ebt_entry_t.
 *
 * Every access to entries / entries_count in this file is made while holding
 * mutex.
 */
typedef struct _tf_ssb_ebt_t
{
	tf_ssb_connection_t* connection;
	uv_mutex_t mutex;
	ebt_entry_t* entries;
	int entries_count;
} tf_ssb_ebt_t;

/**
 * Allocate an EBT state object bound to a connection.
 * @param connection The connection this state belongs to; stored, not owned.
 * @return A new instance with no entries.  Release with tf_ssb_ebt_destroy().
 */
tf_ssb_ebt_t* tf_ssb_ebt_create(tf_ssb_connection_t* connection)
{
	tf_ssb_ebt_t* ebt = tf_malloc(sizeof(tf_ssb_ebt_t));
	*ebt = (tf_ssb_ebt_t) {
		.connection = connection,
	};
	uv_mutex_init(&ebt->mutex);
	return ebt;
}

/**
 * Destroy an EBT state object, its mutex, and its entry array.
 * @param ebt The instance to destroy.
 */
void tf_ssb_ebt_destroy(tf_ssb_ebt_t* ebt)
{
	uv_mutex_destroy(&ebt->mutex);
	tf_free(ebt->entries);
	tf_free(ebt);
}

/**
 * Comparator handed to tf_util_insert_index() for the ebt_entry_t array.
 * @param a The identity string being searched for.
 * @param b An ebt_entry_t element of the array.
 * @return strcmp() ordering of the key against the element's id.
 */
static int _ebt_entry_compare(const void* a, const void* b)
{
	const char* id = a;
	const ebt_entry_t* entry = b;
	return strcmp(id, entry->id);
}

/**
 * Find the entry for an identity, inserting a fresh one in sorted position if
 * it is not present.
 *
 * All callers in this file hold ebt->mutex.  The returned pointer refers into
 * ebt->entries, which is resized and shifted on insertion, so it must not be
 * kept across another call to this function.
 *
 * @param ebt The EBT state.
 * @param id The identity to look up; must not be NULL.
 * @return The existing or newly created entry.  New entries have in and out
 * set to -1 and all flags false.
 */
static ebt_entry_t* _ebt_get_entry(tf_ssb_ebt_t* ebt, const char* id)
{
	int index = tf_util_insert_index(id, ebt->entries, ebt->entries_count, sizeof(ebt_entry_t), _ebt_entry_compare);
	if (index < ebt->entries_count && strcmp(id, ebt->entries[index].id) == 0)
	{
		return &ebt->entries[index];
	}
	else
	{
		ebt->entries = tf_resize_vec(ebt->entries, (ebt->entries_count + 1) * sizeof(ebt_entry_t));
		if (index < ebt->entries_count)
		{
			/* Open a gap at the insertion point to keep the array sorted. */
			memmove(ebt->entries + index + 1, ebt->entries + index, (ebt->entries_count - index) * sizeof(ebt_entry_t));
		}
		ebt->entries[index] = (ebt_entry_t) {
			.in = -1,
			.out = -1,
		};
		snprintf(ebt->entries[index].id, sizeof(ebt->entries[index].id), "%s", id);
		ebt->entries_count++;
		return &ebt->entries[index];
	}
}

/**
 * Record a clock received from the peer.
 *
 * Each own string-keyed property of the clock object is treated as an
 * identity mapped to an integer.  A negative value clears replication for
 * that identity.  Otherwise the sequence is the value shifted right by one,
 * and the peer wants to receive messages when the low bit is clear.  When the
 * peer does not want to receive, any pending new-message request for that
 * identity is removed from the connection.
 *
 * @param ebt The EBT state.
 * @param context The JS context owning clock.
 * @param clock The received clock object.
 */
void tf_ssb_ebt_receive_clock(tf_ssb_ebt_t* ebt, JSContext* context, JSValue clock)
{
	JSPropertyEnum* ptab = NULL;
	uint32_t plen = 0;
	if (JS_GetOwnPropertyNames(context, &ptab, &plen, clock, JS_GPN_STRING_MASK) == 0)
	{
		uv_mutex_lock(&ebt->mutex);
		for (uint32_t i = 0; i < plen; ++i)
		{
			JSValue in_clock = JS_UNDEFINED;
			JSPropertyDescriptor desc = { 0 };
			if (JS_GetOwnProperty(context, &desc, clock, ptab[i].atom) == 1)
			{
				in_clock = desc.value;
				JS_FreeValue(context, desc.setter);
				JS_FreeValue(context, desc.getter);
			}
			if (!JS_IsUndefined(in_clock))
			{
				JSValue key = JS_AtomToString(context, ptab[i].atom);
				const char* author = JS_ToCString(context, key);
				/* Skip this property if the key could not be converted; a NULL
				 * author would otherwise reach strcmp() in _ebt_get_entry(). */
				if (author)
				{
					int64_t sequence = -1;
					JS_ToInt64(context, &sequence, in_clock);

					ebt_entry_t* entry = _ebt_get_entry(ebt, author);
					if (sequence < 0)
					{
						entry->in = -1;
						entry->in_replicate = false;
						entry->in_receive = false;
					}
					else
					{
						entry->in = sequence >> 1;
						entry->in_replicate = true;
						entry->in_receive = (sequence & 1) == 0;
					}
					if (!entry->in_receive)
					{
						tf_ssb_connection_remove_new_message_request(ebt->connection, author);
					}
					JS_FreeCString(context, author);
				}
				JS_FreeValue(context, key);
			}
			JS_FreeValue(context, in_clock);
		}
		uv_mutex_unlock(&ebt->mutex);

		for (uint32_t i = 0; i < plen; ++i)
		{
			JS_FreeAtom(context, ptab[i].atom);
		}
		js_free(context, ptab);
	}
}

/**
 * State carried through one tf_ssb_ebt_get_send_clock() request.
 */
typedef struct _ebt_get_clock_t
{
	tf_ssb_ebt_t* ebt;
	int32_t request_number;
	tf_ssb_ebt_clock_callback_t* callback;
	/** Clock being built; stays NULL until the first entry is added. */
	tf_ssb_ebt_clock_t* clock;
	void* user_data;
} ebt_get_clock_t;

/**
 * Comparator handed to tf_util_insert_index() for a clock's entry array.
 * @param a The identity string being searched for.
 * @param b A tf_ssb_ebt_clock_entry_t element of the array.
 * @return strcmp() ordering of the key against the element's id.
 */
static int _ebt_compare_entry(const void* a, const void* b)
{
	const char* id = a;
	const tf_ssb_ebt_clock_entry_t* entry = b;
	return strcmp(id, entry->id);
}

/**
 * Update the outgoing state for an identity and, if anything changed, add or
 * update its entry in the clock being built.
 *
 * An entry is emitted when a flag is newly requested, or when the value
 * differs from the last one sent while any flag is requested or already set.
 * Flags are sticky: once out_replicate / out_receive is set it stays set.
 * The emitted value is -1 when not replicating, otherwise the sequence
 * shifted left by one with the low bit set when not receiving.
 *
 * Callers hold work->ebt->mutex.
 *
 * @param work The in-progress request; work->clock may be reallocated.
 * @param id The identity.
 * @param value The latest known sequence for the identity.
 * @param replicate Whether to request replication of the identity.
 * @param receive Whether to request receiving the identity's messages.
 */
static void _ebt_add_to_clock(ebt_get_clock_t* work, const char* id, int64_t value, bool replicate, bool receive)
{
	int count = work->clock ? work->clock->count : 0;
	ebt_entry_t* entry = _ebt_get_entry(work->ebt, id);
	if ((replicate && !entry->out_replicate) || (receive && !entry->out_receive) ||
		((replicate || receive || entry->out_replicate || entry->out_receive) && entry->out != value))
	{
		entry->out = value;
		entry->out_replicate = entry->out_replicate || replicate;
		entry->out_receive = entry->out_receive || receive;

		int index = tf_util_insert_index(id, count ? work->clock->entries : NULL, count, sizeof(tf_ssb_ebt_clock_entry_t), _ebt_compare_entry);

		int64_t out_value = -1;
		if (entry->out_replicate)
		{
			/* Shift as unsigned: left-shifting a negative int64_t is undefined
			 * behavior, and value can be negative when the flag was already set. */
			uint64_t encoded = ((uint64_t)value << 1) | (entry->out_receive ? 0u : 1u);
			out_value = (int64_t)encoded;
		}

		if (index < count && strcmp(id, work->clock->entries[index].id) == 0)
		{
			work->clock->entries[index].value = out_value;
		}
		else
		{
			work->clock = tf_resize_vec(work->clock, sizeof(tf_ssb_ebt_clock_t) + (count + 1) * sizeof(tf_ssb_ebt_clock_entry_t));
			if (index < count)
			{
				/* Open a gap at the insertion point to keep the clock sorted by id. */
				memmove(work->clock->entries + index + 1, work->clock->entries + index, (count - index) * sizeof(tf_ssb_ebt_clock_entry_t));
			}
			work->clock->entries[index] = (tf_ssb_ebt_clock_entry_t) { .value = out_value };
			snprintf(work->clock->entries[index].id, sizeof(work->clock->entries[index].id), "%s", id);
			work->clock->count = count + 1;
		}
	}
}

/**
 * Work callback that builds the clock to send.
 *
 * Database lookups are done without holding the EBT mutex; the mutex is taken
 * only to apply each batch of results to the shared entries.
 *
 * @param connection The connection the work was scheduled on.
 * @param user_data The ebt_get_clock_t for this request.
 */
static void _tf_ssb_ebt_get_send_clock_work(tf_ssb_connection_t* connection, void* user_data)
{
	ebt_get_clock_t* work = user_data;
	tf_ssb_t* ssb = tf_ssb_connection_get_ssb(work->ebt->connection);

	/* Default of 2 is used if the setting lookup leaves depth untouched. */
	int64_t depth = 2;
	sqlite3* db = tf_ssb_acquire_db_reader(ssb);
	tf_ssb_db_get_global_setting_int64(db, "replication_hops", &depth);
	tf_ssb_release_db_reader(ssb, db);

	/* Ask for every identity we know is being followed from local accounts. */
	const char** visible = tf_ssb_db_get_all_visible_identities(ssb, depth);
	if (visible)
	{
		int64_t* sequences = NULL;
		for (int i = 0; visible[i]; i++)
		{
			int64_t sequence = 0;
			tf_ssb_db_get_latest_message_by_author(ssb, visible[i], &sequence, NULL, 0);
			sequences = tf_resize_vec(sequences, (i + 1) * sizeof(int64_t));
			sequences[i] = sequence;
		}

		uv_mutex_lock(&work->ebt->mutex);
		for (int i = 0; visible[i]; i++)
		{
			_ebt_add_to_clock(work, visible[i], sequences[i], true, true);
		}
		uv_mutex_unlock(&work->ebt->mutex);

		tf_free(visible);
		tf_free(sequences);
	}

	/* Ask about the incoming connection, too. */
	char id[k_id_base64_len] = "";
	if (tf_ssb_connection_get_id(connection, id, sizeof(id)))
	{
		int64_t sequence = 0;
		tf_ssb_db_get_latest_message_by_author(ssb, id, &sequence, NULL, 0);
		uv_mutex_lock(&work->ebt->mutex);
		_ebt_add_to_clock(work, id, sequence, true, true);
		uv_mutex_unlock(&work->ebt->mutex);
	}

	/* Also respond with what we know about all requested identities. */
	tf_ssb_ebt_clock_entry_t* requested = NULL;
	int requested_count = 0;
	uv_mutex_lock(&work->ebt->mutex);
	for (int i = 0; i < work->ebt->entries_count; i++)
	{
		ebt_entry_t* entry = &work->ebt->entries[i];
		if (entry->in_replicate && !entry->out_replicate)
		{
			/* Copy the id out so the database lookup can run without the mutex. */
			requested = tf_resize_vec(requested, (requested_count + 1) * sizeof(tf_ssb_ebt_clock_entry_t));
			requested[requested_count] = (tf_ssb_ebt_clock_entry_t) { .value = -1 };
			snprintf(requested[requested_count].id, sizeof(requested[requested_count].id), "%s", entry->id);
			requested_count++;
		}
	}
	uv_mutex_unlock(&work->ebt->mutex);

	if (requested_count)
	{
		for (int i = 0; i < requested_count; i++)
		{
			tf_ssb_db_get_latest_message_by_author(ssb, requested[i].id, &requested[i].value, NULL, 0);
		}

		uv_mutex_lock(&work->ebt->mutex);
		for (int i = 0; i < requested_count; i++)
		{
			/* Replicate only if the value is non-negative after the lookup; never ask to receive. */
			_ebt_add_to_clock(work, requested[i].id, requested[i].value, requested[i].value >= 0, false);
		}
		uv_mutex_unlock(&work->ebt->mutex);
		tf_free(requested);
	}
}

/**
 * Completion callback: hand the built clock to the requester and release the
 * request.
 *
 * The clock passed to the callback is NULL if no entry was added, and is freed
 * as soon as the callback returns.
 *
 * @param connection The connection the work ran on.
 * @param status The work status (unused).
 * @param user_data The ebt_get_clock_t for this request.
 */
static void _tf_ssb_ebt_get_send_clock_after_work(tf_ssb_connection_t* connection, int status, void* user_data)
{
	ebt_get_clock_t* work = user_data;
	work->callback(work->clock, work->request_number, work->user_data);
	tf_free(work->clock);
	tf_free(work);
}

/**
 * Asynchronously build the clock to send to the peer.
 *
 * The work is scheduled through tf_ssb_connection_run_work(); the callback is
 * invoked from the completion step with the resulting clock.
 *
 * @param ebt The EBT state.
 * @param request_number Passed through unchanged to the callback.
 * @param callback Called with the clock (possibly NULL), request_number, and user_data.
 * @param user_data Passed through unchanged to the callback.
 */
void tf_ssb_ebt_get_send_clock(tf_ssb_ebt_t* ebt, int32_t request_number, tf_ssb_ebt_clock_callback_t* callback, void* user_data)
{
	ebt_get_clock_t* work = tf_malloc(sizeof(ebt_get_clock_t));
	*work = (ebt_get_clock_t) {
		.ebt = ebt,
		.request_number = request_number,
		.callback = callback,
		.user_data = user_data,
	};
	tf_ssb_connection_run_work(ebt->connection, _tf_ssb_ebt_get_send_clock_work, _tf_ssb_ebt_get_send_clock_after_work, work);
}

/**
 * Collect the feeds for which the peer is behind and wants messages.
 *
 * A feed is included when the peer replicates it, wants to receive it, and
 * the last sequence we advertised is greater than the peer's.  Each returned
 * entry's value is the peer's current sequence for that feed.
 *
 * @param ebt The EBT state.
 * @return A newly allocated clock, or NULL if no feed qualifies.  This
 * function does not free it; presumably the caller releases it with tf_free().
 */
tf_ssb_ebt_clock_t* tf_ssb_ebt_get_messages_to_send(tf_ssb_ebt_t* ebt)
{
	int count = 0;
	tf_ssb_ebt_clock_t* clock = NULL;
	uv_mutex_lock(&ebt->mutex);
	for (int i = 0; i < ebt->entries_count; i++)
	{
		ebt_entry_t* entry = &ebt->entries[i];
		if (entry->in_replicate && entry->in_receive && entry->out > entry->in)
		{
			clock = tf_resize_vec(clock, sizeof(tf_ssb_ebt_clock_t) + (count + 1) * sizeof(tf_ssb_ebt_clock_entry_t));
			clock->entries[count] = (tf_ssb_ebt_clock_entry_t) { .value = entry->in };
			snprintf(clock->entries[count].id, sizeof(clock->entries[count].id), "%s", entry->id);
			clock->count = ++count;
		}
	}
	uv_mutex_unlock(&ebt->mutex);
	return clock;
}

/**
 * Record that messages up to a sequence have been sent to the peer.
 *
 * The peer's sequence for the feed only moves forward.  Once it equals the
 * last sequence we advertised, and the connection is not flagged one-shot, a
 * new-message request is registered on the connection for the feed.
 *
 * @param ebt The EBT state.
 * @param id The feed identity.
 * @param sequence The highest sequence sent.
 */
void tf_ssb_ebt_set_messages_sent(tf_ssb_ebt_t* ebt, const char* id, int64_t sequence)
{
	uv_mutex_lock(&ebt->mutex);
	ebt_entry_t* entry = _ebt_get_entry(ebt, id);
	entry->in = tf_max(entry->in, sequence);
	if (entry->in == entry->out && (tf_ssb_connection_get_flags(ebt->connection) & k_tf_ssb_connect_flag_one_shot) == 0)
	{
		tf_ssb_connection_add_new_message_request(ebt->connection, id, tf_ssb_connection_get_ebt_request_number(ebt->connection), false);
	}
	uv_mutex_unlock(&ebt->mutex);
}