tildefriends/src/ssb.connections.c

291 lines
8.9 KiB
C

#include "ssb.connections.h"
#include "log.h"
#include "mem.h"
#include "ssb.h"
#include "uv.h"
#include "sqlite3.h"
#include <string.h>
/*
 * Number of elements in a true array, as an int.  Only defined here when no
 * earlier header has already supplied _countof.  Must not be used on a
 * pointer: sizeof would then measure the pointer, not the array.
 */
#if !defined(_countof)
#define _countof(a) ((int)(sizeof((a)) / sizeof(*(a))))
#endif
/*
 * Per-instance state for the outgoing connection scheduler.  Allocated by
 * tf_ssb_connections_create() and freed from the timer's close callback.
 */
typedef struct _tf_ssb_connections_t
{
/* SSB instance whose database and event loop this scheduler uses. */
tf_ssb_t* ssb;
/* Repeating timer that drives _tf_ssb_connections_timer(); its data field points back at this struct. */
uv_timer_t timer;
} tf_ssb_connections_t;
/*
 * Connection-change hook registered in tf_ssb_connections_create().
 *
 * Records bookkeeping for the stored connection matching the changed one:
 *  - create:  stamps last_attempt via tf_ssb_connections_set_attempted().
 *  - connect: stamps last_success via tf_ssb_connections_set_succeeded().
 *  - remove:  nothing to record.
 *
 * Both recording paths require a non-empty host, a non-zero port and a
 * resolvable identity.  Without a host/port there is no stored row the update
 * could match, and a NULL host must never reach the "%s" formatting done by
 * the setters.
 *
 * ssb is unused; the scheduler state arrives through user_data.
 */
static void _tf_ssb_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_change_t change, tf_ssb_connection_t* connection, void* user_data)
{
	tf_ssb_connections_t* connections = user_data;
	switch (change)
	{
	case k_tf_ssb_change_create:
		{
			const char* host = tf_ssb_connection_get_host(connection);
			int port = tf_ssb_connection_get_port(connection);
			char key[k_id_base64_len];
			if (host && *host && port && tf_ssb_connection_get_id(connection, key, sizeof(key)))
			{
				tf_ssb_connections_set_attempted(connections, host, port, key);
			}
		}
		break;
	case k_tf_ssb_change_connect:
		{
			const char* host = tf_ssb_connection_get_host(connection);
			int port = tf_ssb_connection_get_port(connection);
			char key[k_id_base64_len];
			/* Same validation as the create case, so the two paths agree. */
			if (host && *host && port && tf_ssb_connection_get_id(connection, key, sizeof(key)))
			{
				tf_ssb_connections_set_succeeded(connections, host, port, key);
			}
		}
		break;
	case k_tf_ssb_change_remove:
		break;
	}
}
/*
 * Pick the stored connection that should be tried next.
 *
 * Selects one row from the connections table that has never been attempted,
 * or whose last attempt is older than the retry threshold, preferring the
 * smallest last_attempt.
 *
 * connections: scheduler state; its ssb supplies the database reader.
 * host, host_size: output buffer for the NUL-terminated host (truncated to fit).
 * port: output for the stored port.
 * key, key_size: output buffer for the NUL-terminated key (truncated to fit).
 *
 * Returns true when a row was found and the outputs were written; false when
 * there is no candidate, the row has a NULL host or key, or the query failed.
 * On false the outputs are left untouched.
 */
