#include "ssb.connections.h"

#include "log.h"
#include "mem.h"
#include "ssb.h"

#include "uv.h"
#include "sqlite3.h"

#include <stdio.h>
#include <string.h>

#if !defined(_countof)
#define _countof(a) ((int)(sizeof((a)) / sizeof(*(a))))
#endif

/*
** Value bound to ?1 in the "next connection" query, where it is compared
** against a difference of two strftime('%s', ...) values (seconds since the
** epoch), i.e. roughly 16.7 hours.
** NOTE(review): confirm this was not meant to be milliseconds.
*/
enum
{
	k_tf_ssb_connections_retry_threshold = 60000
};

/*
** Connection scheduler state: the owning SSB instance and the repeating timer
** that periodically looks for another stored connection to try.
*/
typedef struct _tf_ssb_connections_t
{
	tf_ssb_t* ssb;
	uv_timer_t timer;
} tf_ssb_connections_t;

/*
** Connection change callback registered in tf_ssb_connections_create().
** Records an attempt when a connection with a non-empty host and a non-zero
** port is created, and a success when a connection reports connect.
** user_data is the tf_ssb_connections_t passed at registration.
*/
static void _tf_ssb_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_change_t change, tf_ssb_connection_t* connection, void* user_data)
{
	tf_ssb_connections_t* connections = user_data;
	switch (change)
	{
	case k_tf_ssb_change_create:
		{
			char key[k_id_base64_len];
			const char* host = tf_ssb_connection_get_host(connection);
			if (host && *host && tf_ssb_connection_get_port(connection) && tf_ssb_connection_get_id(connection, key, sizeof(key)))
			{
				tf_ssb_connections_set_attempted(connections, host, tf_ssb_connection_get_port(connection), key);
			}
		}
		break;
	case k_tf_ssb_change_connect:
		{
			char key[k_id_base64_len];
			if (tf_ssb_connection_get_id(connection, key, sizeof(key)))
			{
				/* The host is forwarded unchecked here; the setter ignores a NULL host. */
				tf_ssb_connections_set_succeeded(connections, tf_ssb_connection_get_host(connection), tf_ssb_connection_get_port(connection), key);
			}
		}
		break;
	case k_tf_ssb_change_remove:
	case k_tf_ssb_change_update:
		break;
	}
}

/*
** Selects one row from the connections table that has never been attempted or
** whose last attempt is older than the retry threshold, oldest attempt first.
**
** On success copies the row into host/port/key (truncating to the given buffer
** sizes) and returns true.  Returns false when no row qualifies, when the
** statement cannot be prepared (which is logged), or when the row's host or
** key column is NULL.
*/
static bool _tf_ssb_connections_get_next_connection(tf_ssb_connections_t* connections, char* host, size_t host_size, int* port, char* key, size_t key_size)
{
	bool result = false;
	sqlite3_stmt* statement = NULL;
	sqlite3* db = tf_ssb_acquire_db_reader(connections->ssb);
	if (sqlite3_prepare_v2(db, "SELECT host, port, key FROM connections WHERE last_attempt IS NULL OR (strftime('%s', 'now') - last_attempt > ?1) ORDER BY last_attempt LIMIT 1", -1, &statement, NULL) == SQLITE_OK)
	{
		if (sqlite3_bind_int(statement, 1, k_tf_ssb_connections_retry_threshold) == SQLITE_OK && sqlite3_step(statement) == SQLITE_ROW)
		{
			/* sqlite3_column_text() returns NULL for SQL NULL; never hand that to "%s". */
			const char* row_host = (const char*)sqlite3_column_text(statement, 0);
			const char* row_key = (const char*)sqlite3_column_text(statement, 2);
			if (row_host && row_key)
			{
				snprintf(host, host_size, "%s", row_host);
				*port = sqlite3_column_int(statement, 1);
				snprintf(key, key_size, "%s", row_key);
				result = true;
			}
		}
		sqlite3_finalize(statement);
	}
	else
	{
		tf_printf("prepare: %s\n", sqlite3_errmsg(db));
	}
	tf_ssb_release_db_reader(connections->ssb, db);
	return result;
}

/*
** Work item for looking up the next connection off the loop thread.
** ready is set by the work callback when host/port/key hold a row.
*/
typedef struct _tf_ssb_connections_get_next_t
{
	uv_work_t work;
	tf_ssb_connections_t* connections;
	tf_ssb_t* ssb;
	bool ready;
	char host[256];
	int port;
	char key[k_id_base64_len];
} tf_ssb_connections_get_next_t;

/*
** uv_queue_work work callback: runs the database lookup and stores the
** outcome in the work item, bracketed by thread-busy accounting.
*/
static void _tf_ssb_connections_get_next_work(uv_work_t* work)
{
	tf_ssb_connections_get_next_t* next = work->data;
	tf_ssb_record_thread_busy(next->ssb, true);
	next->ready = _tf_ssb_connections_get_next_connection(next->connections, next->host, sizeof(next->host), &next->port, next->key, sizeof(next->key));
	tf_ssb_record_thread_busy(next->ssb, false);
}

