Compare commits
5 Commits
03a28fc3c5
...
9da4857066
Author | SHA1 | Date | |
---|---|---|---|
9da4857066 | |||
75c71135ba | |||
0cb5025a16 | |||
44d9f69434 | |||
3f343b283b |
@ -1,5 +1,5 @@
|
||||
{
|
||||
"type": "tildefriends-app",
|
||||
"emoji": "🦀",
|
||||
"previous": "&ViNc/3DepKybORDYRqBjsXbzX/wzy43nVI6UFy6vNBI=.sha256"
|
||||
"previous": "&bivD/AIKN+ZOBUrq6+Z049v1AJ5PXV4M1Uqot+iHg5M=.sha256"
|
||||
}
|
||||
|
@ -270,35 +270,65 @@ class TfElement extends LitElement {
|
||||
}
|
||||
|
||||
async get_latest_private(following) {
|
||||
const k_version = 1;
|
||||
// { "version": 1, "range": [1234, 5678], messages: [ "%1.sha256", "%2.sha256", ... ], latest: rowid }
|
||||
let cache = JSON.parse(await tfrpc.rpc.databaseGet(`private:${this.whoami}`) ?? '{}');
|
||||
if (cache.version !== k_version) {
|
||||
cache = {
|
||||
version: k_version,
|
||||
messages: [],
|
||||
range: [],
|
||||
};
|
||||
}
|
||||
let latest = (
|
||||
await tfrpc.rpc.query('SELECT MAX(rowid) AS latest FROM messages')
|
||||
)[0].latest;
|
||||
const k_chunk_count = 256;
|
||||
while (latest - k_chunk_count >= 0) {
|
||||
let ranges = [];
|
||||
const k_chunk_size = 512;
|
||||
if (cache.range.length) {
|
||||
for (let i = cache.range[1]; i < latest; i += k_chunk_size) {
|
||||
ranges.push([i, Math.min(i + k_chunk_size, latest), true]);
|
||||
}
|
||||
for (let i = cache.range[0]; i >= 0; i -= k_chunk_size) {
|
||||
ranges.push([Math.max(i - k_chunk_size, 0), Math.min(cache.range[0], i + k_chunk_size), false]);
|
||||
}
|
||||
} else {
|
||||
for (let i = 0; i < latest; i += k_chunk_size) {
|
||||
ranges.push([i, Math.min(i + k_chunk_size, latest), true]);
|
||||
}
|
||||
}
|
||||
console.log(cache);
|
||||
for (let range of ranges) {
|
||||
let messages = await tfrpc.rpc.query(
|
||||
`
|
||||
SELECT messages.rowid, messages.id, previous, author, sequence, timestamp, hash, json(content) AS content, signature
|
||||
SELECT messages.rowid, messages.id, json(content) AS content
|
||||
FROM messages
|
||||
JOIN json_each(?1) AS following ON messages.author = following.value
|
||||
WHERE
|
||||
messages.rowid > ?2 AND
|
||||
messages.rowid <= ?3 AND
|
||||
messages.rowid > ?1 AND
|
||||
messages.rowid <= ?2 AND
|
||||
json(messages.content) LIKE '"%'
|
||||
ORDER BY sequence DESC
|
||||
`,
|
||||
[
|
||||
JSON.stringify(following),
|
||||
latest - k_chunk_count,
|
||||
latest,
|
||||
range[0],
|
||||
range[1],
|
||||
]
|
||||
);
|
||||
messages = (await this.decrypt(messages)).filter((x) => x.decrypted);
|
||||
if (messages.length) {
|
||||
return Math.max(...messages.map((x) => x.rowid));
|
||||
cache.latest = Math.max(cache.latest ?? 0, ...messages.map((x) => x.rowid));
|
||||
if (range[2]) {
|
||||
cache.messages = [...cache.messages, ...messages.map(x => x.id)];
|
||||
} else {
|
||||
cache.messages = [...messages.map(x => x.id), ...cache.messages];
|
||||
}
|
||||
latest -= k_chunk_count;
|
||||
}
|
||||
return -1;
|
||||
cache.range[0] = Math.min(cache.range[0] ?? range[0], range[0]);
|
||||
cache.range[1] = Math.max(cache.range[1] ?? range[1], range[1]);
|
||||
await tfrpc.rpc.databaseSet(`private:${this.whoami}`, JSON.stringify(cache));
|
||||
}
|
||||
console.log(cache);
|
||||
return cache.latest;
|
||||
}
|
||||
|
||||
async load_channels_latest(following) {
|
||||
|
11
src/ssb.c
11
src/ssb.c
@ -4,6 +4,7 @@
|
||||
#include "mem.h"
|
||||
#include "ssb.connections.h"
|
||||
#include "ssb.db.h"
|
||||
#include "ssb.ebt.h"
|
||||
#include "ssb.rpc.h"
|
||||
#include "trace.h"
|
||||
#include "util.js.h"
|
||||
@ -280,6 +281,7 @@ typedef struct _tf_ssb_connection_t
|
||||
tf_ssb_blob_wants_t blob_wants;
|
||||
bool sent_clock;
|
||||
int32_t ebt_request_number;
|
||||
tf_ssb_ebt_t* ebt;
|
||||
|
||||
JSValue object;
|
||||
|
||||
@ -1986,6 +1988,9 @@ static void _tf_ssb_connection_destroy(tf_ssb_connection_t* connection, const ch
|
||||
connection->message_requests = NULL;
|
||||
connection->message_requests_count = 0;
|
||||
|
||||
tf_ssb_ebt_destroy(connection->ebt);
|
||||
connection->ebt = NULL;
|
||||
|
||||
for (tf_ssb_connection_t** it = &connection->ssb->connections; *it; it = &(*it)->next)
|
||||
{
|
||||
if (*it == connection)
|
||||
@ -2744,6 +2749,7 @@ static tf_ssb_connection_t* _tf_ssb_connection_create_internal(tf_ssb_t* ssb, co
|
||||
connection->ssb = ssb;
|
||||
connection->send_request_number = 1;
|
||||
randombytes_buf(&connection->prng, sizeof(connection->prng));
|
||||
connection->ebt = tf_ssb_ebt_create(connection);
|
||||
|
||||
connection->async.data = connection;
|
||||
uv_async_init(ssb->loop, &connection->async, _tf_ssb_connection_process_message_async);
|
||||
@ -4341,3 +4347,8 @@ int tf_ssb_connection_get_flags(tf_ssb_connection_t* connection)
|
||||
{
|
||||
return connection->flags;
|
||||
}
|
||||
|
||||
tf_ssb_ebt_t* tf_ssb_connection_get_ebt(tf_ssb_connection_t* connection)
|
||||
{
|
||||
return connection ? connection->ebt : NULL;
|
||||
}
|
||||
|
302
src/ssb.ebt.c
Normal file
302
src/ssb.ebt.c
Normal file
@ -0,0 +1,302 @@
|
||||
#include "ssb.ebt.h"
|
||||
|
||||
#include "mem.h"
|
||||
#include "ssb.db.h"
|
||||
#include "ssb.h"
|
||||
#include "util.js.h"
|
||||
|
||||
#include "uv.h"
|
||||
|
||||
#include <string.h>
|
||||
|
||||
typedef struct _ebt_entry_t
|
||||
{
|
||||
char id[k_id_base64_len];
|
||||
int64_t out;
|
||||
int64_t in;
|
||||
bool out_replicate;
|
||||
bool out_receive;
|
||||
bool in_replicate;
|
||||
bool in_receive;
|
||||
} ebt_entry_t;
|
||||
|
||||
typedef struct _tf_ssb_ebt_t
|
||||
{
|
||||
tf_ssb_connection_t* connection;
|
||||
uv_mutex_t mutex;
|
||||
|
||||
ebt_entry_t* entries;
|
||||
int entries_count;
|
||||
} tf_ssb_ebt_t;
|
||||
|
||||
tf_ssb_ebt_t* tf_ssb_ebt_create(tf_ssb_connection_t* connection)
|
||||
{
|
||||
tf_ssb_ebt_t* ebt = tf_malloc(sizeof(tf_ssb_ebt_t));
|
||||
*ebt = (tf_ssb_ebt_t) {
|
||||
.connection = connection,
|
||||
};
|
||||
uv_mutex_init(&ebt->mutex);
|
||||
return ebt;
|
||||
}
|
||||
|
||||
void tf_ssb_ebt_destroy(tf_ssb_ebt_t* ebt)
|
||||
{
|
||||
uv_mutex_destroy(&ebt->mutex);
|
||||
tf_free(ebt->entries);
|
||||
tf_free(ebt);
|
||||
}
|
||||
|
||||
static int _ebt_entry_compare(const void* a, const void* b)
|
||||
{
|
||||
const char* id = a;
|
||||
const ebt_entry_t* entry = b;
|
||||
return strcmp(id, entry->id);
|
||||
}
|
||||
|
||||
static ebt_entry_t* _ebt_get_entry(tf_ssb_ebt_t* ebt, const char* id)
|
||||
{
|
||||
int index = tf_util_insert_index(id, ebt->entries, ebt->entries_count, sizeof(ebt_entry_t), _ebt_entry_compare);
|
||||
if (index < ebt->entries_count && strcmp(id, ebt->entries[index].id) == 0)
|
||||
{
|
||||
return &ebt->entries[index];
|
||||
}
|
||||
else
|
||||
{
|
||||
ebt->entries = tf_resize_vec(ebt->entries, (ebt->entries_count + 1) * sizeof(ebt_entry_t));
|
||||
if (index < ebt->entries_count)
|
||||
{
|
||||
memmove(ebt->entries + index + 1, ebt->entries + index, (ebt->entries_count - index) * sizeof(ebt_entry_t));
|
||||
}
|
||||
ebt->entries[index] = (ebt_entry_t) {
|
||||
.in = -1,
|
||||
.out = -1,
|
||||
};
|
||||
snprintf(ebt->entries[index].id, sizeof(ebt->entries[index].id), "%s", id);
|
||||
ebt->entries_count++;
|
||||
return &ebt->entries[index];
|
||||
}
|
||||
}
|
||||
|
||||
void tf_ssb_ebt_receive_clock(tf_ssb_ebt_t* ebt, JSContext* context, JSValue clock)
|
||||
{
|
||||
JSPropertyEnum* ptab = NULL;
|
||||
uint32_t plen = 0;
|
||||
if (JS_GetOwnPropertyNames(context, &ptab, &plen, clock, JS_GPN_STRING_MASK) == 0)
|
||||
{
|
||||
uv_mutex_lock(&ebt->mutex);
|
||||
for (uint32_t i = 0; i < plen; ++i)
|
||||
{
|
||||
JSValue in_clock = JS_UNDEFINED;
|
||||
JSPropertyDescriptor desc = { 0 };
|
||||
if (JS_GetOwnProperty(context, &desc, clock, ptab[i].atom) == 1)
|
||||
{
|
||||
in_clock = desc.value;
|
||||
JS_FreeValue(context, desc.setter);
|
||||
JS_FreeValue(context, desc.getter);
|
||||
}
|
||||
if (!JS_IsUndefined(in_clock))
|
||||
{
|
||||
JSValue key = JS_AtomToString(context, ptab[i].atom);
|
||||
const char* author = JS_ToCString(context, key);
|
||||
int64_t sequence = -1;
|
||||
JS_ToInt64(context, &sequence, in_clock);
|
||||
|
||||
ebt_entry_t* entry = _ebt_get_entry(ebt, author);
|
||||
if (sequence < 0)
|
||||
{
|
||||
entry->in = -1;
|
||||
entry->in_replicate = false;
|
||||
entry->in_receive = false;
|
||||
}
|
||||
else
|
||||
{
|
||||
entry->in = sequence >> 1;
|
||||
entry->in_replicate = true;
|
||||
entry->in_receive = (sequence & 1) == 0;
|
||||
}
|
||||
JS_FreeCString(context, author);
|
||||
JS_FreeValue(context, key);
|
||||
}
|
||||
JS_FreeValue(context, in_clock);
|
||||
}
|
||||
uv_mutex_unlock(&ebt->mutex);
|
||||
for (uint32_t i = 0; i < plen; ++i)
|
||||
{
|
||||
JS_FreeAtom(context, ptab[i].atom);
|
||||
}
|
||||
js_free(context, ptab);
|
||||
}
|
||||
}
|
||||
|
||||
typedef struct _ebt_get_clock_t
|
||||
{
|
||||
tf_ssb_ebt_t* ebt;
|
||||
int32_t request_number;
|
||||
tf_ssb_ebt_clock_callback_t* callback;
|
||||
tf_ssb_ebt_clock_t* clock;
|
||||
void* user_data;
|
||||
} ebt_get_clock_t;
|
||||
|
||||
static int _ebt_compare_entry(const void* a, const void* b)
|
||||
{
|
||||
const char* id = a;
|
||||
const tf_ssb_ebt_clock_entry_t* entry = b;
|
||||
return strcmp(id, entry->id);
|
||||
}
|
||||
|
||||
static void _ebt_add_to_clock(ebt_get_clock_t* work, const char* id, int64_t value, bool replicate, bool receive)
|
||||
{
|
||||
int count = work->clock ? work->clock->count : 0;
|
||||
ebt_entry_t* entry = _ebt_get_entry(work->ebt, id);
|
||||
if ((replicate && !entry->out_replicate) || (receive && !entry->out_receive) || ((replicate || receive || entry->out_replicate || entry->out_receive) && entry->out != value))
|
||||
{
|
||||
entry->out = value;
|
||||
entry->out_replicate = entry->out_replicate || replicate;
|
||||
entry->out_receive = entry->out_receive || receive;
|
||||
|
||||
int index = tf_util_insert_index(id, count ? work->clock->entries : NULL, count, sizeof(tf_ssb_ebt_clock_entry_t), _ebt_compare_entry);
|
||||
int64_t out_value = entry->out_replicate ? ((value << 1) | (entry->out_receive ? 0 : 1)) : -1;
|
||||
if (index < count && strcmp(id, work->clock->entries[index].id) == 0)
|
||||
{
|
||||
work->clock->entries[index].value = out_value;
|
||||
}
|
||||
else
|
||||
{
|
||||
work->clock = tf_resize_vec(work->clock, sizeof(tf_ssb_ebt_clock_t) + (count + 1) * sizeof(tf_ssb_ebt_clock_entry_t));
|
||||
if (index < count)
|
||||
{
|
||||
memmove(work->clock->entries + index + 1, work->clock->entries + index, (count - index) * sizeof(tf_ssb_ebt_clock_entry_t));
|
||||
}
|
||||
work->clock->entries[index] = (tf_ssb_ebt_clock_entry_t) { .value = out_value };
|
||||
snprintf(work->clock->entries[index].id, sizeof(work->clock->entries[index].id), "%s", id);
|
||||
work->clock->count = count + 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static void _tf_ssb_ebt_get_send_clock_work(tf_ssb_connection_t* connection, void* user_data)
|
||||
{
|
||||
ebt_get_clock_t* work = user_data;
|
||||
tf_ssb_t* ssb = tf_ssb_connection_get_ssb(work->ebt->connection);
|
||||
|
||||
int64_t depth = 2;
|
||||
sqlite3* db = tf_ssb_acquire_db_reader(ssb);
|
||||
tf_ssb_db_get_global_setting_int64(db, "replication_hops", &depth);
|
||||
tf_ssb_release_db_reader(ssb, db);
|
||||
|
||||
/* Ask for every identity we know is being followed from local accounts. */
|
||||
const char** visible = tf_ssb_db_get_all_visible_identities(ssb, depth);
|
||||
if (visible)
|
||||
{
|
||||
int64_t* sequences = NULL;
|
||||
for (int i = 0; visible[i]; i++)
|
||||
{
|
||||
int64_t sequence = 0;
|
||||
tf_ssb_db_get_latest_message_by_author(ssb, visible[i], &sequence, NULL, 0);
|
||||
sequences = tf_resize_vec(sequences, (i + 1) * sizeof(int64_t));
|
||||
sequences[i] = sequence;
|
||||
}
|
||||
|
||||
uv_mutex_lock(&work->ebt->mutex);
|
||||
for (int i = 0; visible[i]; i++)
|
||||
{
|
||||
_ebt_add_to_clock(work, visible[i], sequences[i], true, true);
|
||||
}
|
||||
uv_mutex_unlock(&work->ebt->mutex);
|
||||
|
||||
tf_free(visible);
|
||||
tf_free(sequences);
|
||||
}
|
||||
|
||||
/* Ask about the incoming connection, too. */
|
||||
char id[k_id_base64_len] = "";
|
||||
if (tf_ssb_connection_get_id(connection, id, sizeof(id)))
|
||||
{
|
||||
int64_t sequence = 0;
|
||||
tf_ssb_db_get_latest_message_by_author(ssb, id, &sequence, NULL, 0);
|
||||
uv_mutex_lock(&work->ebt->mutex);
|
||||
_ebt_add_to_clock(work, id, sequence, true, true);
|
||||
uv_mutex_unlock(&work->ebt->mutex);
|
||||
}
|
||||
|
||||
/* Also respond with what we know about all requested identities. */
|
||||
tf_ssb_ebt_clock_entry_t* requested = NULL;
|
||||
int requested_count = 0;
|
||||
uv_mutex_lock(&work->ebt->mutex);
|
||||
for (int i = 0; i < work->ebt->entries_count; i++)
|
||||
{
|
||||
ebt_entry_t* entry = &work->ebt->entries[i];
|
||||
if (entry->in_replicate && !entry->out_replicate)
|
||||
{
|
||||
requested = tf_resize_vec(requested, (requested_count + 1) * sizeof(tf_ssb_ebt_clock_entry_t));
|
||||
requested[requested_count] = (tf_ssb_ebt_clock_entry_t) { .value = -1 };
|
||||
snprintf(requested[requested_count].id, sizeof(requested[requested_count].id), "%s", entry->id);
|
||||
requested_count++;
|
||||
}
|
||||
}
|
||||
uv_mutex_unlock(&work->ebt->mutex);
|
||||
|
||||
if (requested_count)
|
||||
{
|
||||
for (int i = 0; i < requested_count; i++)
|
||||
{
|
||||
tf_ssb_db_get_latest_message_by_author(ssb, requested[i].id, &requested[i].value, NULL, 0);
|
||||
}
|
||||
|
||||
uv_mutex_lock(&work->ebt->mutex);
|
||||
for (int i = 0; i < requested_count; i++)
|
||||
{
|
||||
_ebt_add_to_clock(work, requested[i].id, requested[i].value, requested[i].value >= 0, false);
|
||||
}
|
||||
uv_mutex_unlock(&work->ebt->mutex);
|
||||
tf_free(requested);
|
||||
}
|
||||
}
|
||||
|
||||
static void _tf_ssb_ebt_get_send_clock_after_work(tf_ssb_connection_t* connection, int status, void* user_data)
|
||||
{
|
||||
ebt_get_clock_t* work = user_data;
|
||||
work->callback(work->clock, work->request_number, work->user_data);
|
||||
tf_free(work->clock);
|
||||
tf_free(work);
|
||||
}
|
||||
|
||||
void tf_ssb_ebt_get_send_clock(tf_ssb_ebt_t* ebt, int32_t request_number, tf_ssb_ebt_clock_callback_t* callback, void* user_data)
|
||||
{
|
||||
ebt_get_clock_t* work = tf_malloc(sizeof(ebt_get_clock_t));
|
||||
*work = (ebt_get_clock_t) {
|
||||
.ebt = ebt,
|
||||
.request_number = request_number,
|
||||
.callback = callback,
|
||||
.user_data = user_data,
|
||||
};
|
||||
tf_ssb_connection_run_work(ebt->connection, _tf_ssb_ebt_get_send_clock_work, _tf_ssb_ebt_get_send_clock_after_work, work);
|
||||
}
|
||||
|
||||
tf_ssb_ebt_clock_t* tf_ssb_ebt_get_messages_to_send(tf_ssb_ebt_t* ebt)
|
||||
{
|
||||
int count = 0;
|
||||
tf_ssb_ebt_clock_t* clock = NULL;
|
||||
uv_mutex_lock(&ebt->mutex);
|
||||
for (int i = 0; i < ebt->entries_count; i++)
|
||||
{
|
||||
ebt_entry_t* entry = &ebt->entries[i];
|
||||
if (entry->in_replicate && entry->in_receive && entry->out > entry->in)
|
||||
{
|
||||
clock = tf_resize_vec(clock, sizeof(tf_ssb_ebt_clock_t) + (count + 1) * sizeof(tf_ssb_ebt_clock_entry_t));
|
||||
clock->entries[count] = (tf_ssb_ebt_clock_entry_t) { .value = entry->in };
|
||||
snprintf(clock->entries[count].id, sizeof(clock->entries[count].id), "%s", entry->id);
|
||||
clock->count = ++count;
|
||||
}
|
||||
}
|
||||
uv_mutex_unlock(&ebt->mutex);
|
||||
return clock;
|
||||
}
|
||||
|
||||
void tf_ssb_ebt_set_messages_sent(tf_ssb_ebt_t* ebt, const char* id, int64_t sequence)
|
||||
{
|
||||
uv_mutex_lock(&ebt->mutex);
|
||||
ebt_entry_t* entry = _ebt_get_entry(ebt, id);
|
||||
entry->in = tf_max(entry->in, sequence);
|
||||
uv_mutex_unlock(&ebt->mutex);
|
||||
}
|
85
src/ssb.ebt.h
Normal file
85
src/ssb.ebt.h
Normal file
@ -0,0 +1,85 @@
|
||||
#pragma once
|
||||
|
||||
#include "ssb.h"
|
||||
|
||||
#include "quickjs.h"
|
||||
|
||||
typedef struct _tf_ssb_connection_t tf_ssb_connection_t;
|
||||
|
||||
/**
|
||||
** SSB EBT state.
|
||||
*/
|
||||
typedef struct _tf_ssb_ebt_t tf_ssb_ebt_t;
|
||||
|
||||
/**
|
||||
** An EBT clock entry (identity + sequence pair).
|
||||
*/
|
||||
typedef struct _tf_ssb_ebt_clock_entry_t
|
||||
{
|
||||
/** The identity. */
|
||||
char id[k_id_base64_len];
|
||||
/** The sequence number. */
|
||||
int64_t value;
|
||||
} tf_ssb_ebt_clock_entry_t;
|
||||
|
||||
/**
|
||||
** A set of IDs and sequence values.
|
||||
*/
|
||||
typedef struct _tf_ssb_ebt_clock_t
|
||||
{
|
||||
/** Number of entries. */
|
||||
int count;
|
||||
/** Clock entries. */
|
||||
tf_ssb_ebt_clock_entry_t entries[];
|
||||
} tf_ssb_ebt_clock_t;
|
||||
|
||||
/**
|
||||
** A callback with EBT clock state.
|
||||
*/
|
||||
typedef void(tf_ssb_ebt_clock_callback_t)(const tf_ssb_ebt_clock_t* clock, int32_t request_number, void* user_data);
|
||||
|
||||
/**
|
||||
** Create an EBT instance.
|
||||
** @param connection The SSB connection to which this EBT state applies.
|
||||
** @return The EBT instance.
|
||||
*/
|
||||
tf_ssb_ebt_t* tf_ssb_ebt_create(tf_ssb_connection_t* connection);
|
||||
|
||||
/**
|
||||
** Update the EBT state with a received clock.
|
||||
** @param ebt The EBT instance.
|
||||
** @param context The JS context.
|
||||
** @param clock The received clock.
|
||||
*/
|
||||
void tf_ssb_ebt_receive_clock(tf_ssb_ebt_t* ebt, JSContext* context, JSValue clock);
|
||||
|
||||
/**
|
||||
** Get the EBT clock state to send.
|
||||
** @param ebt The EBT instance.
|
||||
** @param request_number The request number for which the clock will be sent.
|
||||
** @param callback Called with the clock when determined.
|
||||
** @param user_data User data passed to the callback.
|
||||
*/
|
||||
void tf_ssb_ebt_get_send_clock(tf_ssb_ebt_t* ebt, int32_t request_number, tf_ssb_ebt_clock_callback_t* callback, void* user_data);
|
||||
|
||||
/**
|
||||
** Get the set of messages requested to be sent.
|
||||
** @param ebt The EBT instance.
|
||||
** @return A clock of identities and sequence numbers indicating which messages
|
||||
** are due to be sent. The caller must free with tf_free().
|
||||
*/
|
||||
tf_ssb_ebt_clock_t* tf_ssb_ebt_get_messages_to_send(tf_ssb_ebt_t* ebt);
|
||||
|
||||
/**
|
||||
** Update the clock state indicating the messages that have been sent for an account.
|
||||
** @param ebt The EBT instance.
|
||||
** @param id The identity to update.
|
||||
** @param sequence The maximum sequence number sent.
|
||||
*/
|
||||
void tf_ssb_ebt_set_messages_sent(tf_ssb_ebt_t* ebt, const char* id, int64_t sequence);
|
||||
|
||||
/**
|
||||
** Destroy an EBT instance.
|
||||
** @param ebt The EBT instance.
|
||||
*/
|
||||
void tf_ssb_ebt_destroy(tf_ssb_ebt_t* ebt);
|
@ -77,6 +77,8 @@ typedef enum _tf_ssb_connect_flags_t
|
||||
typedef struct _tf_ssb_t tf_ssb_t;
|
||||
/** An SSB connection. */
|
||||
typedef struct _tf_ssb_connection_t tf_ssb_connection_t;
|
||||
/** A connection's EBT state. */
|
||||
typedef struct _tf_ssb_ebt_t tf_ssb_ebt_t;
|
||||
/** A trace instance. */
|
||||
typedef struct _tf_trace_t tf_trace_t;
|
||||
/** An SQLite database handle. */
|
||||
@ -1132,4 +1134,11 @@ void tf_ssb_sync_start(tf_ssb_t* ssb);
|
||||
*/
|
||||
int tf_ssb_connection_get_flags(tf_ssb_connection_t* connection);
|
||||
|
||||
/**
|
||||
** Get a connection's EBT state.
|
||||
** @param connection The connection.
|
||||
** @return the EBT state for the connection.
|
||||
*/
|
||||
tf_ssb_ebt_t* tf_ssb_connection_get_ebt(tf_ssb_connection_t* connection);
|
||||
|
||||
/** @} */
|
||||
|
260
src/ssb.rpc.c
260
src/ssb.rpc.c
@ -3,6 +3,7 @@
|
||||
#include "log.h"
|
||||
#include "mem.h"
|
||||
#include "ssb.db.h"
|
||||
#include "ssb.ebt.h"
|
||||
#include "ssb.h"
|
||||
#include "util.js.h"
|
||||
|
||||
@ -20,30 +21,6 @@ static void _tf_ssb_rpc_send_peers_exchange(tf_ssb_connection_t* connection);
|
||||
static void _tf_ssb_rpc_start_delete_blobs(tf_ssb_t* ssb, int delay_ms);
|
||||
static void _tf_ssb_rpc_start_delete_feeds(tf_ssb_t* ssb, int delay_ms);
|
||||
|
||||
static bool _get_global_setting_bool(tf_ssb_t* ssb, const char* name, bool default_value)
|
||||
{
|
||||
bool result = default_value;
|
||||
sqlite3* db = tf_ssb_acquire_db_reader(ssb);
|
||||
sqlite3_stmt* statement;
|
||||
if (sqlite3_prepare(db, "SELECT json_extract(value, '$.' || ?) FROM properties WHERE id = 'core' AND key = 'settings'", -1, &statement, NULL) == SQLITE_OK)
|
||||
{
|
||||
if (sqlite3_bind_text(statement, 1, name, -1, NULL) == SQLITE_OK)
|
||||
{
|
||||
if (sqlite3_step(statement) == SQLITE_ROW)
|
||||
{
|
||||
result = sqlite3_column_int(statement, 0) != 0;
|
||||
}
|
||||
}
|
||||
sqlite3_finalize(statement);
|
||||
}
|
||||
else
|
||||
{
|
||||
tf_printf("prepare failed: %s\n", sqlite3_errmsg(db));
|
||||
}
|
||||
tf_ssb_release_db_reader(ssb, db);
|
||||
return result;
|
||||
}
|
||||
|
||||
static void _tf_ssb_rpc_gossip_ping_callback(
|
||||
tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
|
||||
{
|
||||
@ -879,6 +856,7 @@ static void _tf_ssb_connection_send_history_stream_after_work(tf_ssb_connection_
|
||||
break;
|
||||
}
|
||||
}
|
||||
tf_ssb_ebt_set_messages_sent(tf_ssb_connection_get_ebt(connection), request->author, request->out_max_sequence_seen);
|
||||
if (!request->out_finished)
|
||||
{
|
||||
_tf_ssb_connection_send_history_stream(
|
||||
@ -967,197 +945,24 @@ static void _tf_ssb_rpc_createHistoryStream(
|
||||
JS_FreeValue(context, arg_array);
|
||||
}
|
||||
|
||||
typedef struct _ebt_clock_row_t
|
||||
static void _tf_ssb_rpc_ebt_replicate_send_messages(tf_ssb_connection_t* connection)
|
||||
{
|
||||
char id[k_id_base64_len];
|
||||
int64_t value;
|
||||
} ebt_clock_row_t;
|
||||
|
||||
typedef struct _ebt_replicate_send_clock_t
|
||||
tf_ssb_ebt_t* ebt = tf_ssb_connection_get_ebt(connection);
|
||||
tf_ssb_ebt_clock_t* clock = tf_ssb_ebt_get_messages_to_send(ebt);
|
||||
if (clock)
|
||||
{
|
||||
int64_t request_number;
|
||||
ebt_clock_row_t* clock;
|
||||
int clock_count;
|
||||
|
||||
char* out_clock;
|
||||
} ebt_replicate_send_clock_t;
|
||||
|
||||
static void _tf_ssb_rpc_ebt_replicate_send_clock_work(tf_ssb_connection_t* connection, void* user_data)
|
||||
{
|
||||
ebt_replicate_send_clock_t* work = user_data;
|
||||
|
||||
JSMallocFunctions funcs = { 0 };
|
||||
tf_get_js_malloc_functions(&funcs);
|
||||
JSRuntime* runtime = JS_NewRuntime2(&funcs, NULL);
|
||||
JSContext* context = JS_NewContext(runtime);
|
||||
|
||||
tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
|
||||
JSValue full_clock = JS_NewObject(context);
|
||||
|
||||
int64_t depth = 2;
|
||||
sqlite3* db = tf_ssb_acquire_db_reader(ssb);
|
||||
tf_ssb_db_get_global_setting_int64(db, "replication_hops", &depth);
|
||||
tf_ssb_release_db_reader(ssb, db);
|
||||
|
||||
/* Ask for every identity we know is being followed from local accounts. */
|
||||
const char** visible = tf_ssb_db_get_all_visible_identities(ssb, depth);
|
||||
for (int i = 0; visible[i]; i++)
|
||||
{
|
||||
int64_t sequence = 0;
|
||||
tf_ssb_db_get_latest_message_by_author(ssb, visible[i], &sequence, NULL, 0);
|
||||
JS_SetPropertyStr(context, full_clock, visible[i], JS_NewInt64(context, sequence == -1 ? -1 : (sequence << 1)));
|
||||
}
|
||||
tf_free(visible);
|
||||
|
||||
/* Ask about the incoming connection, too. */
|
||||
char id[k_id_base64_len] = "";
|
||||
if (tf_ssb_connection_get_id(connection, id, sizeof(id)))
|
||||
{
|
||||
JSValue in_clock = JS_GetPropertyStr(context, full_clock, id);
|
||||
if (JS_IsUndefined(in_clock))
|
||||
{
|
||||
int64_t sequence = 0;
|
||||
tf_ssb_db_get_latest_message_by_author(ssb, id, &sequence, NULL, 0);
|
||||
JS_SetPropertyStr(context, full_clock, id, JS_NewInt64(context, sequence == -1 ? -1 : (sequence << 1)));
|
||||
}
|
||||
JS_FreeValue(context, in_clock);
|
||||
}
|
||||
|
||||
/* Also respond with what we know about all requested identities. */
|
||||
for (int i = 0; i < work->clock_count; i++)
|
||||
{
|
||||
JSValue in_clock = JS_GetPropertyStr(context, full_clock, work->clock[i].id);
|
||||
if (JS_IsUndefined(in_clock))
|
||||
{
|
||||
int64_t sequence = -1;
|
||||
tf_ssb_db_get_latest_message_by_author(ssb, work->clock[i].id, &sequence, NULL, 0);
|
||||
JS_SetPropertyStr(context, full_clock, work->clock[i].id, JS_NewInt64(context, sequence == -1 ? -1 : (sequence << 1)));
|
||||
}
|
||||
JS_FreeValue(context, in_clock);
|
||||
}
|
||||
|
||||
JSValue json = JS_JSONStringify(context, full_clock, JS_NULL, JS_NULL);
|
||||
size_t size = 0;
|
||||
const char* string = JS_ToCStringLen(context, &size, json);
|
||||
char* copy = tf_malloc(size + 1);
|
||||
memcpy(copy, string, size + 1);
|
||||
work->out_clock = copy;
|
||||
JS_FreeCString(context, string);
|
||||
JS_FreeValue(context, json);
|
||||
JS_FreeValue(context, full_clock);
|
||||
|
||||
JS_FreeContext(context);
|
||||
JS_FreeRuntime(runtime);
|
||||
}
|
||||
|
||||
static void _tf_ssb_rpc_ebt_replicate_send_clock_after_work(tf_ssb_connection_t* connection, int result, void* user_data)
|
||||
{
|
||||
ebt_replicate_send_clock_t* work = user_data;
|
||||
tf_free(work->clock);
|
||||
if (work->out_clock)
|
||||
{
|
||||
tf_ssb_connection_rpc_send(
|
||||
connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_json, -work->request_number, NULL, (const uint8_t*)work->out_clock, strlen(work->out_clock), NULL, NULL, NULL);
|
||||
tf_free(work->out_clock);
|
||||
}
|
||||
tf_free(work);
|
||||
}
|
||||
|
||||
static void _tf_ssb_rpc_ebt_replicate_send_clock(tf_ssb_connection_t* connection, int32_t request_number, JSValue message)
|
||||
{
|
||||
ebt_replicate_send_clock_t* work = tf_malloc(sizeof(ebt_replicate_send_clock_t));
|
||||
*work = (ebt_replicate_send_clock_t) {
|
||||
.request_number = request_number,
|
||||
};
|
||||
JSContext* context = tf_ssb_connection_get_context(connection);
|
||||
|
||||
if (!JS_IsUndefined(message))
|
||||
{
|
||||
JSPropertyEnum* ptab = NULL;
|
||||
uint32_t plen = 0;
|
||||
if (JS_GetOwnPropertyNames(context, &ptab, &plen, message, JS_GPN_STRING_MASK) == 0)
|
||||
{
|
||||
work->clock_count = (int)plen;
|
||||
work->clock = tf_malloc(sizeof(ebt_clock_row_t) * plen);
|
||||
memset(work->clock, 0, sizeof(ebt_clock_row_t) * plen);
|
||||
for (uint32_t i = 0; i < plen; ++i)
|
||||
{
|
||||
const char* id = JS_AtomToCString(context, ptab[i].atom);
|
||||
snprintf(work->clock[i].id, sizeof(work->clock[i].id), "%s", id);
|
||||
JS_FreeCString(context, id);
|
||||
|
||||
JSPropertyDescriptor desc = { 0 };
|
||||
JSValue key_value = JS_UNDEFINED;
|
||||
if (JS_GetOwnProperty(context, &desc, message, ptab[i].atom) == 1)
|
||||
{
|
||||
key_value = desc.value;
|
||||
JS_FreeValue(context, desc.setter);
|
||||
JS_FreeValue(context, desc.getter);
|
||||
}
|
||||
JS_ToInt64(context, &work->clock[i].value, key_value);
|
||||
JS_FreeValue(context, key_value);
|
||||
JS_FreeAtom(context, ptab[i].atom);
|
||||
}
|
||||
js_free(context, ptab);
|
||||
}
|
||||
}
|
||||
|
||||
tf_ssb_connection_run_work(connection, _tf_ssb_rpc_ebt_replicate_send_clock_work, _tf_ssb_rpc_ebt_replicate_send_clock_after_work, work);
|
||||
}
|
||||
|
||||
static void _tf_ssb_rpc_ebt_replicate_send_messages(tf_ssb_connection_t* connection, JSValue message)
|
||||
{
|
||||
if (JS_IsUndefined(message))
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
|
||||
JSContext* context = tf_ssb_get_context(ssb);
|
||||
JSPropertyEnum* ptab = NULL;
|
||||
uint32_t plen = 0;
|
||||
if (JS_GetOwnPropertyNames(context, &ptab, &plen, message, JS_GPN_STRING_MASK) == 0)
|
||||
{
|
||||
for (uint32_t i = 0; i < plen; ++i)
|
||||
{
|
||||
JSValue in_clock = JS_UNDEFINED;
|
||||
JSPropertyDescriptor desc = { 0 };
|
||||
if (JS_GetOwnProperty(context, &desc, message, ptab[i].atom) == 1)
|
||||
{
|
||||
in_clock = desc.value;
|
||||
JS_FreeValue(context, desc.setter);
|
||||
JS_FreeValue(context, desc.getter);
|
||||
}
|
||||
if (!JS_IsUndefined(in_clock))
|
||||
{
|
||||
JSValue key = JS_AtomToString(context, ptab[i].atom);
|
||||
int64_t sequence = -1;
|
||||
JS_ToInt64(context, &sequence, in_clock);
|
||||
const char* author = JS_ToCString(context, key);
|
||||
if (sequence >= 0 && (sequence & 1) == 0)
|
||||
for (int i = 0; i < clock->count; i++)
|
||||
{
|
||||
tf_ssb_ebt_clock_entry_t* entry = &clock->entries[i];
|
||||
int32_t request_number = tf_ssb_connection_get_ebt_request_number(connection);
|
||||
bool live = (tf_ssb_connection_get_flags(connection) & k_tf_ssb_connect_flag_one_shot) == 0;
|
||||
_tf_ssb_connection_send_history_stream(connection, request_number, author, sequence >> 1, false, live, false);
|
||||
_tf_ssb_connection_send_history_stream(connection, request_number, entry->id, entry->value, false, live, false);
|
||||
if (live)
|
||||
{
|
||||
tf_ssb_connection_add_new_message_request(connection, author, request_number, false);
|
||||
tf_ssb_connection_add_new_message_request(connection, entry->id, request_number, false);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
tf_ssb_connection_remove_new_message_request(connection, author);
|
||||
}
|
||||
JS_FreeCString(context, author);
|
||||
JS_FreeValue(context, key);
|
||||
}
|
||||
JS_FreeValue(context, in_clock);
|
||||
}
|
||||
for (uint32_t i = 0; i < plen; ++i)
|
||||
{
|
||||
JS_FreeAtom(context, ptab[i].atom);
|
||||
}
|
||||
js_free(context, ptab);
|
||||
tf_free(clock);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1167,6 +972,25 @@ static void _tf_ssb_rpc_ebt_replicate_store_callback(const char* id, bool verifi
|
||||
tf_ssb_connection_adjust_read_backpressure(connection, -1);
|
||||
}
|
||||
|
||||
static void _tf_ssb_rpc_ebt_send_clock_callback(const tf_ssb_ebt_clock_t* clock, int32_t request_number, void* user_data)
|
||||
{
|
||||
tf_ssb_connection_t* connection = user_data;
|
||||
|
||||
if (clock && clock->count)
|
||||
{
|
||||
JSContext* context = tf_ssb_connection_get_context(connection);
|
||||
JSValue message = JS_NewObject(context);
|
||||
for (int i = 0; i < clock->count; i++)
|
||||
{
|
||||
JS_SetPropertyStr(context, message, clock->entries[i].id, JS_NewInt64(context, clock->entries[i].value));
|
||||
}
|
||||
tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_json, -request_number, NULL, message, NULL, NULL, NULL);
|
||||
JS_FreeValue(context, message);
|
||||
}
|
||||
|
||||
_tf_ssb_rpc_ebt_replicate_send_messages(connection);
|
||||
}
|
||||
|
||||
typedef struct _resend_clock_t
|
||||
{
|
||||
tf_ssb_connection_t* connection;
|
||||
@ -1178,8 +1002,8 @@ static void _tf_ssb_rpc_ebt_replicate_resend_clock(tf_ssb_connection_t* connecti
|
||||
resend_clock_t* resend = user_data;
|
||||
if (!skip)
|
||||
{
|
||||
_tf_ssb_rpc_ebt_replicate_send_clock(resend->connection, resend->request_number, JS_UNDEFINED);
|
||||
tf_ssb_connection_set_sent_clock(resend->connection, true);
|
||||
tf_ssb_ebt_t* ebt = tf_ssb_connection_get_ebt(connection);
|
||||
tf_ssb_ebt_get_send_clock(ebt, resend->request_number, _tf_ssb_rpc_ebt_send_clock_callback, connection);
|
||||
}
|
||||
tf_free(user_data);
|
||||
}
|
||||
@ -1210,9 +1034,8 @@ static void _tf_ssb_rpc_ebt_replicate(tf_ssb_connection_t* connection, uint8_t f
|
||||
tf_ssb_connection_adjust_read_backpressure(connection, 1);
|
||||
tf_ssb_verify_strip_and_store_message(ssb, args, _tf_ssb_rpc_ebt_replicate_store_callback, connection);
|
||||
|
||||
if (tf_ssb_connection_get_sent_clock(connection) && !tf_ssb_is_shutting_down(ssb) && !tf_ssb_connection_is_closing(connection))
|
||||
if (!tf_ssb_is_shutting_down(ssb) && !tf_ssb_connection_is_closing(connection))
|
||||
{
|
||||
tf_ssb_connection_set_sent_clock(connection, false);
|
||||
resend_clock_t* resend = tf_malloc(sizeof(resend_clock_t));
|
||||
*resend = (resend_clock_t) {
|
||||
.connection = connection,
|
||||
@ -1223,13 +1046,9 @@ static void _tf_ssb_rpc_ebt_replicate(tf_ssb_connection_t* connection, uint8_t f
|
||||
}
|
||||
else
|
||||
{
|
||||
/* EBT clock. */
|
||||
if (!tf_ssb_connection_get_sent_clock(connection))
|
||||
{
|
||||
_tf_ssb_rpc_ebt_replicate_send_clock(connection, request_number, in_clock);
|
||||
tf_ssb_connection_set_sent_clock(connection, true);
|
||||
}
|
||||
_tf_ssb_rpc_ebt_replicate_send_messages(connection, in_clock);
|
||||
tf_ssb_ebt_t* ebt = tf_ssb_connection_get_ebt(connection);
|
||||
tf_ssb_ebt_receive_clock(ebt, context, in_clock);
|
||||
tf_ssb_ebt_get_send_clock(ebt, request_number, _tf_ssb_rpc_ebt_send_clock_callback, connection);
|
||||
}
|
||||
JS_FreeValue(context, name);
|
||||
JS_FreeValue(context, author);
|
||||
@ -1469,13 +1288,16 @@ static void _tf_ssb_rpc_start_delete_blobs(tf_ssb_t* ssb, int delay_ms)
|
||||
static void _tf_ssb_rpc_delete_feeds_work(tf_ssb_t* ssb, void* user_data)
|
||||
{
|
||||
delete_t* delete = user_data;
|
||||
if (!_get_global_setting_bool(ssb, "delete_stale_feeds", false))
|
||||
sqlite3* db = tf_ssb_acquire_db_reader(ssb);
|
||||
bool delete_stale_feeds = false;
|
||||
tf_ssb_db_get_global_setting_bool(db, "delete_stale_feeds", &delete_stale_feeds);
|
||||
if (!delete_stale_feeds)
|
||||
{
|
||||
tf_ssb_release_db_reader(ssb, db);
|
||||
return;
|
||||
}
|
||||
int64_t start_ns = uv_hrtime();
|
||||
int64_t replication_hops = 2;
|
||||
sqlite3* db = tf_ssb_acquire_db_reader(ssb);
|
||||
tf_ssb_db_get_global_setting_int64(db, "replication_hops", &replication_hops);
|
||||
tf_ssb_release_db_reader(ssb, db);
|
||||
const char** identities = tf_ssb_db_get_all_visible_identities(ssb, replication_hops);
|
||||
|
@ -356,7 +356,7 @@ static JSValue _util_defaultGlobalSettings(JSContext* context, JSValueConst this
|
||||
.default_value = JS_NewInt32(context, 2) },
|
||||
{ .name = "delete_stale_feeds",
|
||||
.type = "boolean",
|
||||
.description = "Periodically delete feeds that visible from local accounts and related follows.",
|
||||
.description = "Periodically delete feeds that aren't visible from local accounts or related follows.",
|
||||
.default_value = JS_FALSE },
|
||||
};
|
||||
|
||||
|
@ -150,6 +150,19 @@ const char* tf_util_function_to_string(void* function);
|
||||
_a > _b ? _b : _a; \
|
||||
})
|
||||
|
||||
/**
|
||||
** Get the maximum of two values.
|
||||
** @param a The first value.
|
||||
** @param b The second value.
|
||||
** @return The maximum of a and b.
|
||||
*/
|
||||
#define tf_max(a, b) \
|
||||
({ \
|
||||
__typeof__(a) _a = (a); \
|
||||
__typeof__(b) _b = (b); \
|
||||
_a > _b ? _a : _b; \
|
||||
})
|
||||
|
||||
/**
|
||||
** Get the number of elements in an array.
|
||||
** @param a The array.
|
||||
|
Loading…
x
Reference in New Issue
Block a user