static bool _tf_ssb_connections_get_next_connection(tf_ssb_connections_t* connections, char* host, size_t host_size, int* port, char* key, size_t key_size)
{
	/*
	 * NOTE(review): strftime('%s', ...) yields seconds, so this threshold
	 * is 60000 seconds (roughly 16.7 hours).  Confirm that milliseconds
	 * were not intended.
	 */
	const int retry_threshold = 60000;
	bool result = false;
	sqlite3_stmt* statement = NULL;
	sqlite3* db = tf_ssb_acquire_db_reader(connections->ssb);
	if (sqlite3_prepare_v2(db, "SELECT host, port, key FROM connections WHERE last_attempt IS NULL OR (strftime('%s', 'now') - last_attempt > $1) ORDER BY last_attempt LIMIT 1", -1, &statement, NULL) == SQLITE_OK)
	{
		if (sqlite3_bind_int(statement, 1, retry_threshold) == SQLITE_OK &&
			sqlite3_step(statement) == SQLITE_ROW)
		{
			/* sqlite3_column_text() returns NULL for SQL NULL; never hand that to "%s". */
			const char* row_host = (const char*)sqlite3_column_text(statement, 0);
			const char* row_key = (const char*)sqlite3_column_text(statement, 2);
			if (row_host && row_key)
			{
				snprintf(host, host_size, "%s", row_host);
				*port = sqlite3_column_int(statement, 1);
				snprintf(key, key_size, "%s", row_key);
				result = true;
			}
		}
		sqlite3_finalize(statement);
	}
	else
	{
		tf_printf("prepare: %s\n", sqlite3_errmsg(db));
	}
	tf_ssb_release_db_reader(connections->ssb, db);
	return result;
}
/*
 * One "find the next connection to try" request, handed to uv_queue_work().
 * Allocated by _tf_ssb_connections_timer() and freed in
 * _tf_ssb_connections_get_next_after_work().
 */
typedef struct _tf_ssb_connections_get_next_t
{
/* libuv work request; its data field points back at this struct. */
uv_work_t work;
/* Scheduler state used to reach the database. */
tf_ssb_connections_t* connections;
/* SSB instance used for busy accounting and for starting the connection. */
tf_ssb_t* ssb;
/* Set by the work callback: true when host, port and key below hold a candidate. */
bool ready;
/* Candidate host, NUL-terminated. */
char host[256];
/* Candidate port. */
int port;
/* Candidate identity in its string form, NUL-terminated. */
char key[k_id_base64_len];
} tf_ssb_connections_get_next_t;
/*
 * Work callback for a get-next request: runs the database lookup and stores
 * the outcome in the request, bracketed by thread-busy accounting.
 */
static void _tf_ssb_connections_get_next_work(uv_work_t* work)
{
	tf_ssb_connections_get_next_t* request = work->data;
	tf_ssb_t* ssb = request->ssb;

	tf_ssb_record_thread_busy(ssb, true);
	bool found = _tf_ssb_connections_get_next_connection(
		request->connections,
		request->host, sizeof(request->host),
		&request->port,
		request->key, sizeof(request->key));
	request->ready = found;
	tf_ssb_record_thread_busy(ssb, false);
}
/*
 * Completion callback for a get-next request.  When the lookup produced a
 * candidate whose key converts to binary form, start connecting to it.  The
 * request is always freed here.  status is not inspected; a request that
 * never ran still has ready == false.
 */
static void _tf_ssb_connections_get_next_after_work(uv_work_t* work, int status)
{
	tf_ssb_connections_get_next_t* request = work->data;
	uint8_t id[k_id_bin_len];

	if (request->ready && tf_ssb_id_str_to_bin(id, request->key))
	{
		tf_ssb_connect(request->ssb, request->host, request->port, id);
	}
	tf_free(request);
}
/*
 * Timer tick: if fewer connections are active than the local slot array can
 * hold, queue a background lookup for the next stored connection to try.
 * If queuing fails, the completion callback is invoked directly so the
 * request is still freed.
 */
static void _tf_ssb_connections_timer(uv_timer_t* timer)
{
	tf_ssb_connections_t* connections = timer->data;
	tf_ssb_t* ssb = connections->ssb;
	tf_ssb_connection_t* slots[4];
	const int capacity = _countof(slots);

	if (tf_ssb_get_connections(ssb, slots, capacity) >= capacity)
	{
		/* Already at the connection limit; nothing to schedule. */
		return;
	}

	tf_ssb_connections_get_next_t* request = tf_malloc(sizeof(tf_ssb_connections_get_next_t));
	*request = (tf_ssb_connections_get_next_t)
	{
		.work.data = request,
		.connections = connections,
		.ssb = ssb,
	};

	int status = uv_queue_work(tf_ssb_get_loop(ssb), &request->work, _tf_ssb_connections_get_next_work, _tf_ssb_connections_get_next_after_work);
	if (status != 0)
	{
		_tf_ssb_connections_get_next_after_work(&request->work, status);
	}
}
/*
 * Create the connection scheduler for an SSB instance.
 *
 * Registers the connection-change callback and starts a repeating timer
 * (first fire after 2000 ms, then every 2000 ms) on the instance's loop.  The
 * timer is unreferenced, so it does not by itself keep the loop running.
 *
 * Returns the new scheduler; release it with tf_ssb_connections_destroy().
 */