/*
** uv_queue_work completion callback: if a row was found and its key converts
** to binary, starts a connection to it.  Always releases the ssb reference
** taken when the work was queued and frees the work item.  Also called
** directly, with ready still false, when queueing fails.
*/
static void _tf_ssb_connections_get_next_after_work(uv_work_t* work, int status)
{
	tf_ssb_connections_get_next_t* next = work->data;
	if (next->ready)
	{
		uint8_t key_bin[k_id_bin_len];
		if (tf_ssb_id_str_to_bin(key_bin, next->key))
		{
			tf_ssb_connect(next->ssb, next->host, next->port, key_bin);
		}
	}
	tf_ssb_unref(next->ssb);
	tf_free(next);
}

/*
** Timer callback: when fewer connections than the size of the local array are
** reported by tf_ssb_get_connections(), queues a lookup for another stored
** connection to try.
*/
static void _tf_ssb_connections_timer(uv_timer_t* timer)
{
	tf_ssb_connections_t* connections = timer->data;
	tf_ssb_connection_t* active[4];
	int count = tf_ssb_get_connections(connections->ssb, active, _countof(active));
	if (count < (int)_countof(active))
	{
		tf_ssb_connections_get_next_t* next = tf_malloc(sizeof(tf_ssb_connections_get_next_t));
		*next = (tf_ssb_connections_get_next_t) {
			.work = {
				.data = next,
			},
			.ssb = connections->ssb,
			.connections = connections,
		};
		/* Paired with tf_ssb_unref() in the completion callback. */
		tf_ssb_ref(connections->ssb);
		int result = uv_queue_work(tf_ssb_get_loop(connections->ssb), &next->work, _tf_ssb_connections_get_next_work, _tf_ssb_connections_get_next_after_work);
		if (result)
		{
			/* Queueing failed: run the completion path ourselves to unref and free. */
			_tf_ssb_connections_get_next_after_work(&next->work, result);
		}
	}
}

/*
** Allocates the connection scheduler for ssb, registers the connection change
** callback and starts a repeating timer (first fire after 2000 ms, then every
** 2000 ms).  The timer handle is unreferenced so that it does not by itself
** keep the loop running.  Release with tf_ssb_connections_destroy().
*/
tf_ssb_connections_t* tf_ssb_connections_create(tf_ssb_t* ssb)
{
	tf_ssb_connections_t* connections = tf_malloc(sizeof(tf_ssb_connections_t));
	memset(connections, 0, sizeof(*connections));
	connections->ssb = ssb;

	tf_ssb_add_connections_changed_callback(ssb, _tf_ssb_connections_changed_callback, NULL, connections);

	uv_loop_t* loop = tf_ssb_get_loop(ssb);
	connections->timer.data = connections;
	uv_timer_init(loop, &connections->timer);
	uv_timer_start(&connections->timer, _tf_ssb_connections_timer, 2000, 2000);
	uv_unref((uv_handle_t*)&connections->timer);
	return connections;
}

/*
** Close callback for the timer handle: the handle is embedded in the
** tf_ssb_connections_t, so the whole object is freed only once libuv has
** finished with the handle.
*/
static void _tf_ssb_connections_on_handle_close(uv_handle_t* handle)
{
	tf_ssb_connections_t* connections = handle->data;
	handle->data = NULL;
	tf_free(connections);
}

/*
** Begins teardown by closing the timer; the memory is released from the close
** callback.
** NOTE(review): the change callback registered in tf_ssb_connections_create()
** still carries this object as user_data and is not unregistered here;
** confirm it cannot fire after the object is freed.
*/
void tf_ssb_connections_destroy(tf_ssb_connections_t* connections)
{
	uv_close((uv_handle_t*)&connections->timer, _tf_ssb_connections_on_handle_close);
}

/*
** Work item describing one write to the connections table.  attempted and
** succeeded select which timestamp to update; when neither is set the row is
** inserted.
*/
typedef struct _tf_ssb_connections_update_t
{
	uv_work_t work;
	tf_ssb_t* ssb;
	char host[256];
	int port;
	char key[k_id_base64_len];
	bool attempted;
	bool succeeded;
} tf_ssb_connections_update_t;

