diff --git a/core/ssb.js b/core/ssb.js index 00dbcc74..05c67cc6 100644 --- a/core/ssb.js +++ b/core/ssb.js @@ -5,54 +5,6 @@ let g_attendants = {}; const k_use_create_history_stream = false; const k_blobs_concurrent_target = 8; -function following(db, id) { - var o = db.get(id + ":following"); - const k_version = 5; - var f = o ? JSON.parse(o) : o; - if (!f || f.version != k_version) { - f = {users: [], sequence: 0, version: k_version}; - } - f.users = new Set(f.users); - ssb.sqlStream( - "SELECT "+ - " sequence, "+ - " json_extract(content, '$.contact') AS contact, "+ - " json_extract(content, '$.following') AS following "+ - "FROM messages "+ - "WHERE "+ - " author = ?1 AND "+ - " sequence > ?2 AND "+ - " json_extract(content, '$.type') = 'contact' "+ - "UNION SELECT MAX(sequence) AS sequence, NULL, NULL FROM messages WHERE author = ?1 "+ - "ORDER BY sequence", - [id, f.sequence], - function(row) { - if (row.following) { - f.users.add(row.contact); - } else { - f.users.delete(row.contact); - } - f.sequence = row.sequence; - }); - f.users = Array.from(f.users).sort(); - var j = JSON.stringify(f); - if (o != j) { - db.set(id + ":following", j); - } - return f.users; -} - -function followingDeep(db, seed_ids, depth) { - if (depth <= 0) { - return seed_ids; - } - var f = seed_ids.map(x => following(db, x)); - var ids = [].concat(...f); - var x = followingDeep(db, [...new Set(ids)].sort(), depth - 1); - x = [...new Set([].concat(...x, ...seed_ids))].sort(); - return x; -} - function get_latest_sequence_for_author(author) { var sequence = 0; ssb.sqlStream( @@ -140,7 +92,7 @@ ssb.addEventListener('connections', function on_connections_changed(change, conn if (k_use_create_history_stream) { connection.send_json({'name': ['createHistoryStream'], 'type': 'source', 'args': [{'id': connection.id, 'seq': sequence, 'live': true, 'keys': false}]}, storeMessage); var identities = ssb.getAllIdentities(); - followingDeep(g_database, identities, 2).then(function(ids) { + ssb.followingDeep(identities, 2).then(function(ids) { for (let id of ids) { if (identities.indexOf(id) != -1) { continue; @@ -226,7 +178,7 @@ function ebtReplicateSendClock(request, have) { var identities = ssb.getAllIdentities(); var message = {}; var last_sent = request.connection.sent_clock || {}; - var ids = followingDeep(g_database, identities, 2).concat([request.connection.id]); + var ids = ssb.followingDeep(identities, 2).concat([request.connection.id]); if (!Object.keys(last_sent).length) { for (let id of ids) { message[id] = get_latest_sequence_for_author(id); diff --git a/src/ssb.db.c b/src/ssb.db.c index d00bff13..6066ef1d 100644 --- a/src/ssb.db.c +++ b/src/ssb.db.c @@ -973,6 +973,129 @@ bool tf_ssb_db_identity_get_private_key(tf_ssb_t* ssb, const char* user, const c return success; } +typedef struct _following_t following_t; + +typedef struct _following_t +{ + char id[k_id_base64_len]; + following_t** following; + following_t** blocking; + int following_count; + int blocking_count; + int depth; +} following_t; + +static int _following_compare(const void* a, const void* b) +{ + const char* ida = a; + const following_t* const* followingb = b; + return strcmp(ida, (*followingb)->id); +} + +static void _add_following_entry(following_t*** list, int* count, following_t* add) +{ + int index = tf_util_insert_index(add->id, *list, *count, sizeof(following_t*), _following_compare); + if (index >= *count || strcmp(add->id, (*list)[index]->id) == 0) + { + *list = tf_resize_vec(*list, sizeof(**list) * (*count + 1)); + if (*count - index) + { + memmove(*list + index + 1, *list + index, sizeof(following_t*) * (*count - index)); + } + (*list)[index] = add; + } +} + +static following_t* _get_following(tf_ssb_t* ssb, const char* id, following_t*** following, int* following_count, int depth, int max_depth) +{ + int index = tf_util_insert_index(id, *following, *following_count, sizeof(following_t*), _following_compare); + following_t* entry = NULL; + if (index < *following_count && strcmp(id, (*following)[index]->id) == 0) + { + entry = (*following)[index]; + } + else + { + *following = tf_resize_vec(*following, sizeof(*following) * (*following_count + 1)); + if (*following_count - index) + { + memmove(*following + index + 1, *following + index, sizeof(following_t*) * (*following_count - index)); + } + entry = tf_malloc(sizeof(following_t)); + (*following)[index] = entry; + (*following_count)++; + *entry = (following_t) { 0 }; + snprintf(entry->id, sizeof(entry->id), "%s", id); + entry->depth = depth; + + if (depth < max_depth) + { + sqlite3* db = tf_ssb_get_db(ssb); + sqlite3_stmt* statement = NULL; + if (sqlite3_prepare(db, "SELECT json_extract(content, '$.contact'), json_extract(content, '$.following'), json_extract(content, '$.blocking') FROM messages WHERE author = ? AND json_extract(content, '$.type') = 'contact' ORDER BY sequence", -1, &statement, NULL) == SQLITE_OK) + { + if (sqlite3_bind_text(statement, 1, id, -1, NULL) == SQLITE_OK) + { + while (sqlite3_step(statement) == SQLITE_ROW) + { + const char* contact = (const char*)sqlite3_column_text(statement, 0); + if (sqlite3_column_type(statement, 1) != SQLITE_NULL) + { + bool is_following = sqlite3_column_int(statement, 1); + following_t* next = _get_following(ssb, contact, following, following_count, depth + 1, max_depth); + if (is_following) + { + _add_following_entry(&entry->following, &entry->following_count, next); + } + } + if (sqlite3_column_type(statement, 2) != SQLITE_NULL) + { + bool is_blocking = sqlite3_column_int(statement, 2); + following_t* next = _get_following(ssb, contact, following, following_count, depth + 1, 0 /* don't dig deeper into blocked users */); + if (is_blocking) + { + _add_following_entry(&entry->blocking, &entry->blocking_count, next); + } + } + } + } + sqlite3_finalize(statement); + } + } + } + return entry; +} + +const char** tf_ssb_db_following_deep(tf_ssb_t* ssb, const char** ids, int count, int depth) +{ + following_t** following = NULL; + int following_count = 0; + for (int i = 0; i < count; i++) + { + _get_following(ssb, ids[i], &following, &following_count, 0, depth); + } + + char** result = tf_malloc(sizeof(char*) * (following_count + 1) + k_id_base64_len * following_count); + char* result_ids = (char*)result + sizeof(char*) * (following_count + 1); + + for (int i = 0; i < following_count; i++) + { + result[i] = result_ids + k_id_base64_len * i; + snprintf(result[i], k_id_base64_len, "%s", following[i]->id); + } + result[following_count] = NULL; + + for (int i = 0; i < following_count; i++) + { + tf_free(following[i]->following); + tf_free(following[i]->blocking); + tf_free(following[i]); + } + tf_free(following); + + return (const char**)result; +} + static void _test_private(sqlite3* db, const uint8_t* private_key) { sqlite3_stmt* statement = NULL; diff --git a/src/ssb.db.h b/src/ssb.db.h index 884cb2a1..9493b39b 100644 --- a/src/ssb.db.h +++ b/src/ssb.db.h @@ -26,4 +26,6 @@ void tf_ssb_db_identity_visit(tf_ssb_t* ssb, const char* user, void (*callback)( void tf_ssb_db_identity_visit_all(tf_ssb_t* ssb, void (*callback)(const char* identity, void* user_data), void* user_data); bool tf_ssb_db_identity_get_private_key(tf_ssb_t* ssb, const char* user, const char* public_key, uint8_t* out_private_key, size_t private_key_size); +const char** tf_ssb_db_following_deep(tf_ssb_t* ssb, const char** ids, int count, int depth); + void tf_ssb_db_private(sqlite3* db); diff --git a/src/ssb.js.c b/src/ssb.js.c index 5f1d0882..9f1b53ab 100644 --- a/src/ssb.js.c +++ b/src/ssb.js.c @@ -966,6 +966,45 @@ static JSValue _tf_ssb_connectionSendJson(JSContext* context, JSValueConst this_ return JS_NewInt32(context, request_number); } +static JSValue _tf_ssb_followingDeep(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) +{ + int depth = 2; + if (!JS_IsArray(context, argv[0])) + { + return JS_ThrowTypeError(context, "Expected argument 1 to be an array of ids."); + } + if (JS_ToInt32(context, &depth, argv[1])) + { + return JS_ThrowTypeError(context, "Could not convert argument 2 to integer."); + } + + int count = tf_util_get_length(context, argv[0]); + const char** ids = tf_malloc(count * sizeof(char*)); + for (int i = 0; i < count; i++) + { + JSValue id = JS_GetPropertyUint32(context, argv[0], i); + ids[i] = JS_ToCString(context, id); + JS_FreeValue(context, id); + } + + tf_ssb_t* ssb = JS_GetOpaque(this_val, _tf_ssb_classId); + const char** following_deep = tf_ssb_db_following_deep(ssb, ids, count, depth); + JSValue result = JS_NewArray(context); + int index = 0; + for (const char** id = following_deep; *id; id++) + { + JS_SetPropertyUint32(context, result, index++, JS_NewString(context, *id)); + } + tf_free(following_deep); + + for (int i = 0; i < count; i++) + { + JS_FreeCString(context, ids[i]); + } + tf_free(ids); + return result; +} + void tf_ssb_register(JSContext* context, tf_ssb_t* ssb) { JS_NewClassID(&_tf_ssb_classId); @@ -1005,6 +1044,7 @@ void tf_ssb_register(JSContext* context, tf_ssb_t* ssb) JS_SetPropertyStr(context, object, "getBroadcasts", JS_NewCFunction(context, _tf_ssb_getBroadcasts, "getBroadcasts", 0)); JS_SetPropertyStr(context, object, "connect", JS_NewCFunction(context, _tf_ssb_connect, "connect", 1)); JS_SetPropertyStr(context, object, "createTunnel", JS_NewCFunction(context, _tf_ssb_createTunnel, "createTunnel", 3)); + JS_SetPropertyStr(context, object, "followingDeep", JS_NewCFunction(context, _tf_ssb_followingDeep, "followingDeep", 2)); /* Should be trusted only. */ JS_SetPropertyStr(context, object, "addRpc", JS_NewCFunction(context, _tf_ssb_add_rpc, "addRpc", 2));