tf_ssb_connections_t* tf_ssb_connections_create(tf_ssb_t* ssb)
{
	tf_ssb_connections_t* self = tf_malloc(sizeof(tf_ssb_connections_t));
	memset(self, 0, sizeof(*self));
	self->ssb = ssb;
	tf_ssb_add_connections_changed_callback(ssb, _tf_ssb_connections_changed_callback, NULL, self);

	uv_loop_t* loop = tf_ssb_get_loop(ssb);
	uv_timer_t* timer = &self->timer;
	timer->data = self;
	uv_timer_init(loop, timer);
	uv_timer_start(timer, _tf_ssb_connections_timer, 2000, 2000);
	uv_unref((uv_handle_t*)timer);
	return self;
}
/*
 * Close callback for the scheduler's timer: the handle has finished closing,
 * so the scheduler that embeds it can be freed.
 */
static void _tf_ssb_connections_on_handle_close(uv_handle_t* handle)
{
	tf_ssb_connections_t* owner = handle->data;
	handle->data = NULL;
	tf_free(owner);
}
/*
 * Begin tearing down a scheduler.  Closes its timer; the scheduler memory is
 * released later, from the timer's close callback, so the pointer must not
 * be used after this call.
 *
 * NOTE(review): nothing visible here unregisters the change callback added in
 * tf_ssb_connections_create(), nor waits for queued work that still holds
 * this pointer.  Confirm neither can run after the scheduler is freed.
 */
void tf_ssb_connections_destroy(tf_ssb_connections_t* connections)
{
	uv_handle_t* timer_handle = (uv_handle_t*)&connections->timer;
	uv_close(timer_handle, _tf_ssb_connections_on_handle_close);
}
/*
 * One queued write against the connections table, handed to uv_queue_work().
 * Freed in _tf_ssb_connections_update_after_work().
 */
typedef struct _tf_ssb_connections_update_t
{
/* libuv work request; its data field points back at this struct. */
uv_work_t work;
/* SSB instance that supplies the database writer. */
tf_ssb_t* ssb;
/* Host of the row to insert or update, NUL-terminated. */
char host[256];
/* Port of the row to insert or update. */
int port;
/* Key of the row to insert or update, NUL-terminated. */
char key[k_id_base64_len];
/* When true, stamp last_attempt on the matching row. Checked before succeeded. */
bool attempted;
/* When true (and attempted is false), stamp last_success on the matching row. */
bool succeeded;
/* When both flags are false, the row is inserted if not already present. */
} tf_ssb_connections_update_t;
/*
 * Work callback for a queued connections-table write.
 *
 * The request's flags choose the statement:
 *  - attempted: set last_attempt to the current time on the matching row.
 *  - succeeded: set last_success to the current time on the matching row.
 *  - neither:   insert (host, port, key), doing nothing on conflict.
 *
 * All three statements bind the same (host, port, key) parameters, so they
 * share one prepare/bind/step path.  Prepare, bind and step failures are all
 * logged with the name of the public operation that queued the request.
 */
