ssb: Attempt to show EBT replication progress in the connections tab, mainly to help track down bugs, which there are.
Some checks failed
Build Tilde Friends / Build-All (push) Has been cancelled
Some checks failed
Build Tilde Friends / Build-All (push) Has been cancelled
This commit is contained in:
@ -30,6 +30,9 @@ typedef struct _tf_ssb_ebt_t
|
||||
int entries_count;
|
||||
|
||||
int send_clock_pending;
|
||||
|
||||
int max_in;
|
||||
int max_out;
|
||||
} tf_ssb_ebt_t;
|
||||
|
||||
tf_ssb_ebt_t* tf_ssb_ebt_create(tf_ssb_connection_t* connection)
|
||||
@ -56,6 +59,25 @@ static int _ebt_entry_compare(const void* a, const void* b)
|
||||
return strcmp(id, entry->id);
|
||||
}
|
||||
|
||||
static void _ebt_count_messages(tf_ssb_ebt_t* ebt, int* in, int* out)
|
||||
{
|
||||
for (int i = 0; i < ebt->entries_count; i++)
|
||||
{
|
||||
ebt_entry_t* entry = &ebt->entries[i];
|
||||
if (entry->in >= 0 && entry->out >= 0)
|
||||
{
|
||||
if (entry->in > entry->out)
|
||||
{
|
||||
*in += entry->in - entry->out;
|
||||
}
|
||||
else if (entry->out > entry->in)
|
||||
{
|
||||
*out += entry->out - entry->in;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static ebt_entry_t* _ebt_get_entry(tf_ssb_ebt_t* ebt, const char* id)
|
||||
{
|
||||
uint8_t bin[k_id_bin_len];
|
||||
@ -135,6 +157,13 @@ void tf_ssb_ebt_receive_clock(tf_ssb_ebt_t* ebt, JSContext* context, JSValue clo
|
||||
}
|
||||
JS_FreeValue(context, in_clock);
|
||||
}
|
||||
|
||||
int in = 0;
|
||||
int out = 0;
|
||||
_ebt_count_messages(ebt, &in, &out);
|
||||
ebt->max_in = tf_max(in, ebt->max_in);
|
||||
ebt->max_out = tf_max(out, ebt->max_out);
|
||||
|
||||
uv_mutex_unlock(&ebt->mutex);
|
||||
for (uint32_t i = 0; i < plen; ++i)
|
||||
{
|
||||
@ -164,29 +193,32 @@ static void _ebt_add_to_clock(ebt_get_clock_t* work, const char* id, int64_t val
|
||||
{
|
||||
int count = work->clock ? work->clock->count : 0;
|
||||
ebt_entry_t* entry = _ebt_get_entry(work->ebt, id);
|
||||
if (entry &&
|
||||
((replicate && !entry->out_replicate) || (receive && !entry->out_receive) || ((replicate || receive || entry->out_replicate || entry->out_receive) && entry->out != value)))
|
||||
if (entry)
|
||||
{
|
||||
entry->out = value;
|
||||
entry->out_replicate = entry->out_replicate || replicate;
|
||||
entry->out_receive = entry->out_receive || receive;
|
||||
|
||||
int index = tf_util_insert_index(id, count ? work->clock->entries : NULL, count, sizeof(tf_ssb_ebt_clock_entry_t), _ebt_compare_entry);
|
||||
int64_t out_value = entry->out_replicate ? ((value << 1) | (entry->out_receive ? 0 : 1)) : -1;
|
||||
if (index < count && strcmp(id, work->clock->entries[index].id) == 0)
|
||||
if ((replicate && !entry->out_replicate) || (receive && !entry->out_receive) ||
|
||||
((replicate || receive || entry->out_replicate || entry->out_receive) && entry->out != value))
|
||||
{
|
||||
work->clock->entries[index].value = out_value;
|
||||
}
|
||||
else
|
||||
{
|
||||
work->clock = tf_resize_vec(work->clock, sizeof(tf_ssb_ebt_clock_t) + (count + 1) * sizeof(tf_ssb_ebt_clock_entry_t));
|
||||
if (index < count)
|
||||
int index = tf_util_insert_index(id, count ? work->clock->entries : NULL, count, sizeof(tf_ssb_ebt_clock_entry_t), _ebt_compare_entry);
|
||||
int64_t out_value = entry->out_replicate ? ((value << 1) | (entry->out_receive ? 0 : 1)) : -1;
|
||||
if (index < count && strcmp(id, work->clock->entries[index].id) == 0)
|
||||
{
|
||||
memmove(work->clock->entries + index + 1, work->clock->entries + index, (count - index) * sizeof(tf_ssb_ebt_clock_entry_t));
|
||||
work->clock->entries[index].value = out_value;
|
||||
}
|
||||
else
|
||||
{
|
||||
work->clock = tf_resize_vec(work->clock, sizeof(tf_ssb_ebt_clock_t) + (count + 1) * sizeof(tf_ssb_ebt_clock_entry_t));
|
||||
if (index < count)
|
||||
{
|
||||
memmove(work->clock->entries + index + 1, work->clock->entries + index, (count - index) * sizeof(tf_ssb_ebt_clock_entry_t));
|
||||
}
|
||||
work->clock->entries[index] = (tf_ssb_ebt_clock_entry_t) { .value = out_value };
|
||||
snprintf(work->clock->entries[index].id, sizeof(work->clock->entries[index].id), "%s", id);
|
||||
work->clock->count = count + 1;
|
||||
}
|
||||
work->clock->entries[index] = (tf_ssb_ebt_clock_entry_t) { .value = out_value };
|
||||
snprintf(work->clock->entries[index].id, sizeof(work->clock->entries[index].id), "%s", id);
|
||||
work->clock->count = count + 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -268,6 +300,14 @@ static void _tf_ssb_ebt_get_send_clock_work(tf_ssb_connection_t* connection, voi
|
||||
uv_mutex_unlock(&work->ebt->mutex);
|
||||
tf_free(requested);
|
||||
}
|
||||
|
||||
uv_mutex_lock(&work->ebt->mutex);
|
||||
int in = 0;
|
||||
int out = 0;
|
||||
_ebt_count_messages(work->ebt, &in, &out);
|
||||
work->ebt->max_in = tf_max(in, work->ebt->max_in);
|
||||
work->ebt->max_out = tf_max(out, work->ebt->max_out);
|
||||
uv_mutex_unlock(&work->ebt->mutex);
|
||||
}
|
||||
|
||||
static void _tf_ssb_ebt_get_send_clock_after_work(tf_ssb_connection_t* connection, int status, void* user_data)
|
||||
@ -356,3 +396,12 @@ void tf_ssb_ebt_debug_clock(tf_ssb_ebt_t* ebt, JSContext* context, JSValue debug
|
||||
}
|
||||
uv_mutex_unlock(&ebt->mutex);
|
||||
}
|
||||
|
||||
void tf_ssb_ebt_get_progress(tf_ssb_ebt_t* ebt, int* in_pending, int* in_total, int* out_pending, int* out_total)
|
||||
{
|
||||
uv_mutex_lock(&ebt->mutex);
|
||||
_ebt_count_messages(ebt, in_pending, out_pending);
|
||||
*in_total = ebt->max_in;
|
||||
*out_total = ebt->max_out;
|
||||
uv_mutex_unlock(&ebt->mutex);
|
||||
}
|
||||
|
Reference in New Issue
Block a user