/*
** uv_queue_work work callback: performs the write described by the work item.
** attempted takes precedence over succeeded, as in the original branch order.
** Prepare and step failures are logged; nothing is reported to the caller.
*/
static void _tf_ssb_connections_update_work(uv_work_t* work)
{
	tf_ssb_connections_update_t* update = work->data;
	const char* sql = NULL;
	const char* label = NULL;
	bool is_store = false;
	if (update->attempted)
	{
		sql = "UPDATE connections SET last_attempt = strftime('%s', 'now') WHERE host = ?1 AND port = ?2 AND key = ?3";
		label = "tf_ssb_connections_set_attempted";
	}
	else if (update->succeeded)
	{
		sql = "UPDATE connections SET last_success = strftime('%s', 'now') WHERE host = ?1 AND port = ?2 AND key = ?3";
		label = "tf_ssb_connections_set_succeeded";
	}
	else
	{
		sql = "INSERT INTO connections (host, port, key) VALUES (?1, ?2, ?3) ON CONFLICT DO NOTHING";
		label = "tf_ssb_connections_store";
		is_store = true;
	}

	tf_ssb_record_thread_busy(update->ssb, true);
	sqlite3_stmt* statement = NULL;
	sqlite3* db = tf_ssb_acquire_db_writer(update->ssb);
	if (sqlite3_prepare_v2(db, sql, -1, &statement, NULL) == SQLITE_OK)
	{
		/*
		** A NULL destructor means SQLite does not copy the text; the buffers
		** live in the work item, which is freed only after this callback and
		** the finalize below have completed.
		*/
		if (sqlite3_bind_text(statement, 1, update->host, -1, NULL) == SQLITE_OK && sqlite3_bind_int(statement, 2, update->port) == SQLITE_OK &&
			sqlite3_bind_text(statement, 3, update->key, -1, NULL) == SQLITE_OK)
		{
			int r = sqlite3_step(statement);
			if (r != SQLITE_DONE)
			{
				if (is_store)
				{
					tf_printf("%s: %d, %s.\n", label, r, sqlite3_errmsg(db));
				}
				else
				{
					tf_printf("%s: %s.\n", label, sqlite3_errmsg(db));
				}
			}
		}
		sqlite3_finalize(statement);
	}
	else
	{
		tf_printf("%s: prepare: %s\n", label, sqlite3_errmsg(db));
	}
	tf_ssb_release_db_writer(update->ssb, db);
	tf_ssb_record_thread_busy(update->ssb, false);
}

/*
** uv_queue_work completion callback: releases the ssb reference taken when the
** update was queued and frees the work item.  Also called directly when
** queueing fails.
*/
static void _tf_ssb_connections_update_after_work(uv_work_t* work, int status)
{
	tf_ssb_connections_update_t* update = work->data;
	tf_ssb_unref(update->ssb);
	tf_free(update);
}

/*
** Queues update for execution and takes ownership of it; it is freed by the
** completion callback on every path.  The ssb instance is referenced for the
** duration of the work, mirroring the ref/unref pairing used by the
** next-connection lookup.
*/
static void _tf_ssb_connections_queue_update(tf_ssb_connections_t* connections, tf_ssb_connections_update_t* update)
{
	update->work.data = update;
	tf_ssb_ref(connections->ssb);
	int result = uv_queue_work(tf_ssb_get_loop(connections->ssb), &update->work, _tf_ssb_connections_update_work, _tf_ssb_connections_update_after_work);
	if (result)
	{
		_tf_ssb_connections_update_after_work(&update->work, result);
	}
}

/*
** Builds an update work item from the given address and flags and queues it.
** host and key are copied (truncated to the work item's buffers).  Requests
** with a NULL host or key are ignored rather than formatted with "%s".
*/
static void _tf_ssb_connections_enqueue(tf_ssb_connections_t* connections, const char* host, int port, const char* key, bool attempted, bool succeeded)
{
	if (!host || !key)
	{
		return;
	}

	tf_ssb_connections_update_t* update = tf_malloc(sizeof(tf_ssb_connections_update_t));
	*update = (tf_ssb_connections_update_t) {
		.ssb = connections->ssb,
		.port = port,
		.attempted = attempted,
		.succeeded = succeeded,
	};
	snprintf(update->host, sizeof(update->host), "%s", host);
	snprintf(update->key, sizeof(update->key), "%s", key);
	_tf_ssb_connections_queue_update(connections, update);
}

/*
** Asynchronously inserts (host, port, key) into the connections table; an
** existing conflicting row is left untouched.
*/
void tf_ssb_connections_store(tf_ssb_connections_t* connections, const char* host, int port, const char* key)
{
	_tf_ssb_connections_enqueue(connections, host, port, key, false, false);
}

/*
** Asynchronously stamps last_attempt with the current time on the row matching
** (host, port, key).
*/
void tf_ssb_connections_set_attempted(tf_ssb_connections_t* connections, const char* host, int port, const char* key)
{
	_tf_ssb_connections_enqueue(connections, host, port, key, true, false);
}

/*
** Asynchronously stamps last_success with the current time on the row matching
** (host, port, key).
*/
void tf_ssb_connections_set_succeeded(tf_ssb_connections_t* connections, const char* host, int port, const char* key)
{
	_tf_ssb_connections_enqueue(connections, host, port, key, false, true);
}