static void _tf_ssb_connections_update_work(uv_work_t* work)
{
	tf_ssb_connections_update_t* update = work->data;

	const char* label = "tf_ssb_connections_store";
	const char* sql = "INSERT INTO connections (host, port, key) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING";
	if (update->attempted)
	{
		label = "tf_ssb_connections_set_attempted";
		sql = "UPDATE connections SET last_attempt = strftime('%s', 'now') WHERE host = $1 AND port = $2 AND key = $3";
	}
	else if (update->succeeded)
	{
		label = "tf_ssb_connections_set_succeeded";
		sql = "UPDATE connections SET last_success = strftime('%s', 'now') WHERE host = $1 AND port = $2 AND key = $3";
	}

	tf_ssb_record_thread_busy(update->ssb, true);
	sqlite3* db = tf_ssb_acquire_db_writer(update->ssb);
	sqlite3_stmt* statement = NULL;
	if (sqlite3_prepare_v2(db, sql, -1, &statement, NULL) == SQLITE_OK)
	{
		/* The strings live in the request, which outlives the statement, so no copy or destructor is needed. */
		if (sqlite3_bind_text(statement, 1, update->host, -1, NULL) == SQLITE_OK &&
			sqlite3_bind_int(statement, 2, update->port) == SQLITE_OK &&
			sqlite3_bind_text(statement, 3, update->key, -1, NULL) == SQLITE_OK)
		{
			int r = sqlite3_step(statement);
			if (r != SQLITE_DONE)
			{
				tf_printf("%s: %d, %s.\n", label, r, sqlite3_errmsg(db));
			}
		}
		else
		{
			tf_printf("%s: bind: %s.\n", label, sqlite3_errmsg(db));
		}
		sqlite3_finalize(statement);
	}
	else
	{
		tf_printf("%s: prepare: %s.\n", label, sqlite3_errmsg(db));
	}
	tf_ssb_release_db_writer(update->ssb, db);
	tf_ssb_record_thread_busy(update->ssb, false);
}
/*
 * Completion callback for a queued write: releases the request.  status is
 * not inspected.
 */
static void _tf_ssb_connections_update_after_work(uv_work_t* work, int status)
{
	tf_free(work->data);
}
/*
 * Hand a filled-in update request to the loop's work queue.  Takes ownership
 * of update: it is freed by the completion callback, which is invoked
 * directly if queuing fails.
 */
static void _tf_ssb_connections_queue_update(tf_ssb_connections_t* connections, tf_ssb_connections_update_t* update)
{
	uv_work_t* request = &update->work;
	request->data = update;

	uv_loop_t* loop = tf_ssb_get_loop(connections->ssb);
	int status = uv_queue_work(loop, request, _tf_ssb_connections_update_work, _tf_ssb_connections_update_after_work);
	if (status != 0)
	{
		_tf_ssb_connections_update_after_work(request, status);
	}
}
/*
 * Shared implementation of the three public update entry points: build an
 * update request for (host, port, key) with the given flags and queue it.
 *
 * A NULL host or key is ignored rather than formatted with "%s" (which would
 * be undefined behaviour), and a failed allocation drops the request.
 * Over-long host or key strings are truncated to the request's buffers.
 */
static void _tf_ssb_connections_post_update(tf_ssb_connections_t* connections, const char* host, int port, const char* key, bool attempted, bool succeeded)
{
	if (!host || !key)
	{
		return;
	}

	tf_ssb_connections_update_t* update = tf_malloc(sizeof(*update));
	if (!update)
	{
		return;
	}
	*update = (tf_ssb_connections_update_t)
	{
		.ssb = connections->ssb,
		.port = port,
		.attempted = attempted,
		.succeeded = succeeded,
	};
	snprintf(update->host, sizeof(update->host), "%s", host);
	snprintf(update->key, sizeof(update->key), "%s", key);
	_tf_ssb_connections_queue_update(connections, update);
}

/*
 * Remember a connection: queue an insert of (host, port, key) into the
 * connections table.  An existing matching row is left unchanged.  The write
 * happens asynchronously; the strings are copied before returning.
 */
void tf_ssb_connections_store(tf_ssb_connections_t* connections, const char* host, int port, const char* key)
{
	_tf_ssb_connections_post_update(connections, host, port, key, false, false);
}

/*
 * Queue an update that stamps last_attempt with the current time on the
 * stored row matching (host, port, key).  The write happens asynchronously;
 * the strings are copied before returning.
 */
void tf_ssb_connections_set_attempted(tf_ssb_connections_t* connections, const char* host, int port, const char* key)
{
	_tf_ssb_connections_post_update(connections, host, port, key, true, false);
}

/*
 * Queue an update that stamps last_success with the current time on the
 * stored row matching (host, port, key).  The write happens asynchronously;
 * the strings are copied before returning.
 */
void tf_ssb_connections_set_succeeded(tf_ssb_connections_t* connections, const char* host, int port, const char* key)
{
	_tf_ssb_connections_post_update(connections, host, port, key, false, true);
}