349 lines
11 KiB
C
349 lines
11 KiB
C
#include "ssb.connections.h"
|
|
|
|
#include "log.h"
|
|
#include "mem.h"
|
|
#include "ssb.h"
|
|
|
|
#include "sqlite3.h"
|
|
#include "uv.h"
|
|
|
|
#include <string.h>
|
|
|
|
#if !defined(_countof)
|
|
#define _countof(a) ((int)(sizeof((a)) / sizeof(*(a))))
|
|
#endif
|
|
|
|
typedef struct _tf_ssb_connections_t
|
|
{
|
|
tf_ssb_t* ssb;
|
|
uv_timer_t timer;
|
|
} tf_ssb_connections_t;
|
|
|
|
static void _tf_ssb_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_change_t change, tf_ssb_connection_t* connection, void* user_data)
|
|
{
|
|
if (!connection)
|
|
{
|
|
return;
|
|
}
|
|
|
|
tf_ssb_connections_t* connections = user_data;
|
|
switch (change)
|
|
{
|
|
case k_tf_ssb_change_create:
|
|
{
|
|
char key[k_id_base64_len];
|
|
if (tf_ssb_connection_get_host(connection) && *tf_ssb_connection_get_host(connection) && tf_ssb_connection_get_port(connection) &&
|
|
tf_ssb_connection_get_id(connection, key, sizeof(key)))
|
|
{
|
|
tf_ssb_connections_set_attempted(connections, tf_ssb_connection_get_host(connection), tf_ssb_connection_get_port(connection), key);
|
|
}
|
|
}
|
|
break;
|
|
case k_tf_ssb_change_connect:
|
|
{
|
|
char key[k_id_base64_len];
|
|
if (tf_ssb_connection_get_id(connection, key, sizeof(key)))
|
|
{
|
|
tf_ssb_connections_set_succeeded(connections, tf_ssb_connection_get_host(connection), tf_ssb_connection_get_port(connection), key);
|
|
}
|
|
}
|
|
break;
|
|
case k_tf_ssb_change_remove:
|
|
case k_tf_ssb_change_update:
|
|
break;
|
|
}
|
|
}
|
|
|
|
static bool _tf_ssb_connections_get_next_connection(tf_ssb_connections_t* connections, char* host, size_t host_size, int* port, char* key, size_t key_size)
|
|
{
|
|
bool result = false;
|
|
sqlite3_stmt* statement;
|
|
sqlite3* db = tf_ssb_acquire_db_reader(connections->ssb);
|
|
if (sqlite3_prepare(db, "SELECT host, port, key FROM connections WHERE last_attempt IS NULL OR (strftime('%s', 'now') - last_attempt > ?1) ORDER BY last_attempt LIMIT 1", -1,
|
|
&statement, NULL) == SQLITE_OK)
|
|
{
|
|
if (sqlite3_bind_int(statement, 1, 60000) == SQLITE_OK && sqlite3_step(statement) == SQLITE_ROW)
|
|
{
|
|
snprintf(host, host_size, "%s", sqlite3_column_text(statement, 0));
|
|
*port = sqlite3_column_int(statement, 1);
|
|
snprintf(key, key_size, "%s", sqlite3_column_text(statement, 2));
|
|
result = true;
|
|
}
|
|
sqlite3_finalize(statement);
|
|
}
|
|
else
|
|
{
|
|
tf_printf("prepare: %s\n", sqlite3_errmsg(db));
|
|
}
|
|
tf_ssb_release_db_reader(connections->ssb, db);
|
|
return result;
|
|
}
|
|
|
|
typedef struct _tf_ssb_connections_get_next_t
|
|
{
|
|
tf_ssb_connections_t* connections;
|
|
bool ready;
|
|
char host[256];
|
|
int port;
|
|
char key[k_id_base64_len];
|
|
} tf_ssb_connections_get_next_t;
|
|
|
|
static void _tf_ssb_connections_get_next_work(tf_ssb_t* ssb, void* user_data)
|
|
{
|
|
tf_ssb_connections_get_next_t* next = user_data;
|
|
if (tf_ssb_is_shutting_down(ssb))
|
|
{
|
|
return;
|
|
}
|
|
next->ready = _tf_ssb_connections_get_next_connection(next->connections, next->host, sizeof(next->host), &next->port, next->key, sizeof(next->key));
|
|
}
|
|
|
|
static void _tf_ssb_connections_get_next_after_work(tf_ssb_t* ssb, int status, void* user_data)
|
|
{
|
|
tf_ssb_connections_get_next_t* next = user_data;
|
|
if (next->ready)
|
|
{
|
|
uint8_t key_bin[k_id_bin_len];
|
|
if (tf_ssb_id_str_to_bin(key_bin, next->key))
|
|
{
|
|
tf_ssb_connect(ssb, next->host, next->port, key_bin, 0);
|
|
}
|
|
}
|
|
tf_free(next);
|
|
}
|
|
|
|
static void _tf_ssb_connections_timer(uv_timer_t* timer)
|
|
{
|
|
tf_ssb_connections_t* connections = timer->data;
|
|
tf_ssb_connection_t* active[4];
|
|
int count = tf_ssb_get_connections(connections->ssb, active, _countof(active));
|
|
if (count < (int)_countof(active))
|
|
{
|
|
tf_ssb_connections_get_next_t* next = tf_malloc(sizeof(tf_ssb_connections_get_next_t));
|
|
*next = (tf_ssb_connections_get_next_t) {
|
|
.connections = connections,
|
|
};
|
|
tf_ssb_run_work(connections->ssb, _tf_ssb_connections_get_next_work, _tf_ssb_connections_get_next_after_work, next);
|
|
}
|
|
}
|
|
|
|
tf_ssb_connections_t* tf_ssb_connections_create(tf_ssb_t* ssb)
|
|
{
|
|
tf_ssb_connections_t* connections = tf_malloc(sizeof(tf_ssb_connections_t));
|
|
memset(connections, 0, sizeof(*connections));
|
|
connections->ssb = ssb;
|
|
|
|
tf_ssb_add_connections_changed_callback(ssb, _tf_ssb_connections_changed_callback, NULL, connections);
|
|
|
|
uv_loop_t* loop = tf_ssb_get_loop(ssb);
|
|
connections->timer.data = connections;
|
|
uv_timer_init(loop, &connections->timer);
|
|
uv_timer_start(&connections->timer, _tf_ssb_connections_timer, 2000, 2000);
|
|
uv_unref((uv_handle_t*)&connections->timer);
|
|
|
|
return connections;
|
|
}
|
|
|
|
static void _tf_ssb_connections_on_handle_close(uv_handle_t* handle)
|
|
{
|
|
tf_ssb_connections_t* connections = handle->data;
|
|
handle->data = NULL;
|
|
tf_free(connections);
|
|
}
|
|
|
|
void tf_ssb_connections_destroy(tf_ssb_connections_t* connections)
|
|
{
|
|
uv_close((uv_handle_t*)&connections->timer, _tf_ssb_connections_on_handle_close);
|
|
}
|
|
|
|
typedef struct _tf_ssb_connections_update_t
|
|
{
|
|
char host[256];
|
|
int port;
|
|
char key[k_id_base64_len];
|
|
bool attempted;
|
|
bool succeeded;
|
|
} tf_ssb_connections_update_t;
|
|
|
|
static void _tf_ssb_connections_update_work(tf_ssb_t* ssb, void* user_data)
|
|
{
|
|
tf_ssb_connections_update_t* update = user_data;
|
|
if (tf_ssb_is_shutting_down(ssb))
|
|
{
|
|
return;
|
|
}
|
|
sqlite3_stmt* statement;
|
|
sqlite3* db = tf_ssb_acquire_db_writer(ssb);
|
|
if (update->attempted)
|
|
{
|
|
if (sqlite3_prepare(db, "UPDATE connections SET last_attempt = strftime('%s', 'now') WHERE host = ?1 AND port = ?2 AND key = ?3", -1, &statement, NULL) == SQLITE_OK)
|
|
{
|
|
if (sqlite3_bind_text(statement, 1, update->host, -1, NULL) == SQLITE_OK && sqlite3_bind_int(statement, 2, update->port) == SQLITE_OK &&
|
|
sqlite3_bind_text(statement, 3, update->key, -1, NULL) == SQLITE_OK)
|
|
{
|
|
if (sqlite3_step(statement) != SQLITE_DONE)
|
|
{
|
|
tf_printf("tf_ssb_connections_set_attempted: %s.\n", sqlite3_errmsg(db));
|
|
}
|
|
}
|
|
sqlite3_finalize(statement);
|
|
}
|
|
}
|
|
else if (update->succeeded)
|
|
{
|
|
if (sqlite3_prepare(db, "UPDATE connections SET last_success = strftime('%s', 'now') WHERE host = ?1 AND port = ?2 AND key = ?3", -1, &statement, NULL) == SQLITE_OK)
|
|
{
|
|
if (sqlite3_bind_text(statement, 1, update->host, -1, NULL) == SQLITE_OK && sqlite3_bind_int(statement, 2, update->port) == SQLITE_OK &&
|
|
sqlite3_bind_text(statement, 3, update->key, -1, NULL) == SQLITE_OK)
|
|
{
|
|
if (sqlite3_step(statement) != SQLITE_DONE)
|
|
{
|
|
tf_printf("tf_ssb_connections_set_succeeded: %s.\n", sqlite3_errmsg(db));
|
|
}
|
|
}
|
|
sqlite3_finalize(statement);
|
|
}
|
|
}
|
|
else
|
|
{
|
|
if (sqlite3_prepare(db, "INSERT INTO connections (host, port, key) VALUES (?1, ?2, ?3) ON CONFLICT DO NOTHING", -1, &statement, NULL) == SQLITE_OK)
|
|
{
|
|
if (sqlite3_bind_text(statement, 1, update->host, -1, NULL) == SQLITE_OK && sqlite3_bind_int(statement, 2, update->port) == SQLITE_OK &&
|
|
sqlite3_bind_text(statement, 3, update->key, -1, NULL) == SQLITE_OK)
|
|
{
|
|
int r = sqlite3_step(statement);
|
|
if (r != SQLITE_DONE)
|
|
{
|
|
tf_printf("tf_ssb_connections_store: %d, %s.\n", r, sqlite3_errmsg(db));
|
|
}
|
|
}
|
|
sqlite3_finalize(statement);
|
|
}
|
|
}
|
|
tf_ssb_release_db_writer(ssb, db);
|
|
}
|
|
|
|
static void _tf_ssb_connections_update_after_work(tf_ssb_t* ssb, int status, void* user_data)
|
|
{
|
|
tf_free(user_data);
|
|
}
|
|
|
|
static void _tf_ssb_connections_queue_update(tf_ssb_connections_t* connections, tf_ssb_connections_update_t* update)
|
|
{
|
|
tf_ssb_run_work(connections->ssb, _tf_ssb_connections_update_work, _tf_ssb_connections_update_after_work, update);
|
|
}
|
|
|
|
void tf_ssb_connections_store(tf_ssb_connections_t* connections, const char* host, int port, const char* key)
|
|
{
|
|
tf_ssb_connections_update_t* update = tf_malloc(sizeof(tf_ssb_connections_update_t));
|
|
*update = (tf_ssb_connections_update_t) {
|
|
.port = port,
|
|
};
|
|
snprintf(update->host, sizeof(update->host), "%s", host);
|
|
snprintf(update->key, sizeof(update->key), "%s", key);
|
|
_tf_ssb_connections_queue_update(connections, update);
|
|
}
|
|
|
|
void tf_ssb_connections_set_attempted(tf_ssb_connections_t* connections, const char* host, int port, const char* key)
|
|
{
|
|
tf_ssb_connections_update_t* update = tf_malloc(sizeof(tf_ssb_connections_update_t));
|
|
*update = (tf_ssb_connections_update_t) {
|
|
.port = port,
|
|
.attempted = true,
|
|
};
|
|
snprintf(update->host, sizeof(update->host), "%s", host);
|
|
snprintf(update->key, sizeof(update->key), "%s", key);
|
|
_tf_ssb_connections_queue_update(connections, update);
|
|
}
|
|
|
|
void tf_ssb_connections_set_succeeded(tf_ssb_connections_t* connections, const char* host, int port, const char* key)
|
|
{
|
|
tf_ssb_connections_update_t* update = tf_malloc(sizeof(tf_ssb_connections_update_t));
|
|
*update = (tf_ssb_connections_update_t) {
|
|
.port = port,
|
|
.succeeded = true,
|
|
};
|
|
snprintf(update->host, sizeof(update->host), "%s", host);
|
|
snprintf(update->key, sizeof(update->key), "%s", key);
|
|
_tf_ssb_connections_queue_update(connections, update);
|
|
}
|
|
|
|
static void _tf_ssb_connections_sync_broadcast_visit(
|
|
const char* host, const struct sockaddr_in* addr, tf_ssb_broadcast_origin_t origin, tf_ssb_connection_t* tunnel, const uint8_t* pub, void* user_data)
|
|
{
|
|
tf_ssb_t* ssb = user_data;
|
|
if (tunnel)
|
|
{
|
|
char target_id[k_id_base64_len] = { 0 };
|
|
if (tf_ssb_id_bin_to_str(target_id, sizeof(target_id), pub))
|
|
{
|
|
char portal_id[k_id_base64_len] = { 0 };
|
|
if (tf_ssb_connection_get_id(tunnel, portal_id, sizeof(portal_id)))
|
|
{
|
|
tf_ssb_tunnel_create(ssb, portal_id, target_id, k_tf_ssb_connect_flag_one_shot);
|
|
}
|
|
}
|
|
}
|
|
else
|
|
{
|
|
tf_ssb_connect(ssb, host, ntohs(addr->sin_port), pub, k_tf_ssb_connect_flag_one_shot);
|
|
}
|
|
}
|
|
|
|
typedef struct _tf_ssb_connections_get_all_work_t
|
|
{
|
|
char** connections;
|
|
int connections_count;
|
|
} tf_ssb_connections_get_all_work_t;
|
|
|
|
static void _tf_ssb_connections_get_all_work(tf_ssb_t* ssb, void* user_data)
|
|
{
|
|
tf_ssb_connections_get_all_work_t* work = user_data;
|
|
sqlite3_stmt* statement;
|
|
sqlite3* db = tf_ssb_acquire_db_reader(ssb);
|
|
if (sqlite3_prepare(db, "SELECT host, port, key FROM connections ORDER BY last_attempt", -1, &statement, NULL) == SQLITE_OK)
|
|
{
|
|
while (sqlite3_step(statement) == SQLITE_ROW)
|
|
{
|
|
const char* host = (const char*)sqlite3_column_text(statement, 0);
|
|
int port = sqlite3_column_int(statement, 1);
|
|
const char* key = (const char*)sqlite3_column_text(statement, 2);
|
|
char connection[1024] = { 0 };
|
|
snprintf(connection, sizeof(connection), "net:%s:%d~shs:%s", host, port, *key == '@' ? key + 1 : key);
|
|
char* dot = strrchr(connection, '.');
|
|
if (dot && strcmp(dot, ".ed25519") == 0)
|
|
{
|
|
*dot = '\0';
|
|
}
|
|
work->connections = tf_resize_vec(work->connections, sizeof(char*) * (work->connections_count + 1));
|
|
work->connections[work->connections_count++] = tf_strdup(connection);
|
|
}
|
|
sqlite3_finalize(statement);
|
|
}
|
|
else
|
|
{
|
|
tf_printf("prepare: %s\n", sqlite3_errmsg(db));
|
|
}
|
|
tf_ssb_release_db_reader(ssb, db);
|
|
}
|
|
|
|
static void _tf_ssb_connections_get_all_after_work(tf_ssb_t* ssb, int status, void* user_data)
|
|
{
|
|
tf_ssb_connections_get_all_work_t* work = user_data;
|
|
for (int i = 0; i < work->connections_count; i++)
|
|
{
|
|
tf_ssb_connect_str(ssb, work->connections[i], k_tf_ssb_connect_flag_one_shot);
|
|
tf_free(work->connections[i]);
|
|
}
|
|
tf_free(work->connections);
|
|
tf_free(work);
|
|
}
|
|
|
|
void tf_ssb_connections_sync_start(tf_ssb_connections_t* connections)
|
|
{
|
|
tf_ssb_connections_get_all_work_t* work = tf_malloc(sizeof(tf_ssb_connections_get_all_work_t));
|
|
*work = (tf_ssb_connections_get_all_work_t) { 0 };
|
|
tf_ssb_run_work(connections->ssb, _tf_ssb_connections_get_all_work, _tf_ssb_connections_get_all_after_work, work);
|
|
tf_ssb_visit_broadcasts(connections->ssb, _tf_ssb_connections_sync_broadcast_visit, connections->ssb);
|
|
}
|