Add replication to what -t bench measures. Add a bool to control printing RPC messages. Respond to ebt.replicate with messages that weren't requested.

git-svn-id: https://www.unprompted.com/svn/projects/tildefriends/trunk@4095 ed5197a5-7fde-0310-b194-c3ffbd925b24
This commit is contained in:
Cory McWilliams 2022-12-31 18:59:29 +00:00
parent ca6d042ed6
commit a9f6593979
4 changed files with 85 additions and 20 deletions

View File

@ -284,12 +284,16 @@ function formatMessage(row) {
function ebtReplicateRegisterMessageCallback(request) {
ssb.addEventListener('message', function(message_id) {
if (request.connection.send_clock) {
ssb.sqlStream(
'SELECT previous, author, id, sequence, timestamp, hash, content, signature FROM messages WHERE id = ?1',
[message_id],
function (row) {
if (request.connection.send_clock[row.author] < row.sequence) {
request.send_json(formatMessage(row));
}
});
}
});
}
@ -299,14 +303,21 @@ function ebtReplicateCommon(request) {
} else {
ebtReplicateSendClock(request, request.message);
if (!request.connection.send_clock) {
request.connection.send_clock = {};
}
for (let id of Object.keys(request.message)) {
if (request.message[id] >= 0 && (request.message[id] & 1) == 0) {
request.connection.send_clock[id] = request.message[id] >> 1;
ssb.sqlStream(
'SELECT previous, author, id, sequence, timestamp, hash, content, signature FROM messages WHERE author = ?1 AND sequence >= ?2 ORDER BY sequence',
[id, request.message[id] >> 1],
function (row) {
request.send_json(formatMessage(row));
request.connection.send_clock[id] = row.sequence;
});
} else {
delete request.connection.send_clock[id];
}
}
}

View File

@ -157,6 +157,8 @@ typedef struct _tf_ssb_t
uint8_t pub[crypto_sign_PUBLICKEYBYTES];
uint8_t priv[crypto_sign_SECRETKEYBYTES];
bool verbose;
int messages_stored;
int rpc_in;
int rpc_out;
@ -527,7 +529,10 @@ void tf_ssb_connection_rpc_send(tf_ssb_connection_t* connection, uint8_t flags,
uint32_t rn = htonl((uint32_t)request_number);
memcpy(combined + 1 + sizeof(uint32_t), &rn, sizeof(rn));
memcpy(combined + 1 + 2 * sizeof(uint32_t), message, size);
if (connection->ssb->verbose)
{
printf(MAGENTA "%s RPC SEND" RESET " flags=%x RN=%d: %.*s\n", connection->name, flags, request_number, (flags & k_ssb_rpc_mask_type) == k_ssb_rpc_flag_binary? 0 : (int)size, message);
}
_tf_ssb_connection_box_stream_send(connection, combined, 1 + 2 * sizeof(uint32_t) + size);
tf_free(combined);
connection->ssb->rpc_out++;
@ -1155,7 +1160,10 @@ static void _tf_ssb_connection_rpc_recv(tf_ssb_connection_t* connection, uint8_t
{
char id[k_id_base64_len] = "";
tf_ssb_id_bin_to_str(id, sizeof(id), connection->serverpub);
if (connection->ssb->verbose)
{
printf(CYAN "%s RPC RECV" RESET " from %s flags=%x RN=%d: %.*s\n", connection->name, id, flags, request_number, (int)size, message);
}
JSContext* context = connection->ssb->context;
JSValue val = JS_ParseJSON(context, (const char*)message, size, NULL);
@ -1201,8 +1209,11 @@ static void _tf_ssb_connection_rpc_recv(tf_ssb_connection_t* connection, uint8_t
JS_FreeValue(context, val);
}
else if ((flags & k_ssb_rpc_mask_type) == k_ssb_rpc_flag_binary)
{
if (connection->ssb->verbose)
{
printf(CYAN "%s RPC RECV" RESET " flags=%x RN=%d: %zd bytes\n", connection->name, flags, request_number, size);
}
tf_ssb_rpc_callback_t* callback = NULL;
void* user_data = NULL;
if (_tf_ssb_connection_get_request_callback(connection, -request_number, &callback, &user_data))

View File

@ -322,10 +322,6 @@ static JSValue _tf_ssb_storeMessage(JSContext* context, JSValueConst this_val, i
{
tf_ssb_notify_message_added(ssb, id);
}
else
{
printf("Failed to store message.\n");
}
}
else
{

View File

@ -75,17 +75,24 @@ static void _ssb_test_connections_changed(tf_ssb_t* ssb, tf_ssb_change_t change,
printf("conns = %d %d %d\n", test->connection_count0, test->connection_count1, test->connection_count2);
}
typedef struct _count_messages_t
{
tf_ssb_t* ssb;
int count;
} count_messages_t;
static void _count_messages_callback(JSValue row, void* user_data)
{
int* count = user_data;
++*count;
count_messages_t* count = user_data;
JSContext* context = tf_ssb_get_context(count->ssb);
JS_ToInt32(context, &count->count, JS_GetPropertyStr(context, row, "count"));
}
static int _ssb_test_count_messages(tf_ssb_t* ssb)
{
int count = 0;
tf_ssb_db_visit_query(ssb, "SELECT * FROM messages", JS_UNDEFINED, _count_messages_callback, &count);
return count;
count_messages_t count = { .ssb = ssb };
tf_ssb_db_visit_query(ssb, "SELECT COUNT(*) AS count FROM messages", JS_UNDEFINED, _count_messages_callback, &count);
return count.count;
}
static void _message_added(tf_ssb_t* ssb, const char* id, void* user_data)
@ -564,15 +571,55 @@ void tf_ssb_test_bench(const tf_test_options_t* options)
struct timespec start_time = { 0 };
struct timespec end_time = { 0 };
clock_gettime(CLOCK_REALTIME, &start_time);
for (int i = 0; i < 10 * 1024; i++)
const int k_messages = 4096;
for (int i = 0; i < k_messages; i++)
{
tf_ssb_append_post(ssb0, "Hello, world!");
}
clock_gettime(CLOCK_REALTIME, &end_time);
printf("delta = %f seconds\n", (float)(end_time.tv_sec - start_time.tv_sec) + (end_time.tv_nsec - start_time.tv_nsec) / 1e9f);
printf("insert = %f seconds\n", (float)(end_time.tv_sec - start_time.tv_sec) + (end_time.tv_nsec - start_time.tv_nsec) / 1e9f);
sqlite3* db1 = NULL;
sqlite3_open(":memory:", &db1);
assert(r == SQLITE_OK);
tf_ssb_t* ssb1 = tf_ssb_create(&loop, NULL, db1);
tf_ssb_generate_keys(ssb1);
uint8_t id0bin[k_id_bin_len];
tf_ssb_id_str_to_bin(id0bin, id0);
uv_idle_t idle0 = { .data = ssb0 };
uv_idle_init(&loop, &idle0);
uv_idle_start(&idle0, _ssb_test_idle);
uv_idle_t idle1 = { .data = ssb1 };
uv_idle_init(&loop, &idle1);
uv_idle_start(&idle1, _ssb_test_idle);
tf_ssb_register(tf_ssb_get_context(ssb0), ssb0);
tf_ssb_register(tf_ssb_get_context(ssb1), ssb1);
tf_ssb_server_open(ssb0, 12347);
tf_ssb_connect(ssb1, "127.0.0.1", 12347, id0bin);
printf("Waiting for messages.\n");
clock_gettime(CLOCK_REALTIME, &start_time);
while (_ssb_test_count_messages(ssb1) < k_messages)
{
uv_run(&loop, UV_RUN_ONCE);
}
clock_gettime(CLOCK_REALTIME, &end_time);
printf("Done.\n");
printf("replicate = %f seconds\n", (float)(end_time.tv_sec - start_time.tv_sec) + (end_time.tv_nsec - start_time.tv_nsec) / 1e9f);
tf_ssb_send_close(ssb1);
tf_ssb_server_close(ssb0);
uv_close((uv_handle_t*)&idle0, NULL);
uv_close((uv_handle_t*)&idle1, NULL);
uv_run(&loop, UV_RUN_DEFAULT);
tf_ssb_destroy(ssb1);
tf_ssb_destroy(ssb0);
uv_loop_close(&loop);