From afde69b5d96f2b6a5b07230c27749917d08c9464 Mon Sep 17 00:00:00 2001 From: Cory McWilliams Date: Thu, 27 Jul 2023 02:51:42 +0000 Subject: [PATCH] Took a whack at cleaning up old blobs. git-svn-id: https://www.unprompted.com/svn/projects/tildefriends/trunk@4369 ed5197a5-7fde-0310-b194-c3ffbd925b24 --- core/core.js | 10 ++ src/android/AndroidManifest.xml | 4 +- src/ssb.db.c | 5 +- src/ssb.rpc.c | 236 +++++++++++++++++++++++--------- 4 files changed, 188 insertions(+), 67 deletions(-) diff --git a/core/core.js b/core/core.js index 2ddfaeb8..913cd074 100644 --- a/core/core.js +++ b/core/core.js @@ -72,6 +72,16 @@ const k_global_settings = { default_value: undefined, description: 'Comma-separated list of host names to which HTTP fetch requests are allowed. None if empty.', }, + blob_fetch_age_seconds: { + type: 'integer', + default_value: undefined, + description: 'Only blobs mentioned more recently than this age will be automatically fetched.', + }, + blob_expire_age_seconds: { + type: 'integer', + default_value: undefined, + description: 'Blobs older than this will be automatically deleted.', + }, }; let gGlobalSettings = { diff --git a/src/android/AndroidManifest.xml b/src/android/AndroidManifest.xml index 46361df4..fddf0344 100644 --- a/src/android/AndroidManifest.xml +++ b/src/android/AndroidManifest.xml @@ -1,8 +1,8 @@ + versionCode="10" + versionName="0.0.10-wip"> diff --git a/src/ssb.db.c b/src/ssb.db.c index 74698456..207db0d3 100644 --- a/src/ssb.db.c +++ b/src/ssb.db.c @@ -184,9 +184,10 @@ void tf_ssb_db_init(tf_ssb_t* ssb) _tf_ssb_db_exec(db, "CREATE INDEX IF NOT EXISTS messages_refs_ref_idx ON messages_refs (ref)"); _tf_ssb_db_exec(db, "DROP VIEW IF EXISTS blob_wants_view"); _tf_ssb_db_exec(db, - "CREATE VIEW IF NOT EXISTS blob_wants_view (id) AS " - " SELECT messages_refs.ref AS id " + "CREATE VIEW IF NOT EXISTS blob_wants_view (id, timestamp) AS " + " SELECT messages_refs.ref AS id, messages.timestamp AS timestamp " " FROM messages_refs " + " JOIN messages ON messages.id = messages_refs.message " " LEFT OUTER JOIN blobs ON messages_refs.ref = blobs.id " " WHERE blobs.id IS NULL " " AND LENGTH(messages_refs.ref) = 52 " diff --git a/src/ssb.rpc.c b/src/ssb.rpc.c index e692588f..2de45502 100644 --- a/src/ssb.rpc.c +++ b/src/ssb.rpc.c @@ -10,6 +10,7 @@ #include "uv.h" #include +#include #include #include @@ -19,6 +20,80 @@ static void _tf_ssb_connection_send_history_stream(tf_ssb_connection_t* connection, int32_t request_number, const char* author, int64_t sequence, bool keys, bool live); static void _tf_ssb_connection_send_history_stream_internal(tf_ssb_connection_t* connection, int32_t request_number, const char* author, int64_t sequence, bool keys, bool live); +static void _tf_ssb_rpc_start_delete_blobs(tf_ssb_t* ssb, int delay_ms); + +static int64_t _get_global_setting_int64(tf_ssb_t* ssb, const char* name, int64_t default_value) +{ + int64_t result = default_value; + sqlite3* db = tf_ssb_acquire_db_reader(ssb); + sqlite3_stmt* statement; + if (sqlite3_prepare(db, "SELECT json_extract(value, '$.' || ?) FROM properties WHERE id = 'core' AND key = 'settings'", -1, &statement, NULL) == SQLITE_OK) + { + if (sqlite3_bind_text(statement, 1, name, -1, NULL) == SQLITE_OK) + { + if (sqlite3_step(statement) == SQLITE_ROW) + { + result = sqlite3_column_int64(statement, 0); + } + } + sqlite3_finalize(statement); + } + else + { + tf_printf("prepare failed: %s\n", sqlite3_errmsg(db)); + } + tf_ssb_release_db_reader(ssb, db); + return result; +} + +static bool _get_global_setting_bool(tf_ssb_t* ssb, const char* name, bool default_value) +{ + bool result = default_value; + sqlite3* db = tf_ssb_acquire_db_reader(ssb); + sqlite3_stmt* statement; + if (sqlite3_prepare(db, "SELECT json_extract(value, '$.' || ?) FROM properties WHERE id = 'core' AND key = 'settings'", -1, &statement, NULL) == SQLITE_OK) + { + if (sqlite3_bind_text(statement, 1, name, -1, NULL) == SQLITE_OK) + { + if (sqlite3_step(statement) == SQLITE_ROW) + { + result = sqlite3_column_int(statement, 0) != 0; + } + } + sqlite3_finalize(statement); + } + else + { + tf_printf("prepare failed: %s\n", sqlite3_errmsg(db)); + } + tf_ssb_release_db_reader(ssb, db); + return result; +} + +static bool _get_global_setting_string(tf_ssb_t* ssb, const char* name, char* out_value, size_t size) +{ + bool result = false; + sqlite3* db = tf_ssb_acquire_db_reader(ssb); + sqlite3_stmt* statement; + if (sqlite3_prepare(db, "SELECT json_extract(value, '$.' || ?) FROM properties WHERE id = 'core' AND key = 'settings'", -1, &statement, NULL) == SQLITE_OK) + { + if (sqlite3_bind_text(statement, 1, name, -1, NULL) == SQLITE_OK) + { + if (sqlite3_step(statement) == SQLITE_ROW) + { + snprintf(out_value, size, "%s", sqlite3_column_text(statement, 0)); + result = true; + } + } + sqlite3_finalize(statement); + } + else + { + tf_printf("prepare failed: %s\n", sqlite3_errmsg(db)); + } + tf_ssb_release_db_reader(ssb, db); + return result; +} static void _tf_ssb_rpc_gossip_ping_callback(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) { @@ -143,11 +218,25 @@ static void _tf_ssb_rpc_request_more_blobs(tf_ssb_connection_t* connection) JSContext* context = tf_ssb_connection_get_context(connection); tf_ssb_blob_wants_t* blob_wants = tf_ssb_connection_get_blob_wants_state(connection); tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); + int64_t age = _get_global_setting_int64(ssb, "blob_fetch_age_seconds", -1); + int64_t timestamp = -1; + if (age == 0) + { + /* Don't fetch any blobs. */ + return; + } + else if (age > 0) + { + int64_t now = (int64_t)time(NULL) * 1000ULL; + timestamp = now - age * 1000ULL; + } + sqlite3* db = tf_ssb_acquire_db_reader(ssb); sqlite3_stmt* statement; - if (sqlite3_prepare(db, "SELECT id FROM blob_wants_view WHERE id > ? ORDER BY id LIMIT 32", -1, &statement, NULL) == SQLITE_OK) + if (sqlite3_prepare(db, "SELECT id FROM blob_wants_view WHERE id > ? AND timestamp > ? ORDER BY id LIMIT 32", -1, &statement, NULL) == SQLITE_OK) { - if (sqlite3_bind_text(statement, 1, blob_wants->last_id, -1, NULL) == SQLITE_OK) + if (sqlite3_bind_text(statement, 1, blob_wants->last_id, -1, NULL) == SQLITE_OK && + sqlite3_bind_int64(statement, 2, timestamp) == SQLITE_OK) { while (sqlite3_step(statement) == SQLITE_ROW) { @@ -195,67 +284,6 @@ void _tf_ssb_rpc_tunnel_cleanup(tf_ssb_t* ssb, void* user_data) tf_free(user_data); } -static bool _get_global_setting_bool(tf_ssb_t* ssb, const char* name, bool default_value) -{ - bool result = default_value; - JSContext* context = tf_ssb_get_context(ssb); - sqlite3* db = tf_ssb_acquire_db_reader(ssb); - sqlite3_stmt* statement; - if (sqlite3_prepare(db, "SELECT value FROM properties WHERE id = 'core' AND key = 'settings'", -1, &statement, NULL) == SQLITE_OK) - { - if (sqlite3_step(statement) == SQLITE_ROW) - { - JSValue value = JS_ParseJSON(context, (const char*)sqlite3_column_text(statement, 0), sqlite3_column_bytes(statement, 0), NULL); - JSValue property = JS_GetPropertyStr(context, value, name); - if (JS_IsBool(property)) - { - result = JS_ToBool(context, property); - } - JS_FreeValue(context, property); - JS_FreeValue(context, value); - } - sqlite3_finalize(statement); - } - else - { - tf_printf("prepare failed: %s\n", sqlite3_errmsg(db)); - } - tf_ssb_release_db_reader(ssb, db); - return result; -} - -static bool _get_global_setting_string(tf_ssb_t* ssb, const char* name, char* out_value, size_t size) -{ - bool result = false; - JSContext* context = tf_ssb_get_context(ssb); - sqlite3* db = tf_ssb_acquire_db_reader(ssb); - sqlite3_stmt* statement; - if (sqlite3_prepare(db, "SELECT value FROM properties WHERE id = 'core' AND key = 'settings'", -1, &statement, NULL) == SQLITE_OK) - { - if (sqlite3_step(statement) == SQLITE_ROW) - { - JSValue value = JS_ParseJSON(context, (const char*)sqlite3_column_text(statement, 0), sqlite3_column_bytes(statement, 0), NULL); - JSValue property = JS_GetPropertyStr(context, value, name); - const char* value_string = JS_ToCString(context, property); - if (value_string) - { - snprintf(out_value, size, "%s", value_string); - result = true; - JS_FreeCString(context, value_string); - } - JS_FreeValue(context, property); - JS_FreeValue(context, value); - } - sqlite3_finalize(statement); - } - else - { - tf_printf("prepare failed: %s\n", sqlite3_errmsg(db)); - } - tf_ssb_release_db_reader(ssb, db); - return result; -} - static void _tf_ssb_rpc_tunnel_connect(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) { tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); @@ -1116,6 +1144,87 @@ static void _tf_ssb_rpc_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_chang } } +static void _tf_ssb_rpc_delete_blobs_work(uv_work_t* work) +{ + tf_ssb_t* ssb = work->data; + int64_t age = _get_global_setting_int64(ssb, "blob_expire_age_seconds", -1); + if (age <= 0) + { + return; + } + sqlite3* db = tf_ssb_acquire_db_writer(ssb); + sqlite3_stmt* statement; + int64_t now = (int64_t)time(NULL) * 1000ULL; + int64_t timestamp = now - age * 1000ULL; + const int k_limit = 256; + bool deleted = false; + if (sqlite3_prepare(db, + "DELETE FROM blobs WHERE blobs.id IN (" + " SELECT blobs.id FROM blobs " + " JOIN messages_refs ON blobs.id = messages_refs.ref " + " JOIN messages ON messages.id = messages_refs.message " + " GROUP BY blobs.id HAVING MAX(messages.timestamp) < ? LIMIT ?)", -1, &statement, NULL) == SQLITE_OK) + { + if (sqlite3_bind_int64(statement, 1, timestamp) == SQLITE_OK && + sqlite3_bind_int(statement, 2, k_limit) == SQLITE_OK) + { + int r = sqlite3_step(statement); + if (r != SQLITE_DONE) + { + tf_printf("_tf_ssb_rpc_delete_blobs_work: %s\n", sqlite3_errmsg(db)); + } + else + { + tf_printf("_tf_ssb_rpc_delete_blobs_work: %d rows\n", sqlite3_changes(db)); + } + deleted = sqlite3_changes(db) != 0; + } + } + else + { + tf_printf("prepare failed: %s\n", sqlite3_errmsg(db)); + } + tf_ssb_release_db_writer(ssb, db); + + if (deleted) + { + _tf_ssb_rpc_start_delete_blobs(ssb, 60 * 1000); + } +} + +static void _tf_ssb_rpc_delete_blobs_after_work(uv_work_t* work, int status) +{ + tf_free(work); +} + +static void _tf_ssb_rpc_timer_on_close(uv_handle_t* handle) +{ + tf_free(handle); +} + +static void _tf_ssb_rpc_start_delete_timer(uv_timer_t* timer) +{ + tf_ssb_t* ssb = timer->data; + uv_work_t* work = tf_malloc(sizeof(uv_work_t)); + *work = (uv_work_t) { .data = ssb }; + int r = uv_queue_work(tf_ssb_get_loop(ssb), work, _tf_ssb_rpc_delete_blobs_work, _tf_ssb_rpc_delete_blobs_after_work); + if (r) + { + tf_printf("uv_queue_work: %s\n", uv_strerror(r)); + tf_free(work); + } + uv_close((uv_handle_t*)timer, _tf_ssb_rpc_timer_on_close); +} + +static void _tf_ssb_rpc_start_delete_blobs(tf_ssb_t* ssb, int delay_ms) +{ + tf_printf("will delete more blobs in %d ms\n", delay_ms); + uv_timer_t* timer = tf_malloc(sizeof(uv_timer_t)); + *timer = (uv_timer_t) { .data = ssb }; + uv_timer_init(tf_ssb_get_loop(ssb), timer); + uv_timer_start(timer, _tf_ssb_rpc_start_delete_timer, delay_ms, 0); +} + void tf_ssb_rpc_register(tf_ssb_t* ssb) { tf_ssb_add_connections_changed_callback(ssb, _tf_ssb_rpc_connections_changed_callback, NULL, NULL); @@ -1129,4 +1238,5 @@ void tf_ssb_rpc_register(tf_ssb_t* ssb) tf_ssb_add_rpc_callback(ssb, (const char*[]) { "room", "attendants", NULL }, _tf_ssb_rpc_room_attendants, NULL, NULL); /* SOURCE */ tf_ssb_add_rpc_callback(ssb, (const char*[]) { "createHistoryStream", NULL }, _tf_ssb_rpc_createHistoryStream, NULL, NULL); /* SOURCE */ tf_ssb_add_rpc_callback(ssb, (const char*[]) { "ebt", "replicate", NULL }, _tf_ssb_rpc_ebt_replicate_server, NULL, NULL); /* DUPLEX */ + _tf_ssb_rpc_start_delete_blobs(ssb, 0); }