diff --git a/src/ssb.rpc.c b/src/ssb.rpc.c index 52b1b4820..78125a143 100644 --- a/src/ssb.rpc.c +++ b/src/ssb.rpc.c @@ -18,6 +18,7 @@ static void _tf_ssb_connection_send_history_stream( tf_ssb_connection_t* connection, int32_t request_number, const char* author, int64_t sequence, bool keys, bool live, bool end_request); static void _tf_ssb_rpc_send_peers_exchange(tf_ssb_connection_t* connection); static void _tf_ssb_rpc_start_delete_blobs(tf_ssb_t* ssb, int delay_ms); +static void _tf_ssb_rpc_start_delete_feeds(tf_ssb_t* ssb, int delay_ms); static int64_t _get_global_setting_int64(tf_ssb_t* ssb, const char* name, int64_t default_value) { @@ -43,6 +44,30 @@ static int64_t _get_global_setting_int64(tf_ssb_t* ssb, const char* name, int64_ return result; } +static bool _get_global_setting_bool(tf_ssb_t* ssb, const char* name, bool default_value) +{ + bool result = default_value; + sqlite3* db = tf_ssb_acquire_db_reader(ssb); + sqlite3_stmt* statement; + if (sqlite3_prepare(db, "SELECT json_extract(value, '$.' || ?) FROM properties WHERE id = 'core' AND key = 'settings'", -1, &statement, NULL) == SQLITE_OK) + { + if (sqlite3_bind_text(statement, 1, name, -1, NULL) == SQLITE_OK) + { + if (sqlite3_step(statement) == SQLITE_ROW) + { + result = sqlite3_column_int(statement, 0) != 0; + } + } + sqlite3_finalize(statement); + } + else + { + tf_printf("prepare failed: %s\n", sqlite3_errmsg(db)); + } + tf_ssb_release_db_reader(ssb, db); + return result; +} + static void _tf_ssb_rpc_gossip_ping_callback( tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) { @@ -1341,8 +1366,15 @@ static void _tf_ssb_rpc_checkpoint(tf_ssb_t* ssb) tf_ssb_release_db_writer(ssb, db); } +typedef struct _delete_t +{ + int deleted; + int64_t duration_ms; +} delete_t; + static void _tf_ssb_rpc_delete_blobs_work(tf_ssb_t* ssb, void* user_data) { + delete_t* delete = user_data; int64_t age = _get_global_setting_int64(ssb, "blob_expire_age_seconds", -1); if (age <= 0) { @@ -1384,30 +1416,122 @@ static void _tf_ssb_rpc_delete_blobs_work(tf_ssb_t* ssb, void* user_data) tf_printf("prepare failed: %s\n", sqlite3_errmsg(db)); } tf_ssb_release_db_writer(ssb, db); - int64_t duration_ms = (uv_hrtime() - start_ns) / 1000000LL; - tf_printf("Deleted %d blobs in %d ms.\n", deleted, (int)duration_ms); + delete->duration_ms = (uv_hrtime() - start_ns) / 1000000LL; + tf_printf("Deleted %d blobs in %d ms.\n", deleted, (int)delete->duration_ms); _tf_ssb_rpc_checkpoint(ssb); - _tf_ssb_rpc_start_delete_blobs(ssb, deleted ? (int)duration_ms : (15 * 60 * 1000)); } static void _tf_ssb_rpc_delete_blobs_after_work(tf_ssb_t* ssb, int status, void* user_data) { + delete_t* delete = user_data; + _tf_ssb_rpc_start_delete_blobs(ssb, delete->deleted ? (int)delete->duration_ms : (15 * 60 * 1000)); + tf_free(delete); } -static void _tf_ssb_rpc_start_delete_callback(tf_ssb_t* ssb, void* user_data) +static void _tf_ssb_rpc_start_delete_blobs_callback(tf_ssb_t* ssb, void* user_data) { - tf_ssb_run_work(ssb, _tf_ssb_rpc_delete_blobs_work, _tf_ssb_rpc_delete_blobs_after_work, NULL); + delete_t* delete = tf_malloc(sizeof(delete_t)); + *delete = (delete_t) { 0 }; + tf_ssb_run_work(ssb, _tf_ssb_rpc_delete_blobs_work, _tf_ssb_rpc_delete_blobs_after_work, delete); } static void _tf_ssb_rpc_start_delete_blobs(tf_ssb_t* ssb, int delay_ms) { tf_printf("will delete more blobs in %d ms\n", delay_ms); - tf_ssb_schedule_work(ssb, delay_ms, _tf_ssb_rpc_start_delete_callback, NULL); + tf_ssb_schedule_work(ssb, delay_ms, _tf_ssb_rpc_start_delete_blobs_callback, NULL); +} + +static void _tf_ssb_rpc_delete_feeds_work(tf_ssb_t* ssb, void* user_data) +{ + delete_t* delete = user_data; + if (!_get_global_setting_bool(ssb, "delete_stale_feeds", false)) + { + return; + } + int64_t start_ns = uv_hrtime(); + int replication_hops = (int)_get_global_setting_int64(ssb, "replication_hops", 2); + const char** identities = tf_ssb_db_get_all_visible_identities(ssb, replication_hops); + + JSMallocFunctions funcs = { 0 }; + tf_get_js_malloc_functions(&funcs); + JSRuntime* runtime = JS_NewRuntime2(&funcs, NULL); + JSContext* context = JS_NewContext(runtime); + JSValue array = JS_NewArray(context); + for (int i = 0; identities[i]; i++) + { + JS_SetPropertyUint32(context, array, i, JS_NewString(context, identities[i])); + } + tf_free(identities); + + JSValue json = JS_JSONStringify(context, array, JS_NULL, JS_NULL); + const char* arg = JS_ToCString(context, json); + JS_FreeValue(context, json); + JS_FreeValue(context, array); + + sqlite3* db = tf_ssb_acquire_db_writer(ssb); + sqlite3_stmt* statement; + if (sqlite3_prepare(db, + "DELETE FROM messages WHERE author IN (" + " SELECT author FROM messages WHERE author NOT IN (SELECT value FROM json_each(?)) GROUP BY author LIMIT 1" + ") RETURNING author", + -1, &statement, NULL) == SQLITE_OK) + { + int status = SQLITE_OK; + bool printed = false; + if (sqlite3_bind_text(statement, 1, arg, -1, NULL) == SQLITE_OK) + { + while ((status = sqlite3_step(statement)) == SQLITE_ROW) + { + if (!printed) + { + tf_printf("deleting %s\n", sqlite3_column_text(statement, 0)); + printed = true; + delete->deleted++; + } + } + if (status != SQLITE_DONE) + { + tf_printf("deleting feeds: %s\n", sqlite3_errmsg(db)); + } + } + sqlite3_finalize(statement); + } + tf_ssb_release_db_writer(ssb, db); + + JS_FreeCString(context, arg); + + JS_FreeContext(context); + JS_FreeRuntime(runtime); + + delete->duration_ms = (uv_hrtime() - start_ns) / 1000000LL; + tf_printf("Deleted %d feeds in %d ms.\n", delete->deleted, (int)delete->duration_ms); + _tf_ssb_rpc_checkpoint(ssb); +} + +static void _tf_ssb_rpc_delete_feeds_after_work(tf_ssb_t* ssb, int status, void* user_data) +{ + delete_t* delete = user_data; + _tf_ssb_rpc_start_delete_feeds(ssb, delete->deleted ? (int)delete->duration_ms : (15 * 60 * 1000)); + tf_free(delete); +} + +static void _tf_ssb_rpc_start_delete_feeds_callback(tf_ssb_t* ssb, void* user_data) +{ + delete_t* delete = tf_malloc(sizeof(delete_t)); + *delete = (delete_t) { 0 }; + tf_ssb_run_work(ssb, _tf_ssb_rpc_delete_feeds_work, _tf_ssb_rpc_delete_feeds_after_work, delete); +} + +static void _tf_ssb_rpc_start_delete_feeds(tf_ssb_t* ssb, int delay_ms) +{ + tf_printf("will delete more feeds in %d ms\n", delay_ms); + tf_ssb_schedule_work(ssb, delay_ms, _tf_ssb_rpc_start_delete_feeds_callback, NULL); } void tf_ssb_rpc_start_periodic(tf_ssb_t* ssb) { _tf_ssb_rpc_start_delete_blobs(ssb, 30 * 1000); + _tf_ssb_rpc_start_delete_feeds(ssb, 25 * 1000); } typedef struct _peers_exchange_t diff --git a/src/util.js.c b/src/util.js.c index 4d082f906..699e0f532 100644 --- a/src/util.js.c +++ b/src/util.js.c @@ -350,7 +350,14 @@ static JSValue _util_defaultGlobalSettings(JSContext* context, JSValueConst this { .name = "room_name", .type = "string", .description = "Name of the room.", .default_value = JS_NewString(context, "tilde friends tunnel") }, { .name = "seeds_host", .type = "string", .description = "Hostname for seed connections.", .default_value = JS_NewString(context, "seeds.tildefriends.net") }, { .name = "account_registration", .type = "boolean", .description = "Allow registration of new accounts.", .default_value = JS_TRUE }, - { .name = "replication_hops", .type = "integer", .description = "Number of hops to replicate (1 = direct follows, 2 = follows of follows, etc.).", .default_value = JS_NewInt32(context, 2) }, + { .name = "replication_hops", + .type = "integer", + .description = "Number of hops to replicate (1 = direct follows, 2 = follows of follows, etc.).", + .default_value = JS_NewInt32(context, 2) }, + { .name = "delete_stale_feeds", + .type = "boolean", + .description = "Periodically delete feeds that visible from local accounts and related follows.", + .default_value = JS_FALSE }, }; JSValue settings = JS_NewObject(context);