forked from cory/tildefriends
		
	ssb: Add a setting to periodically clean up un-followed feeds. #80
This commit is contained in:
		
							
								
								
									
										136
									
								
								src/ssb.rpc.c
									
									
									
									
									
								
							
							
						
						
									
										136
									
								
								src/ssb.rpc.c
									
									
									
									
									
								
							| @@ -18,6 +18,7 @@ static void _tf_ssb_connection_send_history_stream( | ||||
| 	tf_ssb_connection_t* connection, int32_t request_number, const char* author, int64_t sequence, bool keys, bool live, bool end_request); | ||||
| static void _tf_ssb_rpc_send_peers_exchange(tf_ssb_connection_t* connection); | ||||
| static void _tf_ssb_rpc_start_delete_blobs(tf_ssb_t* ssb, int delay_ms); | ||||
| static void _tf_ssb_rpc_start_delete_feeds(tf_ssb_t* ssb, int delay_ms); | ||||
|  | ||||
| static int64_t _get_global_setting_int64(tf_ssb_t* ssb, const char* name, int64_t default_value) | ||||
| { | ||||
| @@ -43,6 +44,30 @@ static int64_t _get_global_setting_int64(tf_ssb_t* ssb, const char* name, int64_ | ||||
| 	return result; | ||||
| } | ||||
|  | ||||
| static bool _get_global_setting_bool(tf_ssb_t* ssb, const char* name, bool default_value) | ||||
| { | ||||
| 	bool result = default_value; | ||||
| 	sqlite3* db = tf_ssb_acquire_db_reader(ssb); | ||||
| 	sqlite3_stmt* statement; | ||||
| 	if (sqlite3_prepare(db, "SELECT json_extract(value, '$.' || ?) FROM properties WHERE id = 'core' AND key = 'settings'", -1, &statement, NULL) == SQLITE_OK) | ||||
| 	{ | ||||
| 		if (sqlite3_bind_text(statement, 1, name, -1, NULL) == SQLITE_OK) | ||||
| 		{ | ||||
| 			if (sqlite3_step(statement) == SQLITE_ROW) | ||||
| 			{ | ||||
| 				result = sqlite3_column_int(statement, 0) != 0; | ||||
| 			} | ||||
| 		} | ||||
| 		sqlite3_finalize(statement); | ||||
| 	} | ||||
| 	else | ||||
| 	{ | ||||
| 		tf_printf("prepare failed: %s\n", sqlite3_errmsg(db)); | ||||
| 	} | ||||
| 	tf_ssb_release_db_reader(ssb, db); | ||||
| 	return result; | ||||
| } | ||||
|  | ||||
| static void _tf_ssb_rpc_gossip_ping_callback( | ||||
| 	tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) | ||||
| { | ||||
| @@ -1341,8 +1366,15 @@ static void _tf_ssb_rpc_checkpoint(tf_ssb_t* ssb) | ||||
| 	tf_ssb_release_db_writer(ssb, db); | ||||
| } | ||||
|  | ||||
| typedef struct _delete_t | ||||
| { | ||||
| 	int deleted; | ||||
| 	int64_t duration_ms; | ||||
| } delete_t; | ||||
|  | ||||
| static void _tf_ssb_rpc_delete_blobs_work(tf_ssb_t* ssb, void* user_data) | ||||
| { | ||||
| 	delete_t* delete = user_data; | ||||
| 	int64_t age = _get_global_setting_int64(ssb, "blob_expire_age_seconds", -1); | ||||
| 	if (age <= 0) | ||||
| 	{ | ||||
| @@ -1384,30 +1416,122 @@ static void _tf_ssb_rpc_delete_blobs_work(tf_ssb_t* ssb, void* user_data) | ||||
| 		tf_printf("prepare failed: %s\n", sqlite3_errmsg(db)); | ||||
| 	} | ||||
| 	tf_ssb_release_db_writer(ssb, db); | ||||
| 	int64_t duration_ms = (uv_hrtime() - start_ns) / 1000000LL; | ||||
| 	tf_printf("Deleted %d blobs in %d ms.\n", deleted, (int)duration_ms); | ||||
| 	delete->duration_ms = (uv_hrtime() - start_ns) / 1000000LL; | ||||
| 	tf_printf("Deleted %d blobs in %d ms.\n", deleted, (int)delete->duration_ms); | ||||
| 	_tf_ssb_rpc_checkpoint(ssb); | ||||
| 	_tf_ssb_rpc_start_delete_blobs(ssb, deleted ? (int)duration_ms : (15 * 60 * 1000)); | ||||
| } | ||||
|  | ||||
| static void _tf_ssb_rpc_delete_blobs_after_work(tf_ssb_t* ssb, int status, void* user_data) | ||||
| { | ||||
| 	delete_t* delete = user_data; | ||||
| 	_tf_ssb_rpc_start_delete_blobs(ssb, delete->deleted ? (int)delete->duration_ms : (15 * 60 * 1000)); | ||||
| 	tf_free(delete); | ||||
| } | ||||
|  | ||||
| static void _tf_ssb_rpc_start_delete_callback(tf_ssb_t* ssb, void* user_data) | ||||
| static void _tf_ssb_rpc_start_delete_blobs_callback(tf_ssb_t* ssb, void* user_data) | ||||
| { | ||||
| 	tf_ssb_run_work(ssb, _tf_ssb_rpc_delete_blobs_work, _tf_ssb_rpc_delete_blobs_after_work, NULL); | ||||
| 	delete_t* delete = tf_malloc(sizeof(delete_t)); | ||||
| 	*delete = (delete_t) { 0 }; | ||||
| 	tf_ssb_run_work(ssb, _tf_ssb_rpc_delete_blobs_work, _tf_ssb_rpc_delete_blobs_after_work, delete); | ||||
| } | ||||
|  | ||||
| static void _tf_ssb_rpc_start_delete_blobs(tf_ssb_t* ssb, int delay_ms) | ||||
| { | ||||
| 	tf_printf("will delete more blobs in %d ms\n", delay_ms); | ||||
| 	tf_ssb_schedule_work(ssb, delay_ms, _tf_ssb_rpc_start_delete_callback, NULL); | ||||
| 	tf_ssb_schedule_work(ssb, delay_ms, _tf_ssb_rpc_start_delete_blobs_callback, NULL); | ||||
| } | ||||
|  | ||||
| static void _tf_ssb_rpc_delete_feeds_work(tf_ssb_t* ssb, void* user_data) | ||||
| { | ||||
| 	delete_t* delete = user_data; | ||||
| 	if (!_get_global_setting_bool(ssb, "delete_stale_feeds", false)) | ||||
| 	{ | ||||
| 		return; | ||||
| 	} | ||||
| 	int64_t start_ns = uv_hrtime(); | ||||
| 	int replication_hops = (int)_get_global_setting_int64(ssb, "replication_hops", 2); | ||||
| 	const char** identities = tf_ssb_db_get_all_visible_identities(ssb, replication_hops); | ||||
|  | ||||
| 	JSMallocFunctions funcs = { 0 }; | ||||
| 	tf_get_js_malloc_functions(&funcs); | ||||
| 	JSRuntime* runtime = JS_NewRuntime2(&funcs, NULL); | ||||
| 	JSContext* context = JS_NewContext(runtime); | ||||
| 	JSValue array = JS_NewArray(context); | ||||
| 	for (int i = 0; identities[i]; i++) | ||||
| 	{ | ||||
| 		JS_SetPropertyUint32(context, array, i, JS_NewString(context, identities[i])); | ||||
| 	} | ||||
| 	tf_free(identities); | ||||
|  | ||||
| 	JSValue json = JS_JSONStringify(context, array, JS_NULL, JS_NULL); | ||||
| 	const char* arg = JS_ToCString(context, json); | ||||
| 	JS_FreeValue(context, json); | ||||
| 	JS_FreeValue(context, array); | ||||
|  | ||||
| 	sqlite3* db = tf_ssb_acquire_db_writer(ssb); | ||||
| 	sqlite3_stmt* statement; | ||||
| 	if (sqlite3_prepare(db, | ||||
| 			"DELETE FROM messages WHERE author IN (" | ||||
| 			"  SELECT author FROM messages WHERE author NOT IN (SELECT value FROM json_each(?)) GROUP BY author LIMIT 1" | ||||
| 			") RETURNING author", | ||||
| 			-1, &statement, NULL) == SQLITE_OK) | ||||
| 	{ | ||||
| 		int status = SQLITE_OK; | ||||
| 		bool printed = false; | ||||
| 		if (sqlite3_bind_text(statement, 1, arg, -1, NULL) == SQLITE_OK) | ||||
| 		{ | ||||
| 			while ((status = sqlite3_step(statement)) == SQLITE_ROW) | ||||
| 			{ | ||||
| 				if (!printed) | ||||
| 				{ | ||||
| 					tf_printf("deleting %s\n", sqlite3_column_text(statement, 0)); | ||||
| 					printed = true; | ||||
| 					delete->deleted++; | ||||
| 				} | ||||
| 			} | ||||
| 			if (status != SQLITE_DONE) | ||||
| 			{ | ||||
| 				tf_printf("deleting feeds: %s\n", sqlite3_errmsg(db)); | ||||
| 			} | ||||
| 		} | ||||
| 		sqlite3_finalize(statement); | ||||
| 	} | ||||
| 	tf_ssb_release_db_writer(ssb, db); | ||||
|  | ||||
| 	JS_FreeCString(context, arg); | ||||
|  | ||||
| 	JS_FreeContext(context); | ||||
| 	JS_FreeRuntime(runtime); | ||||
|  | ||||
| 	delete->duration_ms = (uv_hrtime() - start_ns) / 1000000LL; | ||||
| 	tf_printf("Deleted %d feeds in %d ms.\n", delete->deleted, (int)delete->duration_ms); | ||||
| 	_tf_ssb_rpc_checkpoint(ssb); | ||||
| } | ||||
|  | ||||
| static void _tf_ssb_rpc_delete_feeds_after_work(tf_ssb_t* ssb, int status, void* user_data) | ||||
| { | ||||
| 	delete_t* delete = user_data; | ||||
| 	_tf_ssb_rpc_start_delete_feeds(ssb, delete->deleted ? (int)delete->duration_ms : (15 * 60 * 1000)); | ||||
| 	tf_free(delete); | ||||
| } | ||||
|  | ||||
| static void _tf_ssb_rpc_start_delete_feeds_callback(tf_ssb_t* ssb, void* user_data) | ||||
| { | ||||
| 	delete_t* delete = tf_malloc(sizeof(delete_t)); | ||||
| 	*delete = (delete_t) { 0 }; | ||||
| 	tf_ssb_run_work(ssb, _tf_ssb_rpc_delete_feeds_work, _tf_ssb_rpc_delete_feeds_after_work, delete); | ||||
| } | ||||
|  | ||||
| static void _tf_ssb_rpc_start_delete_feeds(tf_ssb_t* ssb, int delay_ms) | ||||
| { | ||||
| 	tf_printf("will delete more feeds in %d ms\n", delay_ms); | ||||
| 	tf_ssb_schedule_work(ssb, delay_ms, _tf_ssb_rpc_start_delete_feeds_callback, NULL); | ||||
| } | ||||
|  | ||||
| void tf_ssb_rpc_start_periodic(tf_ssb_t* ssb) | ||||
| { | ||||
| 	_tf_ssb_rpc_start_delete_blobs(ssb, 30 * 1000); | ||||
| 	_tf_ssb_rpc_start_delete_feeds(ssb, 25 * 1000); | ||||
| } | ||||
|  | ||||
| typedef struct _peers_exchange_t | ||||
|   | ||||
| @@ -350,7 +350,14 @@ static JSValue _util_defaultGlobalSettings(JSContext* context, JSValueConst this | ||||
| 		{ .name = "room_name", .type = "string", .description = "Name of the room.", .default_value = JS_NewString(context, "tilde friends tunnel") }, | ||||
| 		{ .name = "seeds_host", .type = "string", .description = "Hostname for seed connections.", .default_value = JS_NewString(context, "seeds.tildefriends.net") }, | ||||
| 		{ .name = "account_registration", .type = "boolean", .description = "Allow registration of new accounts.", .default_value = JS_TRUE }, | ||||
| 		{ .name = "replication_hops", .type = "integer", .description = "Number of hops to replicate (1 = direct follows, 2 = follows of follows, etc.).", .default_value = JS_NewInt32(context, 2) }, | ||||
| 		{ .name = "replication_hops", | ||||
| 			.type = "integer", | ||||
| 			.description = "Number of hops to replicate (1 = direct follows, 2 = follows of follows, etc.).", | ||||
| 			.default_value = JS_NewInt32(context, 2) }, | ||||
| 		{ .name = "delete_stale_feeds", | ||||
| 			.type = "boolean", | ||||
| 			.description = "Periodically delete feeds that visible from local accounts and related follows.", | ||||
| 			.default_value = JS_FALSE }, | ||||
| 	}; | ||||
|  | ||||
| 	JSValue settings = JS_NewObject(context); | ||||
|   | ||||
		Reference in New Issue
	
	Block a user