From a9f6593979efe43477fd1f88b0432078b9fecfc3 Mon Sep 17 00:00:00 2001 From: Cory McWilliams Date: Sat, 31 Dec 2022 18:59:29 +0000 Subject: [PATCH] Add replication to what -t bench measures. Add a bool to control printing RPC messages. Respond to ebt.replicate with messages that weren't requested. git-svn-id: https://www.unprompted.com/svn/projects/tildefriends/trunk@4095 ed5197a5-7fde-0310-b194-c3ffbd925b24 --- core/ssb.js | 23 ++++++++++++++----- src/ssb.c | 17 +++++++++++--- src/ssb.js.c | 4 ---- src/ssb.tests.c | 61 +++++++++++++++++++++++++++++++++++++++++++------ 4 files changed, 85 insertions(+), 20 deletions(-) diff --git a/core/ssb.js b/core/ssb.js index 7e70f717..00dbcc74 100644 --- a/core/ssb.js +++ b/core/ssb.js @@ -284,12 +284,16 @@ function formatMessage(row) { function ebtReplicateRegisterMessageCallback(request) { ssb.addEventListener('message', function(message_id) { - ssb.sqlStream( - 'SELECT previous, author, id, sequence, timestamp, hash, content, signature FROM messages WHERE id = ?1', - [message_id], - function (row) { - request.send_json(formatMessage(row)); - }); + if (request.connection.send_clock) { + ssb.sqlStream( + 'SELECT previous, author, id, sequence, timestamp, hash, content, signature FROM messages WHERE id = ?1', + [message_id], + function (row) { + if (request.connection.send_clock[row.author] < row.sequence) { + request.send_json(formatMessage(row)); + } + }); + } }); } @@ -299,14 +303,21 @@ function ebtReplicateCommon(request) { } else { ebtReplicateSendClock(request, request.message); + if (!request.connection.send_clock) { + request.connection.send_clock = {}; + } for (let id of Object.keys(request.message)) { if (request.message[id] >= 0 && (request.message[id] & 1) == 0) { + request.connection.send_clock[id] = request.message[id] >> 1; ssb.sqlStream( 'SELECT previous, author, id, sequence, timestamp, hash, content, signature FROM messages WHERE author = ?1 AND sequence >= ?2 ORDER BY sequence', [id, request.message[id] >> 1], function (row) { request.send_json(formatMessage(row)); + request.connection.send_clock[id] = row.sequence; }); + } else { + delete request.connection.send_clock[id]; } } } diff --git a/src/ssb.c b/src/ssb.c index e264c1d7..928b1b14 100644 --- a/src/ssb.c +++ b/src/ssb.c @@ -157,6 +157,8 @@ typedef struct _tf_ssb_t uint8_t pub[crypto_sign_PUBLICKEYBYTES]; uint8_t priv[crypto_sign_SECRETKEYBYTES]; + bool verbose; + int messages_stored; int rpc_in; int rpc_out; @@ -527,7 +529,10 @@ void tf_ssb_connection_rpc_send(tf_ssb_connection_t* connection, uint8_t flags, uint32_t rn = htonl((uint32_t)request_number); memcpy(combined + 1 + sizeof(uint32_t), &rn, sizeof(rn)); memcpy(combined + 1 + 2 * sizeof(uint32_t), message, size); - printf(MAGENTA "%s RPC SEND" RESET " flags=%x RN=%d: %.*s\n", connection->name, flags, request_number, (flags & k_ssb_rpc_mask_type) == k_ssb_rpc_flag_binary? 0 : (int)size, message); + if (connection->ssb->verbose) + { + printf(MAGENTA "%s RPC SEND" RESET " flags=%x RN=%d: %.*s\n", connection->name, flags, request_number, (flags & k_ssb_rpc_mask_type) == k_ssb_rpc_flag_binary? 0 : (int)size, message); + } _tf_ssb_connection_box_stream_send(connection, combined, 1 + 2 * sizeof(uint32_t) + size); tf_free(combined); connection->ssb->rpc_out++; @@ -1155,7 +1160,10 @@ static void _tf_ssb_connection_rpc_recv(tf_ssb_connection_t* connection, uint8_t { char id[k_id_base64_len] = ""; tf_ssb_id_bin_to_str(id, sizeof(id), connection->serverpub); - printf(CYAN "%s RPC RECV" RESET " from %s flags=%x RN=%d: %.*s\n", connection->name, id, flags, request_number, (int)size, message); + if (connection->ssb->verbose) + { + printf(CYAN "%s RPC RECV" RESET " from %s flags=%x RN=%d: %.*s\n", connection->name, id, flags, request_number, (int)size, message); + } JSContext* context = connection->ssb->context; JSValue val = JS_ParseJSON(context, (const char*)message, size, NULL); @@ -1202,7 +1210,10 @@ static void _tf_ssb_connection_rpc_recv(tf_ssb_connection_t* connection, uint8_t } else if ((flags & k_ssb_rpc_mask_type) == k_ssb_rpc_flag_binary) { - printf(CYAN "%s RPC RECV" RESET " flags=%x RN=%d: %zd bytes\n", connection->name, flags, request_number, size); + if (connection->ssb->verbose) + { + printf(CYAN "%s RPC RECV" RESET " flags=%x RN=%d: %zd bytes\n", connection->name, flags, request_number, size); + } tf_ssb_rpc_callback_t* callback = NULL; void* user_data = NULL; if (_tf_ssb_connection_get_request_callback(connection, -request_number, &callback, &user_data)) diff --git a/src/ssb.js.c b/src/ssb.js.c index 6cfcdd58..5f1d0882 100644 --- a/src/ssb.js.c +++ b/src/ssb.js.c @@ -322,10 +322,6 @@ static JSValue _tf_ssb_storeMessage(JSContext* context, JSValueConst this_val, i { tf_ssb_notify_message_added(ssb, id); } - else - { - printf("Failed to store message.\n"); - } } else { diff --git a/src/ssb.tests.c b/src/ssb.tests.c index cbbda0a8..554acd98 100644 --- a/src/ssb.tests.c +++ b/src/ssb.tests.c @@ -75,17 +75,24 @@ static void _ssb_test_connections_changed(tf_ssb_t* ssb, tf_ssb_change_t change, printf("conns = %d %d %d\n", test->connection_count0, test->connection_count1, test->connection_count2); } +typedef struct _count_messages_t +{ + tf_ssb_t* ssb; + int count; +} count_messages_t; + static void _count_messages_callback(JSValue row, void* user_data) { - int* count = user_data; - ++*count; + count_messages_t* count = user_data; + JSContext* context = tf_ssb_get_context(count->ssb); + JS_ToInt32(context, &count->count, JS_GetPropertyStr(context, row, "count")); } static int _ssb_test_count_messages(tf_ssb_t* ssb) { - int count = 0; - tf_ssb_db_visit_query(ssb, "SELECT * FROM messages", JS_UNDEFINED, _count_messages_callback, &count); - return count; + count_messages_t count = { .ssb = ssb }; + tf_ssb_db_visit_query(ssb, "SELECT COUNT(*) AS count FROM messages", JS_UNDEFINED, _count_messages_callback, &count); + return count.count; } static void _message_added(tf_ssb_t* ssb, const char* id, void* user_data) @@ -564,15 +571,55 @@ void tf_ssb_test_bench(const tf_test_options_t* options) struct timespec start_time = { 0 }; struct timespec end_time = { 0 }; clock_gettime(CLOCK_REALTIME, &start_time); - for (int i = 0; i < 10 * 1024; i++) + const int k_messages = 4096; + for (int i = 0; i < k_messages; i++) { tf_ssb_append_post(ssb0, "Hello, world!"); } clock_gettime(CLOCK_REALTIME, &end_time); - printf("delta = %f seconds\n", (float)(end_time.tv_sec - start_time.tv_sec) + (end_time.tv_nsec - start_time.tv_nsec) / 1e9f); + printf("insert = %f seconds\n", (float)(end_time.tv_sec - start_time.tv_sec) + (end_time.tv_nsec - start_time.tv_nsec) / 1e9f); + + sqlite3* db1 = NULL; + sqlite3_open(":memory:", &db1); + assert(r == SQLITE_OK); + tf_ssb_t* ssb1 = tf_ssb_create(&loop, NULL, db1); + tf_ssb_generate_keys(ssb1); + uint8_t id0bin[k_id_bin_len]; + tf_ssb_id_str_to_bin(id0bin, id0); + + uv_idle_t idle0 = { .data = ssb0 }; + uv_idle_init(&loop, &idle0); + uv_idle_start(&idle0, _ssb_test_idle); + + uv_idle_t idle1 = { .data = ssb1 }; + uv_idle_init(&loop, &idle1); + uv_idle_start(&idle1, _ssb_test_idle); + + tf_ssb_register(tf_ssb_get_context(ssb0), ssb0); + tf_ssb_register(tf_ssb_get_context(ssb1), ssb1); + + tf_ssb_server_open(ssb0, 12347); + tf_ssb_connect(ssb1, "127.0.0.1", 12347, id0bin); + + printf("Waiting for messages.\n"); + clock_gettime(CLOCK_REALTIME, &start_time); + while (_ssb_test_count_messages(ssb1) < k_messages) + { + uv_run(&loop, UV_RUN_ONCE); + } + clock_gettime(CLOCK_REALTIME, &end_time); + printf("Done.\n"); + printf("replicate = %f seconds\n", (float)(end_time.tv_sec - start_time.tv_sec) + (end_time.tv_nsec - start_time.tv_nsec) / 1e9f); + + tf_ssb_send_close(ssb1); + tf_ssb_server_close(ssb0); + + uv_close((uv_handle_t*)&idle0, NULL); + uv_close((uv_handle_t*)&idle1, NULL); uv_run(&loop, UV_RUN_DEFAULT); + tf_ssb_destroy(ssb1); tf_ssb_destroy(ssb0); uv_loop_close(&loop);