diff --git a/src/ssb.c b/src/ssb.c index 29c4a91d..4376e2eb 100644 --- a/src/ssb.c +++ b/src/ssb.c @@ -17,6 +17,7 @@ #include "sodium/crypto_scalarmult_curve25519.h" #include "sodium/crypto_secretbox.h" #include "sodium/crypto_sign.h" +#include "sodium/randombytes.h" #include "sodium/utils.h" #include "sqlite3.h" #include "uv.h" @@ -261,7 +262,7 @@ typedef struct _tf_ssb_connection_message_request_t typedef struct _tf_ssb_connection_scheduled_t { - char key[k_id_base64_len]; + char key[128]; tf_ssb_scheduled_callback_t* callback; void* user_data; } tf_ssb_connection_scheduled_t; @@ -323,6 +324,7 @@ typedef struct _tf_ssb_connection_t uint8_t secretbox_buf[k_tf_ssb_rpc_message_body_length_max]; uint32_t send_request_number; + uint64_t prng; tf_ssb_connection_t* next; tf_ssb_request_t* requests; @@ -592,17 +594,26 @@ static void _tf_ssb_connection_scheduled_async(uv_async_t* async) _tf_ssb_connection_dispatch_scheduled(connection); } +static uint32_t _tf_ssb_connection_prng(tf_ssb_connection_t* connection) +{ + uint64_t seed = connection->prng * 6364136223846793005ULL + 1442695040888963407ULL; + uint32_t result = (uint32_t)((seed ^ (seed >> 22)) >> (22 + (seed >> 61))); + connection->prng = seed; + return result; +} + static void _tf_ssb_connection_dispatch_scheduled(tf_ssb_connection_t* connection) { while (((connection->active_write_count == 0 && connection->read_back_pressure == 0) || connection->closing) && connection->scheduled_count && connection->scheduled) { - int index = uv_hrtime() % connection->scheduled_count; + int index = _tf_ssb_connection_prng(connection) % connection->scheduled_count; tf_ssb_connection_scheduled_t scheduled = connection->scheduled[index]; if (index != connection->scheduled_count - 1) { memmove(connection->scheduled + index, connection->scheduled + index + 1, sizeof(tf_ssb_connection_scheduled_t) * (connection->scheduled_count - index - 1)); } connection->scheduled_count--; + tf_trace_begin(connection->ssb->trace, "scheduled callback"); PRE_CALLBACK(connection->ssb, scheduled.callback); scheduled.callback(connection, false, scheduled.user_data); @@ -623,14 +634,11 @@ void tf_ssb_connection_schedule_idle(tf_ssb_connection_t* connection, const char int index = tf_util_insert_index(key, connection->scheduled, connection->scheduled_count, sizeof(tf_ssb_connection_scheduled_t), _tf_ssb_connection_scheduled_compare); if (index != connection->scheduled_count && strcmp(key, connection->scheduled[index].key) == 0) { - tf_ssb_connection_scheduled_t scheduled = connection->scheduled[index]; - connection->scheduled[index].callback = callback; - connection->scheduled[index].user_data = user_data; - + /* Keep the old request. Skip the new request. */ tf_trace_begin(connection->ssb->trace, "scheduled callback (skip)"); - PRE_CALLBACK(connection->ssb, scheduled.callback); - scheduled.callback(connection, true, scheduled.user_data); - POST_CALLBACK(connection->ssb, scheduled.callback); + PRE_CALLBACK(connection->ssb, callback); + callback(connection, true, user_data); + POST_CALLBACK(connection->ssb, callback); tf_trace_end(connection->ssb->trace); } else @@ -2728,6 +2736,7 @@ static tf_ssb_connection_t* _tf_ssb_connection_create_internal(tf_ssb_t* ssb, co snprintf(connection->name, sizeof(connection->name), "%s%d", name, index); connection->ssb = ssb; connection->send_request_number = 1; + randombytes_buf(&connection->prng, sizeof(connection->prng)); connection->async.data = connection; uv_async_init(ssb->loop, &connection->async, _tf_ssb_connection_process_message_async); diff --git a/src/ssb.h b/src/ssb.h index 0c45dbfb..599640e6 100644 --- a/src/ssb.h +++ b/src/ssb.h @@ -807,7 +807,7 @@ typedef void(tf_ssb_scheduled_callback_t)(tf_ssb_connection_t* connection, bool /** ** Schedule work to be run when the connection is next idle. ** @param connection The owning connection. -** @param key A key identifying the work. If work by the same key already exists, it will be replaced. +** @param key A key identifying the work. If work by the same key already exists, the new request will be discarded. ** @param callback The callback to call. ** @param user_data User data to pass to the callback. */ diff --git a/src/ssb.rpc.c b/src/ssb.rpc.c index f9e76e63..7bd1213b 100644 --- a/src/ssb.rpc.c +++ b/src/ssb.rpc.c @@ -908,7 +908,7 @@ static void _tf_ssb_connection_send_history_stream_callback(tf_ssb_connection_t* static void _tf_ssb_connection_send_history_stream( tf_ssb_connection_t* connection, int32_t request_number, const char* author, int64_t sequence, bool keys, bool live, bool end_request) { - if (!tf_ssb_is_shutting_down(tf_ssb_connection_get_ssb(connection))) + if (tf_ssb_connection_is_connected(connection) && !tf_ssb_is_shutting_down(tf_ssb_connection_get_ssb(connection))) { tf_ssb_connection_send_history_stream_t* async = tf_malloc(sizeof(tf_ssb_connection_send_history_stream_t)); *async = (tf_ssb_connection_send_history_stream_t) { @@ -919,7 +919,9 @@ static void _tf_ssb_connection_send_history_stream( .end_request = end_request, }; snprintf(async->author, sizeof(async->author), "%s", author); - tf_ssb_connection_schedule_idle(connection, author, _tf_ssb_connection_send_history_stream_callback, async); + char key[128]; + snprintf(key, sizeof(key), "%s:%" PRId64, author, sequence); + tf_ssb_connection_schedule_idle(connection, key, _tf_ssb_connection_send_history_stream_callback, async); } }