diff --git a/src/ssb.connections.c b/src/ssb.connections.c index e7c28ed9..9e2f6585 100644 --- a/src/ssb.connections.c +++ b/src/ssb.connections.c @@ -75,6 +75,37 @@ static bool _tf_ssb_connections_get_next_connection(tf_ssb_connections_t* connec return result; } +typedef struct _tf_ssb_connections_get_next_t +{ + uv_work_t work; + tf_ssb_connections_t* connections; + tf_ssb_t* ssb; + bool ready; + char host[256]; + int port; + char key[k_id_base64_len]; +} tf_ssb_connections_get_next_t; + +static void _tf_ssb_connections_get_next_work(uv_work_t* work) +{ + tf_ssb_connections_get_next_t* next = work->data; + next->ready = _tf_ssb_connections_get_next_connection(next->connections, next->host, sizeof(next->host), &next->port, next->key, sizeof(next->key)); +} + +static void _tf_ssb_connections_get_next_after_work(uv_work_t* work, int status) +{ + tf_ssb_connections_get_next_t* next = work->data; + if (next->ready) + { + uint8_t key_bin[k_id_bin_len]; + if (tf_ssb_id_str_to_bin(key_bin, next->key)) + { + tf_ssb_connect(next->ssb, next->host, next->port, key_bin); + } + } + tf_free(next); +} + static void _tf_ssb_connections_timer(uv_timer_t* timer) { tf_ssb_connections_t* connections = timer->data; @@ -82,16 +113,20 @@ static void _tf_ssb_connections_timer(uv_timer_t* timer) int count = tf_ssb_get_connections(connections->ssb, active, _countof(active)); if (count < (int)_countof(active)) { - char host[256]; - int port; - char key[k_id_base64_len]; - if (_tf_ssb_connections_get_next_connection(connections, host, sizeof(host), &port, key, sizeof(key))) + tf_ssb_connections_get_next_t* next = tf_malloc(sizeof(tf_ssb_connections_get_next_t)); + *next = (tf_ssb_connections_get_next_t) { - uint8_t key_bin[k_id_bin_len]; - if (tf_ssb_id_str_to_bin(key_bin, key)) + .work = { - tf_ssb_connect(connections->ssb, host, port, key_bin); - } + .data = next, + }, + .ssb = connections->ssb, + .connections = connections, + }; + int result = uv_queue_work(tf_ssb_get_loop(connections->ssb), &next->work, _tf_ssb_connections_get_next_work, _tf_ssb_connections_get_next_after_work); + if (result) + { + _tf_ssb_connections_get_next_after_work(&next->work, result); } } } @@ -125,63 +160,129 @@ void tf_ssb_connections_destroy(tf_ssb_connections_t* connections) uv_close((uv_handle_t*)&connections->timer, _tf_ssb_connections_on_handle_close); } +typedef struct _tf_ssb_connections_update_t +{ + uv_work_t work; + tf_ssb_t* ssb; + char host[256]; + int port; + char key[k_id_base64_len]; + bool attempted; + bool succeeded; +} tf_ssb_connections_update_t; + +static void _tf_ssb_connections_update_work(uv_work_t* work) +{ + tf_ssb_connections_update_t* update = work->data; + tf_ssb_record_thread_busy(update->ssb, true); + sqlite3_stmt* statement; + sqlite3* db = tf_ssb_acquire_db_writer(update->ssb); + if (update->attempted) + { + if (sqlite3_prepare(db, "UPDATE connections SET last_attempt = strftime('%s', 'now') WHERE host = $1 AND port = $2 AND key = $3", -1, &statement, NULL) == SQLITE_OK) + { + if (sqlite3_bind_text(statement, 1, update->host, -1, NULL) == SQLITE_OK && + sqlite3_bind_int(statement, 2, update->port) == SQLITE_OK && + sqlite3_bind_text(statement, 3, update->key, -1, NULL) == SQLITE_OK) + { + if (sqlite3_step(statement) != SQLITE_DONE) + { + tf_printf("tf_ssb_connections_set_attempted: %s.\n", sqlite3_errmsg(db)); + } + } + sqlite3_finalize(statement); + } + } + else if (update->succeeded) + { + if (sqlite3_prepare(db, "UPDATE connections SET last_success = strftime('%s', 'now') WHERE host = $1 AND port = $2 AND key = $3", -1, &statement, NULL) == SQLITE_OK) + { + if (sqlite3_bind_text(statement, 1, update->host, -1, NULL) == SQLITE_OK && + sqlite3_bind_int(statement, 2, update->port) == SQLITE_OK && + sqlite3_bind_text(statement, 3, update->key, -1, NULL) == SQLITE_OK) + { + if (sqlite3_step(statement) != SQLITE_DONE) + { + tf_printf("tf_ssb_connections_set_succeeded: %s.\n", sqlite3_errmsg(db)); + } + } + sqlite3_finalize(statement); + } + } + else + { + if (sqlite3_prepare(db, "INSERT INTO connections (host, port, key) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING", -1, &statement, NULL) == SQLITE_OK) + { + if (sqlite3_bind_text(statement, 1, update->host, -1, NULL) == SQLITE_OK && + sqlite3_bind_int(statement, 2, update->port) == SQLITE_OK && + sqlite3_bind_text(statement, 3, update->key, -1, NULL) == SQLITE_OK) + { + int r = sqlite3_step(statement); + if (r != SQLITE_DONE) + { + tf_printf("tf_ssb_connections_store: %d, %s.\n", r, sqlite3_errmsg(db)); + } + } + sqlite3_finalize(statement); + } + } + tf_ssb_release_db_writer(update->ssb, db); + tf_ssb_record_thread_busy(update->ssb, false); +} + +static void _tf_ssb_connections_update_after_work(uv_work_t* work, int status) +{ + tf_ssb_connections_update_t* update = work->data; + tf_free(update); +} + +static void _tf_ssb_connections_queue_update(tf_ssb_connections_t* connections, tf_ssb_connections_update_t* update) +{ + update->work.data = update; + int result = uv_queue_work(tf_ssb_get_loop(connections->ssb), &update->work, _tf_ssb_connections_update_work, _tf_ssb_connections_update_after_work); + if (result) + { + _tf_ssb_connections_update_after_work(&update->work, result); + } +} + void tf_ssb_connections_store(tf_ssb_connections_t* connections, const char* host, int port, const char* key) { - sqlite3_stmt* statement; - sqlite3* db = tf_ssb_acquire_db_writer(connections->ssb); - if (sqlite3_prepare(db, "INSERT INTO connections (host, port, key) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING", -1, &statement, NULL) == SQLITE_OK) + tf_ssb_connections_update_t* update = tf_malloc(sizeof(tf_ssb_connections_update_t)); + *update = (tf_ssb_connections_update_t) { - if (sqlite3_bind_text(statement, 1, host, -1, NULL) == SQLITE_OK && - sqlite3_bind_int(statement, 2, port) == SQLITE_OK && - sqlite3_bind_text(statement, 3, key, -1, NULL) == SQLITE_OK) - { - int r = sqlite3_step(statement); - if (r != SQLITE_DONE) - { - tf_printf("tf_ssb_connections_store: %d, %s.\n", r, sqlite3_errmsg(db)); - } - } - sqlite3_finalize(statement); - } - tf_ssb_release_db_writer(connections->ssb, db); + .ssb = connections->ssb, + .port = port, + }; + snprintf(update->host, sizeof(update->host), "%s", host); + snprintf(update->key, sizeof(update->key), "%s", key); + _tf_ssb_connections_queue_update(connections, update); } void tf_ssb_connections_set_attempted(tf_ssb_connections_t* connections, const char* host, int port, const char* key) { - sqlite3_stmt* statement; - sqlite3* db = tf_ssb_acquire_db_writer(connections->ssb); - if (sqlite3_prepare(db, "UPDATE connections SET last_attempt = strftime('%s', 'now') WHERE host = $1 AND port = $2 AND key = $3", -1, &statement, NULL) == SQLITE_OK) + tf_ssb_connections_update_t* update = tf_malloc(sizeof(tf_ssb_connections_update_t)); + *update = (tf_ssb_connections_update_t) { - if (sqlite3_bind_text(statement, 1, host, -1, NULL) == SQLITE_OK && - sqlite3_bind_int(statement, 2, port) == SQLITE_OK && - sqlite3_bind_text(statement, 3, key, -1, NULL) == SQLITE_OK) - { - if (sqlite3_step(statement) != SQLITE_DONE) - { - tf_printf("tf_ssb_connections_set_attempted: %s.\n", sqlite3_errmsg(db)); - } - } - sqlite3_finalize(statement); - } - tf_ssb_release_db_writer(connections->ssb, db); + .ssb = connections->ssb, + .port = port, + .attempted = true, + }; + snprintf(update->host, sizeof(update->host), "%s", host); + snprintf(update->key, sizeof(update->key), "%s", key); + _tf_ssb_connections_queue_update(connections, update); } void tf_ssb_connections_set_succeeded(tf_ssb_connections_t* connections, const char* host, int port, const char* key) { - sqlite3_stmt* statement; - sqlite3* db = tf_ssb_acquire_db_writer(connections->ssb); - if (sqlite3_prepare(db, "UPDATE connections SET last_success = strftime('%s', 'now') WHERE host = $1 AND port = $2 AND key = $3", -1, &statement, NULL) == SQLITE_OK) + tf_ssb_connections_update_t* update = tf_malloc(sizeof(tf_ssb_connections_update_t)); + *update = (tf_ssb_connections_update_t) { - if (sqlite3_bind_text(statement, 1, host, -1, NULL) == SQLITE_OK && - sqlite3_bind_int(statement, 2, port) == SQLITE_OK && - sqlite3_bind_text(statement, 3, key, -1, NULL) == SQLITE_OK) - { - if (sqlite3_step(statement) != SQLITE_DONE) - { - tf_printf("tf_ssb_connections_set_succeeded: %s.\n", sqlite3_errmsg(db)); - } - } - sqlite3_finalize(statement); - } - tf_ssb_release_db_writer(connections->ssb, db); + .ssb = connections->ssb, + .port = port, + .succeeded = true, + }; + snprintf(update->host, sizeof(update->host), "%s", host); + snprintf(update->key, sizeof(update->key), "%s", key); + _tf_ssb_connections_queue_update(connections, update); }