Moved connections DB access to worker threads. I think global settings access is the only remaining thing on the main thread.
git-svn-id: https://www.unprompted.com/svn/projects/tildefriends/trunk@4503 ed5197a5-7fde-0310-b194-c3ffbd925b24
This commit is contained in:
parent
2c1a5359c6
commit
13c8b05f9a
@ -75,6 +75,37 @@ static bool _tf_ssb_connections_get_next_connection(tf_ssb_connections_t* connec
|
|||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
typedef struct _tf_ssb_connections_get_next_t
|
||||||
|
{
|
||||||
|
uv_work_t work;
|
||||||
|
tf_ssb_connections_t* connections;
|
||||||
|
tf_ssb_t* ssb;
|
||||||
|
bool ready;
|
||||||
|
char host[256];
|
||||||
|
int port;
|
||||||
|
char key[k_id_base64_len];
|
||||||
|
} tf_ssb_connections_get_next_t;
|
||||||
|
|
||||||
|
static void _tf_ssb_connections_get_next_work(uv_work_t* work)
|
||||||
|
{
|
||||||
|
tf_ssb_connections_get_next_t* next = work->data;
|
||||||
|
next->ready = _tf_ssb_connections_get_next_connection(next->connections, next->host, sizeof(next->host), &next->port, next->key, sizeof(next->key));
|
||||||
|
}
|
||||||
|
|
||||||
|
static void _tf_ssb_connections_get_next_after_work(uv_work_t* work, int status)
|
||||||
|
{
|
||||||
|
tf_ssb_connections_get_next_t* next = work->data;
|
||||||
|
if (next->ready)
|
||||||
|
{
|
||||||
|
uint8_t key_bin[k_id_bin_len];
|
||||||
|
if (tf_ssb_id_str_to_bin(key_bin, next->key))
|
||||||
|
{
|
||||||
|
tf_ssb_connect(next->ssb, next->host, next->port, key_bin);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
tf_free(next);
|
||||||
|
}
|
||||||
|
|
||||||
static void _tf_ssb_connections_timer(uv_timer_t* timer)
|
static void _tf_ssb_connections_timer(uv_timer_t* timer)
|
||||||
{
|
{
|
||||||
tf_ssb_connections_t* connections = timer->data;
|
tf_ssb_connections_t* connections = timer->data;
|
||||||
@ -82,16 +113,20 @@ static void _tf_ssb_connections_timer(uv_timer_t* timer)
|
|||||||
int count = tf_ssb_get_connections(connections->ssb, active, _countof(active));
|
int count = tf_ssb_get_connections(connections->ssb, active, _countof(active));
|
||||||
if (count < (int)_countof(active))
|
if (count < (int)_countof(active))
|
||||||
{
|
{
|
||||||
char host[256];
|
tf_ssb_connections_get_next_t* next = tf_malloc(sizeof(tf_ssb_connections_get_next_t));
|
||||||
int port;
|
*next = (tf_ssb_connections_get_next_t)
|
||||||
char key[k_id_base64_len];
|
|
||||||
if (_tf_ssb_connections_get_next_connection(connections, host, sizeof(host), &port, key, sizeof(key)))
|
|
||||||
{
|
{
|
||||||
uint8_t key_bin[k_id_bin_len];
|
.work =
|
||||||
if (tf_ssb_id_str_to_bin(key_bin, key))
|
|
||||||
{
|
{
|
||||||
tf_ssb_connect(connections->ssb, host, port, key_bin);
|
.data = next,
|
||||||
}
|
},
|
||||||
|
.ssb = connections->ssb,
|
||||||
|
.connections = connections,
|
||||||
|
};
|
||||||
|
int result = uv_queue_work(tf_ssb_get_loop(connections->ssb), &next->work, _tf_ssb_connections_get_next_work, _tf_ssb_connections_get_next_after_work);
|
||||||
|
if (result)
|
||||||
|
{
|
||||||
|
_tf_ssb_connections_get_next_after_work(&next->work, result);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -125,15 +160,62 @@ void tf_ssb_connections_destroy(tf_ssb_connections_t* connections)
|
|||||||
uv_close((uv_handle_t*)&connections->timer, _tf_ssb_connections_on_handle_close);
|
uv_close((uv_handle_t*)&connections->timer, _tf_ssb_connections_on_handle_close);
|
||||||
}
|
}
|
||||||
|
|
||||||
void tf_ssb_connections_store(tf_ssb_connections_t* connections, const char* host, int port, const char* key)
|
typedef struct _tf_ssb_connections_update_t
|
||||||
{
|
{
|
||||||
|
uv_work_t work;
|
||||||
|
tf_ssb_t* ssb;
|
||||||
|
char host[256];
|
||||||
|
int port;
|
||||||
|
char key[k_id_base64_len];
|
||||||
|
bool attempted;
|
||||||
|
bool succeeded;
|
||||||
|
} tf_ssb_connections_update_t;
|
||||||
|
|
||||||
|
static void _tf_ssb_connections_update_work(uv_work_t* work)
|
||||||
|
{
|
||||||
|
tf_ssb_connections_update_t* update = work->data;
|
||||||
|
tf_ssb_record_thread_busy(update->ssb, true);
|
||||||
sqlite3_stmt* statement;
|
sqlite3_stmt* statement;
|
||||||
sqlite3* db = tf_ssb_acquire_db_writer(connections->ssb);
|
sqlite3* db = tf_ssb_acquire_db_writer(update->ssb);
|
||||||
|
if (update->attempted)
|
||||||
|
{
|
||||||
|
if (sqlite3_prepare(db, "UPDATE connections SET last_attempt = strftime('%s', 'now') WHERE host = $1 AND port = $2 AND key = $3", -1, &statement, NULL) == SQLITE_OK)
|
||||||
|
{
|
||||||
|
if (sqlite3_bind_text(statement, 1, update->host, -1, NULL) == SQLITE_OK &&
|
||||||
|
sqlite3_bind_int(statement, 2, update->port) == SQLITE_OK &&
|
||||||
|
sqlite3_bind_text(statement, 3, update->key, -1, NULL) == SQLITE_OK)
|
||||||
|
{
|
||||||
|
if (sqlite3_step(statement) != SQLITE_DONE)
|
||||||
|
{
|
||||||
|
tf_printf("tf_ssb_connections_set_attempted: %s.\n", sqlite3_errmsg(db));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
sqlite3_finalize(statement);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if (update->succeeded)
|
||||||
|
{
|
||||||
|
if (sqlite3_prepare(db, "UPDATE connections SET last_success = strftime('%s', 'now') WHERE host = $1 AND port = $2 AND key = $3", -1, &statement, NULL) == SQLITE_OK)
|
||||||
|
{
|
||||||
|
if (sqlite3_bind_text(statement, 1, update->host, -1, NULL) == SQLITE_OK &&
|
||||||
|
sqlite3_bind_int(statement, 2, update->port) == SQLITE_OK &&
|
||||||
|
sqlite3_bind_text(statement, 3, update->key, -1, NULL) == SQLITE_OK)
|
||||||
|
{
|
||||||
|
if (sqlite3_step(statement) != SQLITE_DONE)
|
||||||
|
{
|
||||||
|
tf_printf("tf_ssb_connections_set_succeeded: %s.\n", sqlite3_errmsg(db));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
sqlite3_finalize(statement);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
if (sqlite3_prepare(db, "INSERT INTO connections (host, port, key) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING", -1, &statement, NULL) == SQLITE_OK)
|
if (sqlite3_prepare(db, "INSERT INTO connections (host, port, key) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING", -1, &statement, NULL) == SQLITE_OK)
|
||||||
{
|
{
|
||||||
if (sqlite3_bind_text(statement, 1, host, -1, NULL) == SQLITE_OK &&
|
if (sqlite3_bind_text(statement, 1, update->host, -1, NULL) == SQLITE_OK &&
|
||||||
sqlite3_bind_int(statement, 2, port) == SQLITE_OK &&
|
sqlite3_bind_int(statement, 2, update->port) == SQLITE_OK &&
|
||||||
sqlite3_bind_text(statement, 3, key, -1, NULL) == SQLITE_OK)
|
sqlite3_bind_text(statement, 3, update->key, -1, NULL) == SQLITE_OK)
|
||||||
{
|
{
|
||||||
int r = sqlite3_step(statement);
|
int r = sqlite3_step(statement);
|
||||||
if (r != SQLITE_DONE)
|
if (r != SQLITE_DONE)
|
||||||
@ -143,45 +225,64 @@ void tf_ssb_connections_store(tf_ssb_connections_t* connections, const char* hos
|
|||||||
}
|
}
|
||||||
sqlite3_finalize(statement);
|
sqlite3_finalize(statement);
|
||||||
}
|
}
|
||||||
tf_ssb_release_db_writer(connections->ssb, db);
|
}
|
||||||
|
tf_ssb_release_db_writer(update->ssb, db);
|
||||||
|
tf_ssb_record_thread_busy(update->ssb, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void _tf_ssb_connections_update_after_work(uv_work_t* work, int status)
|
||||||
|
{
|
||||||
|
tf_ssb_connections_update_t* update = work->data;
|
||||||
|
tf_free(update);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void _tf_ssb_connections_queue_update(tf_ssb_connections_t* connections, tf_ssb_connections_update_t* update)
|
||||||
|
{
|
||||||
|
update->work.data = update;
|
||||||
|
int result = uv_queue_work(tf_ssb_get_loop(connections->ssb), &update->work, _tf_ssb_connections_update_work, _tf_ssb_connections_update_after_work);
|
||||||
|
if (result)
|
||||||
|
{
|
||||||
|
_tf_ssb_connections_update_after_work(&update->work, result);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void tf_ssb_connections_store(tf_ssb_connections_t* connections, const char* host, int port, const char* key)
|
||||||
|
{
|
||||||
|
tf_ssb_connections_update_t* update = tf_malloc(sizeof(tf_ssb_connections_update_t));
|
||||||
|
*update = (tf_ssb_connections_update_t)
|
||||||
|
{
|
||||||
|
.ssb = connections->ssb,
|
||||||
|
.port = port,
|
||||||
|
};
|
||||||
|
snprintf(update->host, sizeof(update->host), "%s", host);
|
||||||
|
snprintf(update->key, sizeof(update->key), "%s", key);
|
||||||
|
_tf_ssb_connections_queue_update(connections, update);
|
||||||
}
|
}
|
||||||
|
|
||||||
void tf_ssb_connections_set_attempted(tf_ssb_connections_t* connections, const char* host, int port, const char* key)
|
void tf_ssb_connections_set_attempted(tf_ssb_connections_t* connections, const char* host, int port, const char* key)
|
||||||
{
|
{
|
||||||
sqlite3_stmt* statement;
|
tf_ssb_connections_update_t* update = tf_malloc(sizeof(tf_ssb_connections_update_t));
|
||||||
sqlite3* db = tf_ssb_acquire_db_writer(connections->ssb);
|
*update = (tf_ssb_connections_update_t)
|
||||||
if (sqlite3_prepare(db, "UPDATE connections SET last_attempt = strftime('%s', 'now') WHERE host = $1 AND port = $2 AND key = $3", -1, &statement, NULL) == SQLITE_OK)
|
|
||||||
{
|
{
|
||||||
if (sqlite3_bind_text(statement, 1, host, -1, NULL) == SQLITE_OK &&
|
.ssb = connections->ssb,
|
||||||
sqlite3_bind_int(statement, 2, port) == SQLITE_OK &&
|
.port = port,
|
||||||
sqlite3_bind_text(statement, 3, key, -1, NULL) == SQLITE_OK)
|
.attempted = true,
|
||||||
{
|
};
|
||||||
if (sqlite3_step(statement) != SQLITE_DONE)
|
snprintf(update->host, sizeof(update->host), "%s", host);
|
||||||
{
|
snprintf(update->key, sizeof(update->key), "%s", key);
|
||||||
tf_printf("tf_ssb_connections_set_attempted: %s.\n", sqlite3_errmsg(db));
|
_tf_ssb_connections_queue_update(connections, update);
|
||||||
}
|
|
||||||
}
|
|
||||||
sqlite3_finalize(statement);
|
|
||||||
}
|
|
||||||
tf_ssb_release_db_writer(connections->ssb, db);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void tf_ssb_connections_set_succeeded(tf_ssb_connections_t* connections, const char* host, int port, const char* key)
|
void tf_ssb_connections_set_succeeded(tf_ssb_connections_t* connections, const char* host, int port, const char* key)
|
||||||
{
|
{
|
||||||
sqlite3_stmt* statement;
|
tf_ssb_connections_update_t* update = tf_malloc(sizeof(tf_ssb_connections_update_t));
|
||||||
sqlite3* db = tf_ssb_acquire_db_writer(connections->ssb);
|
*update = (tf_ssb_connections_update_t)
|
||||||
if (sqlite3_prepare(db, "UPDATE connections SET last_success = strftime('%s', 'now') WHERE host = $1 AND port = $2 AND key = $3", -1, &statement, NULL) == SQLITE_OK)
|
|
||||||
{
|
{
|
||||||
if (sqlite3_bind_text(statement, 1, host, -1, NULL) == SQLITE_OK &&
|
.ssb = connections->ssb,
|
||||||
sqlite3_bind_int(statement, 2, port) == SQLITE_OK &&
|
.port = port,
|
||||||
sqlite3_bind_text(statement, 3, key, -1, NULL) == SQLITE_OK)
|
.succeeded = true,
|
||||||
{
|
};
|
||||||
if (sqlite3_step(statement) != SQLITE_DONE)
|
snprintf(update->host, sizeof(update->host), "%s", host);
|
||||||
{
|
snprintf(update->key, sizeof(update->key), "%s", key);
|
||||||
tf_printf("tf_ssb_connections_set_succeeded: %s.\n", sqlite3_errmsg(db));
|
_tf_ssb_connections_queue_update(connections, update);
|
||||||
}
|
|
||||||
}
|
|
||||||
sqlite3_finalize(statement);
|
|
||||||
}
|
|
||||||
tf_ssb_release_db_writer(connections->ssb, db);
|
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user