diff --git a/src/ssb.c b/src/ssb.c index 5b9d4afc..33e06d56 100644 --- a/src/ssb.c +++ b/src/ssb.c @@ -204,6 +204,12 @@ typedef struct _tf_ssb_connection_message_request_t bool keys; } tf_ssb_connection_message_request_t; +typedef struct _tf_ssb_connection_scheduled_t +{ + tf_ssb_scheduled_callback_t* callback; + void* user_data; +} tf_ssb_connection_scheduled_t; + typedef struct _tf_ssb_connection_t { tf_ssb_t* ssb; @@ -266,6 +272,9 @@ typedef struct _tf_ssb_connection_t tf_ssb_connection_message_request_t* message_requests; int message_requests_count; + + tf_ssb_connection_scheduled_t* scheduled; + int scheduled_count; } tf_ssb_connection_t; static JSClassID _connection_class_id; @@ -477,6 +486,17 @@ static void _tf_ssb_connection_box_stream_send(tf_ssb_connection_t* connection, } } +void tf_ssb_connection_schedule_idle(tf_ssb_connection_t* connection, tf_ssb_scheduled_callback_t* callback, void* user_data) +{ + connection->scheduled = tf_resize_vec(connection->scheduled, sizeof(tf_ssb_connection_scheduled_t) * (connection->scheduled_count + 1)); + connection->scheduled[connection->scheduled_count++] = (tf_ssb_connection_scheduled_t) + { + .callback = callback, + .user_data = user_data, + }; + uv_async_send(&connection->async); +} + static int _request_compare(const void* a, const void* b) { int32_t ai = *(const int32_t*)a; @@ -1062,6 +1082,13 @@ bool tf_ssb_connection_is_client(tf_ssb_connection_t* connection) return connection->state == k_tf_ssb_state_verified; } +bool tf_ssb_connection_is_connected(tf_ssb_connection_t* connection) +{ + return + connection->state == k_tf_ssb_state_verified || + connection->state == k_tf_ssb_state_server_verified; +} + const char* tf_ssb_connection_get_host(tf_ssb_connection_t* connection) { return connection->host; @@ -1607,6 +1634,17 @@ void tf_ssb_append_message(tf_ssb_t* ssb, JSValue message) tf_ssb_append_message_with_keys(ssb, author, ssb->priv, message); } +static void _tf_ssb_connection_dispatch_scheduled(tf_ssb_connection_t* connection) +{ + if (connection->scheduled_count) + { + tf_ssb_connection_scheduled_t scheduled = connection->scheduled[0]; + memmove(connection->scheduled, connection->scheduled + 1, sizeof(tf_ssb_connection_scheduled_t) * (connection->scheduled_count - 1)); + connection->scheduled_count--; + scheduled.callback(connection, scheduled.user_data); + } +} + void _tf_ssb_connection_destroy(tf_ssb_connection_t* connection, const char* reason) { tf_ssb_t* ssb = connection->ssb; @@ -1614,6 +1652,12 @@ void _tf_ssb_connection_destroy(tf_ssb_connection_t* connection, const char* rea { connection->destroy_reason = reason; } + while (connection->scheduled_count) + { + _tf_ssb_connection_dispatch_scheduled(connection); + } + tf_free(connection->scheduled); + connection->scheduled = NULL; while (connection->requests) { _tf_ssb_connection_remove_request(connection, connection->requests->request_number); @@ -2185,6 +2229,7 @@ static void _tf_ssb_connection_process_message_async(uv_async_t* async) { uv_async_send(&connection->async); } + _tf_ssb_connection_dispatch_scheduled(connection); } tf_ssb_connection_t* tf_ssb_connection_create(tf_ssb_t* ssb, const char* host, const struct sockaddr_in* addr, const uint8_t* public_key) diff --git a/src/ssb.h b/src/ssb.h index 68754cf8..0af41508 100644 --- a/src/ssb.h +++ b/src/ssb.h @@ -115,6 +115,7 @@ tf_ssb_t* tf_ssb_connection_get_ssb(tf_ssb_connection_t* connection); JSContext* tf_ssb_connection_get_context(tf_ssb_connection_t* connection); sqlite3* tf_ssb_connection_get_db(tf_ssb_connection_t* connection); void tf_ssb_connection_close(tf_ssb_connection_t* connect); +bool tf_ssb_connection_is_connected(tf_ssb_connection_t* connection); int32_t tf_ssb_connection_next_request_number(tf_ssb_connection_t* connection); @@ -152,6 +153,9 @@ void tf_ssb_connection_rpc_send_error(tf_ssb_connection_t* connection, uint8_t f void tf_ssb_connection_rpc_send_error_method_not_allowed(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number); void tf_ssb_connection_add_request(tf_ssb_connection_t* connection, int32_t request_number, tf_ssb_rpc_callback_t* callback, tf_ssb_callback_cleanup_t* cleanup, void* user_data, tf_ssb_connection_t* dependent_connection); +typedef void (tf_ssb_scheduled_callback_t)(tf_ssb_connection_t* connection, void* user_data); +void tf_ssb_connection_schedule_idle(tf_ssb_connection_t* connection, tf_ssb_scheduled_callback_t* callback, void* user_data); + void tf_ssb_connection_add_new_message_request(tf_ssb_connection_t* connection, const char* author, int32_t request_number, bool keys); void tf_ssb_connection_remove_new_message_request(tf_ssb_connection_t* connection, const char* author); diff --git a/src/ssb.rpc.c b/src/ssb.rpc.c index 31b09a5f..bccbf609 100644 --- a/src/ssb.rpc.c +++ b/src/ssb.rpc.c @@ -15,6 +15,8 @@ #define _countof(a) ((int)(sizeof((a)) / sizeof(*(a)))) #endif +static void _tf_ssb_connection_send_history_stream(tf_ssb_connection_t* connection, int32_t request_number, const char* author, int64_t sequence, bool keys); + static void _tf_ssb_rpc_gossip_ping_callback(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) { char buffer[256]; @@ -675,20 +677,42 @@ static void _tf_ssb_rpc_connection_tunnel_isRoom_callback(tf_ssb_connection_t* c } } +typedef struct _tf_ssb_connection_send_history_stream_t +{ + int32_t request_number; + char author[k_id_base64_len]; + int64_t sequence; + bool keys; +} tf_ssb_connection_send_history_stream_t; + +static void _tf_ssb_connection_send_history_stream_callback(tf_ssb_connection_t* connection, void* user_data) +{ + tf_ssb_connection_send_history_stream_t* request = user_data; + if (tf_ssb_connection_is_connected(connection)) + { + _tf_ssb_connection_send_history_stream(connection, request->request_number, request->author, request->sequence, request->keys); + } + tf_free(request); +} + static void _tf_ssb_connection_send_history_stream(tf_ssb_connection_t* connection, int32_t request_number, const char* author, int64_t sequence, bool keys) { tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); JSContext* context = tf_ssb_get_context(ssb); sqlite3* db = tf_ssb_get_db(ssb); sqlite3_stmt* statement; - if (sqlite3_prepare(db, "SELECT previous, author, id, sequence, timestamp, hash, content, signature, sequence_before_author FROM messages WHERE author = ?1 AND sequence >= ?2 ORDER BY sequence", -1, &statement, NULL) == SQLITE_OK) + const int k_max = 32; + int64_t max_sequence_seen = 0; + if (sqlite3_prepare(db, "SELECT previous, author, id, sequence, timestamp, hash, content, signature, sequence_before_author FROM messages WHERE author = ?1 AND sequence >= ?2 AND sequence < ?3 ORDER BY sequence", -1, &statement, NULL) == SQLITE_OK) { if (sqlite3_bind_text(statement, 1, author, -1, NULL) == SQLITE_OK && - sqlite3_bind_int64(statement, 2, sequence) == SQLITE_OK) + sqlite3_bind_int64(statement, 2, sequence) == SQLITE_OK && + sqlite3_bind_int64(statement, 3, sequence + k_max) == SQLITE_OK) { while (sqlite3_step(statement) == SQLITE_ROW) { JSValue message = JS_UNDEFINED; + max_sequence_seen = sqlite3_column_int64(statement, 3); JSValue formatted = tf_ssb_format_message( context, @@ -717,6 +741,19 @@ static void _tf_ssb_connection_send_history_stream(tf_ssb_connection_t* connecti } sqlite3_finalize(statement); } + + if (max_sequence_seen == sequence + k_max - 1) + { + tf_ssb_connection_send_history_stream_t* async = tf_malloc(sizeof(tf_ssb_connection_send_history_stream_t)); + *async = (tf_ssb_connection_send_history_stream_t) + { + .request_number = request_number, + .sequence = max_sequence_seen + 1, + .keys = keys, + }; + snprintf(async->author, sizeof(async->author), "%s", author); + tf_ssb_connection_schedule_idle(connection, _tf_ssb_connection_send_history_stream_callback, async); + } } static void _tf_ssb_rpc_createHistoryStream(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)