Move all but the deleting off of the writer when deleting blobs and messages. Trying to eliminate a period of unresponsiveness near launch.
All checks were successful
Build Tilde Friends / Build-All (push) Successful in 32m41s

This commit is contained in:
2025-06-15 08:46:07 -04:00
parent e54312d3b8
commit 53cba2d7e4

View File

@ -1482,51 +1482,86 @@ static void _tf_ssb_rpc_delete_blobs_work(tf_ssb_t* ssb, void* user_data)
return;
}
int64_t start_ns = uv_hrtime();
db = tf_ssb_acquire_db_writer(ssb);
sqlite3_stmt* statement;
int64_t now = (int64_t)time(NULL) * 1000ULL;
int64_t timestamp = now - age * 1000ULL;
const int k_limit = 128;
int deleted = 0;
if (sqlite3_prepare_v2(db, "DELETE FROM blob_wants_cache WHERE source IS NULL and timestamp < ?1", -1, &statement, NULL) == SQLITE_OK)
{
if (sqlite3_bind_int64(statement, 1, timestamp) == SQLITE_OK)
{
if (sqlite3_step(statement) != SQLITE_DONE)
{
tf_printf("Deleting stale blob wants cache entries: %s.\n", sqlite3_errmsg(db));
}
}
sqlite3_finalize(statement);
}
sqlite3_stmt* statement;
char** ids = NULL;
int ids_count = 0;
db = tf_ssb_acquire_db_reader(ssb);
if (sqlite3_prepare_v2(db,
"DELETE FROM blobs WHERE blobs.id IN ("
" SELECT blobs.id FROM blobs "
" JOIN messages_refs ON blobs.id = messages_refs.ref "
" JOIN messages ON messages.id = messages_refs.message "
" WHERE blobs.created < ?1 / 1000 "
" GROUP BY blobs.id HAVING MAX(messages.timestamp) < ?1 LIMIT ?2)",
"SELECT blobs.id FROM blobs "
"JOIN messages_refs ON blobs.id = messages_refs.ref "
"JOIN messages ON messages.id = messages_refs.message "
"WHERE blobs.created < ?1 / 1000 "
"GROUP BY blobs.id HAVING MAX(messages.timestamp) < ?1 LIMIT ?2",
-1, &statement, NULL) == SQLITE_OK)
{
const int k_limit = 128;
if (sqlite3_bind_int64(statement, 1, timestamp) == SQLITE_OK && sqlite3_bind_int(statement, 2, k_limit) == SQLITE_OK)
{
int r = sqlite3_step(statement);
int r = SQLITE_OK;
while ((r = sqlite3_step(statement)) == SQLITE_ROW)
{
ids = tf_realloc(ids, sizeof(char*) * (ids_count + 1));
ids[ids_count++] = tf_strdup((const char*)sqlite3_column_text(statement, 0));
}
if (r != SQLITE_DONE)
{
tf_printf("_tf_ssb_rpc_delete_blobs_work: %s\n", sqlite3_errmsg(db));
}
else
{
tf_printf("_tf_ssb_rpc_delete_blobs_work: %d rows\n", sqlite3_changes(db));
}
deleted = sqlite3_changes(db);
}
}
else
{
tf_printf("prepare failed: %s\n", sqlite3_errmsg(db));
}
tf_ssb_release_db_writer(ssb, db);
tf_ssb_release_db_reader(ssb, db);
int deleted = 0;
if (ids_count)
{
db = tf_ssb_acquire_db_writer(ssb);
if (sqlite3_prepare_v2(db, "DELETE FROM blob_wants_cache WHERE source IS NULL and timestamp < ?1", -1, &statement, NULL) == SQLITE_OK)
{
if (sqlite3_bind_int64(statement, 1, timestamp) == SQLITE_OK)
{
if (sqlite3_step(statement) != SQLITE_DONE)
{
tf_printf("Deleting stale blob wants cache entries: %s.\n", sqlite3_errmsg(db));
}
}
sqlite3_finalize(statement);
}
if (sqlite3_prepare_v2(db, "DELETE FROM blobs WHERE blobs.id = ?", -1, &statement, NULL) == SQLITE_OK)
{
for (int i = 0; i < ids_count; i++)
{
if (sqlite3_bind_text(statement, 1, ids[i], -1, NULL) == SQLITE_OK)
{
int r = sqlite3_step(statement);
if (r != SQLITE_DONE)
{
tf_printf("_tf_ssb_rpc_delete_blobs_work: %s\n", sqlite3_errmsg(db));
}
else
{
deleted += sqlite3_changes(db);
}
}
sqlite3_reset(statement);
tf_free(ids[i]);
}
sqlite3_finalize(statement);
}
else
{
tf_printf("prepare failed: %s\n", sqlite3_errmsg(db));
}
tf_ssb_release_db_writer(ssb, db);
tf_free(ids);
}
delete->duration_ms = (uv_hrtime() - start_ns) / 1000000LL;
tf_printf("Deleted %d blobs in %d ms.\n", deleted, (int)delete->duration_ms);
}
@ -1584,28 +1619,57 @@ static void _tf_ssb_rpc_delete_feeds_work(tf_ssb_t* ssb, void* user_data)
JS_FreeValue(context, json);
JS_FreeValue(context, array);
db = tf_ssb_acquire_db_writer(ssb);
sqlite3_stmt* statement;
if (sqlite3_prepare_v2(db,
"DELETE FROM messages WHERE id IN ("
" SELECT id FROM messages WHERE author NOT IN (SELECT value FROM json_each(?)) ORDER BY rowid DESC LIMIT 1024"
")",
-1, &statement, NULL) == SQLITE_OK)
char** ids = NULL;
int ids_count = 0;
db = tf_ssb_acquire_db_reader(ssb);
if (sqlite3_prepare_v2(db, "SELECT id FROM messages WHERE author NOT IN (SELECT value FROM json_each(?)) ORDER BY rowid DESC LIMIT 1024", -1, &statement, NULL) == SQLITE_OK)
{
if (sqlite3_bind_text(statement, 1, arg, -1, NULL) == SQLITE_OK)
{
if (sqlite3_step(statement) != SQLITE_DONE)
int r = SQLITE_OK;
while ((r = sqlite3_step(statement)) == SQLITE_ROW)
{
ids = tf_realloc(ids, sizeof(char*) * (ids_count + 1));
ids[ids_count++] = tf_strdup((const char*)sqlite3_column_text(statement, 0));
}
if (r != SQLITE_DONE)
{
tf_printf("deleting messages: %s\n", sqlite3_errmsg(db));
}
else
{
delete->deleted += sqlite3_changes(db);
}
}
sqlite3_finalize(statement);
}
tf_ssb_release_db_writer(ssb, db);
tf_ssb_release_db_reader(ssb, db);
if (ids_count)
{
db = tf_ssb_acquire_db_writer(ssb);
if (sqlite3_prepare_v2(db, "DELETE FROM messages WHERE id = ?", -1, &statement, NULL) == SQLITE_OK)
{
for (int i = 0; i < ids_count; i++)
{
if (sqlite3_bind_text(statement, 1, ids[i], -1, NULL) == SQLITE_OK)
{
if (sqlite3_step(statement) != SQLITE_DONE)
{
tf_printf("deleting messages: %s\n", sqlite3_errmsg(db));
}
else
{
delete->deleted++;
}
}
sqlite3_reset(statement);
tf_free(ids[i]);
}
sqlite3_finalize(statement);
}
tf_ssb_release_db_writer(ssb, db);
tf_free(ids);
}
JS_FreeCString(context, arg);