Send history streams in batches. Should block the main thread less.

git-svn-id: https://www.unprompted.com/svn/projects/tildefriends/trunk@4127 ed5197a5-7fde-0310-b194-c3ffbd925b24
This commit is contained in:
Cory McWilliams 2023-01-17 02:17:29 +00:00
parent 73863f9418
commit 67d34bf70e
3 changed files with 88 additions and 2 deletions

View File

@ -204,6 +204,12 @@ typedef struct _tf_ssb_connection_message_request_t
bool keys; bool keys;
} tf_ssb_connection_message_request_t; } tf_ssb_connection_message_request_t;
typedef struct _tf_ssb_connection_scheduled_t
{
tf_ssb_scheduled_callback_t* callback;
void* user_data;
} tf_ssb_connection_scheduled_t;
typedef struct _tf_ssb_connection_t typedef struct _tf_ssb_connection_t
{ {
tf_ssb_t* ssb; tf_ssb_t* ssb;
@ -266,6 +272,9 @@ typedef struct _tf_ssb_connection_t
tf_ssb_connection_message_request_t* message_requests; tf_ssb_connection_message_request_t* message_requests;
int message_requests_count; int message_requests_count;
tf_ssb_connection_scheduled_t* scheduled;
int scheduled_count;
} tf_ssb_connection_t; } tf_ssb_connection_t;
static JSClassID _connection_class_id; static JSClassID _connection_class_id;
@ -477,6 +486,17 @@ static void _tf_ssb_connection_box_stream_send(tf_ssb_connection_t* connection,
} }
} }
void tf_ssb_connection_schedule_idle(tf_ssb_connection_t* connection, tf_ssb_scheduled_callback_t* callback, void* user_data)
{
connection->scheduled = tf_resize_vec(connection->scheduled, sizeof(tf_ssb_connection_scheduled_t) * (connection->scheduled_count + 1));
connection->scheduled[connection->scheduled_count++] = (tf_ssb_connection_scheduled_t)
{
.callback = callback,
.user_data = user_data,
};
uv_async_send(&connection->async);
}
static int _request_compare(const void* a, const void* b) static int _request_compare(const void* a, const void* b)
{ {
int32_t ai = *(const int32_t*)a; int32_t ai = *(const int32_t*)a;
@ -1062,6 +1082,13 @@ bool tf_ssb_connection_is_client(tf_ssb_connection_t* connection)
return connection->state == k_tf_ssb_state_verified; return connection->state == k_tf_ssb_state_verified;
} }
bool tf_ssb_connection_is_connected(tf_ssb_connection_t* connection)
{
return
connection->state == k_tf_ssb_state_verified ||
connection->state == k_tf_ssb_state_server_verified;
}
const char* tf_ssb_connection_get_host(tf_ssb_connection_t* connection) const char* tf_ssb_connection_get_host(tf_ssb_connection_t* connection)
{ {
return connection->host; return connection->host;
@ -1607,6 +1634,17 @@ void tf_ssb_append_message(tf_ssb_t* ssb, JSValue message)
tf_ssb_append_message_with_keys(ssb, author, ssb->priv, message); tf_ssb_append_message_with_keys(ssb, author, ssb->priv, message);
} }
static void _tf_ssb_connection_dispatch_scheduled(tf_ssb_connection_t* connection)
{
if (connection->scheduled_count)
{
tf_ssb_connection_scheduled_t scheduled = connection->scheduled[0];
memmove(connection->scheduled, connection->scheduled + 1, sizeof(tf_ssb_connection_scheduled_t) * (connection->scheduled_count - 1));
connection->scheduled_count--;
scheduled.callback(connection, scheduled.user_data);
}
}
void _tf_ssb_connection_destroy(tf_ssb_connection_t* connection, const char* reason) void _tf_ssb_connection_destroy(tf_ssb_connection_t* connection, const char* reason)
{ {
tf_ssb_t* ssb = connection->ssb; tf_ssb_t* ssb = connection->ssb;
@ -1614,6 +1652,12 @@ void _tf_ssb_connection_destroy(tf_ssb_connection_t* connection, const char* rea
{ {
connection->destroy_reason = reason; connection->destroy_reason = reason;
} }
while (connection->scheduled_count)
{
_tf_ssb_connection_dispatch_scheduled(connection);
}
tf_free(connection->scheduled);
connection->scheduled = NULL;
while (connection->requests) while (connection->requests)
{ {
_tf_ssb_connection_remove_request(connection, connection->requests->request_number); _tf_ssb_connection_remove_request(connection, connection->requests->request_number);
@ -2185,6 +2229,7 @@ static void _tf_ssb_connection_process_message_async(uv_async_t* async)
{ {
uv_async_send(&connection->async); uv_async_send(&connection->async);
} }
_tf_ssb_connection_dispatch_scheduled(connection);
} }
tf_ssb_connection_t* tf_ssb_connection_create(tf_ssb_t* ssb, const char* host, const struct sockaddr_in* addr, const uint8_t* public_key) tf_ssb_connection_t* tf_ssb_connection_create(tf_ssb_t* ssb, const char* host, const struct sockaddr_in* addr, const uint8_t* public_key)

View File

@ -115,6 +115,7 @@ tf_ssb_t* tf_ssb_connection_get_ssb(tf_ssb_connection_t* connection);
JSContext* tf_ssb_connection_get_context(tf_ssb_connection_t* connection); JSContext* tf_ssb_connection_get_context(tf_ssb_connection_t* connection);
sqlite3* tf_ssb_connection_get_db(tf_ssb_connection_t* connection); sqlite3* tf_ssb_connection_get_db(tf_ssb_connection_t* connection);
void tf_ssb_connection_close(tf_ssb_connection_t* connect); void tf_ssb_connection_close(tf_ssb_connection_t* connect);
bool tf_ssb_connection_is_connected(tf_ssb_connection_t* connection);
int32_t tf_ssb_connection_next_request_number(tf_ssb_connection_t* connection); int32_t tf_ssb_connection_next_request_number(tf_ssb_connection_t* connection);
@ -152,6 +153,9 @@ void tf_ssb_connection_rpc_send_error(tf_ssb_connection_t* connection, uint8_t f
void tf_ssb_connection_rpc_send_error_method_not_allowed(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number); void tf_ssb_connection_rpc_send_error_method_not_allowed(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number);
void tf_ssb_connection_add_request(tf_ssb_connection_t* connection, int32_t request_number, tf_ssb_rpc_callback_t* callback, tf_ssb_callback_cleanup_t* cleanup, void* user_data, tf_ssb_connection_t* dependent_connection); void tf_ssb_connection_add_request(tf_ssb_connection_t* connection, int32_t request_number, tf_ssb_rpc_callback_t* callback, tf_ssb_callback_cleanup_t* cleanup, void* user_data, tf_ssb_connection_t* dependent_connection);
typedef void (tf_ssb_scheduled_callback_t)(tf_ssb_connection_t* connection, void* user_data);
void tf_ssb_connection_schedule_idle(tf_ssb_connection_t* connection, tf_ssb_scheduled_callback_t* callback, void* user_data);
void tf_ssb_connection_add_new_message_request(tf_ssb_connection_t* connection, const char* author, int32_t request_number, bool keys); void tf_ssb_connection_add_new_message_request(tf_ssb_connection_t* connection, const char* author, int32_t request_number, bool keys);
void tf_ssb_connection_remove_new_message_request(tf_ssb_connection_t* connection, const char* author); void tf_ssb_connection_remove_new_message_request(tf_ssb_connection_t* connection, const char* author);

View File

@ -15,6 +15,8 @@
#define _countof(a) ((int)(sizeof((a)) / sizeof(*(a)))) #define _countof(a) ((int)(sizeof((a)) / sizeof(*(a))))
#endif #endif
static void _tf_ssb_connection_send_history_stream(tf_ssb_connection_t* connection, int32_t request_number, const char* author, int64_t sequence, bool keys);
static void _tf_ssb_rpc_gossip_ping_callback(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) static void _tf_ssb_rpc_gossip_ping_callback(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{ {
char buffer[256]; char buffer[256];
@ -675,20 +677,42 @@ static void _tf_ssb_rpc_connection_tunnel_isRoom_callback(tf_ssb_connection_t* c
} }
} }
typedef struct _tf_ssb_connection_send_history_stream_t
{
int32_t request_number;
char author[k_id_base64_len];
int64_t sequence;
bool keys;
} tf_ssb_connection_send_history_stream_t;
static void _tf_ssb_connection_send_history_stream_callback(tf_ssb_connection_t* connection, void* user_data)
{
tf_ssb_connection_send_history_stream_t* request = user_data;
if (tf_ssb_connection_is_connected(connection))
{
_tf_ssb_connection_send_history_stream(connection, request->request_number, request->author, request->sequence, request->keys);
}
tf_free(request);
}
static void _tf_ssb_connection_send_history_stream(tf_ssb_connection_t* connection, int32_t request_number, const char* author, int64_t sequence, bool keys) static void _tf_ssb_connection_send_history_stream(tf_ssb_connection_t* connection, int32_t request_number, const char* author, int64_t sequence, bool keys)
{ {
tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
JSContext* context = tf_ssb_get_context(ssb); JSContext* context = tf_ssb_get_context(ssb);
sqlite3* db = tf_ssb_get_db(ssb); sqlite3* db = tf_ssb_get_db(ssb);
sqlite3_stmt* statement; sqlite3_stmt* statement;
if (sqlite3_prepare(db, "SELECT previous, author, id, sequence, timestamp, hash, content, signature, sequence_before_author FROM messages WHERE author = ?1 AND sequence >= ?2 ORDER BY sequence", -1, &statement, NULL) == SQLITE_OK) const int k_max = 32;
int64_t max_sequence_seen = 0;
if (sqlite3_prepare(db, "SELECT previous, author, id, sequence, timestamp, hash, content, signature, sequence_before_author FROM messages WHERE author = ?1 AND sequence >= ?2 AND sequence < ?3 ORDER BY sequence", -1, &statement, NULL) == SQLITE_OK)
{ {
if (sqlite3_bind_text(statement, 1, author, -1, NULL) == SQLITE_OK && if (sqlite3_bind_text(statement, 1, author, -1, NULL) == SQLITE_OK &&
sqlite3_bind_int64(statement, 2, sequence) == SQLITE_OK) sqlite3_bind_int64(statement, 2, sequence) == SQLITE_OK &&
sqlite3_bind_int64(statement, 3, sequence + k_max) == SQLITE_OK)
{ {
while (sqlite3_step(statement) == SQLITE_ROW) while (sqlite3_step(statement) == SQLITE_ROW)
{ {
JSValue message = JS_UNDEFINED; JSValue message = JS_UNDEFINED;
max_sequence_seen = sqlite3_column_int64(statement, 3);
JSValue formatted = tf_ssb_format_message( JSValue formatted = tf_ssb_format_message(
context, context,
@ -717,6 +741,19 @@ static void _tf_ssb_connection_send_history_stream(tf_ssb_connection_t* connecti
} }
sqlite3_finalize(statement); sqlite3_finalize(statement);
} }
if (max_sequence_seen == sequence + k_max - 1)
{
tf_ssb_connection_send_history_stream_t* async = tf_malloc(sizeof(tf_ssb_connection_send_history_stream_t));
*async = (tf_ssb_connection_send_history_stream_t)
{
.request_number = request_number,
.sequence = max_sequence_seen + 1,
.keys = keys,
};
snprintf(async->author, sizeof(async->author), "%s", author);
tf_ssb_connection_schedule_idle(connection, _tf_ssb_connection_send_history_stream_callback, async);
}
} }
static void _tf_ssb_rpc_createHistoryStream(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) static void _tf_ssb_rpc_createHistoryStream(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)