#include "ssb.rpc.h"

#include "ssb.h"
#include "trace.h"

// NOTE(review): the names of the six system headers below were missing from
// the reviewed text (only bare "#include" tokens remained).  They are
// reconstructed from the symbols this file uses -- confirm against the build.
#include <base64c.h>
#include <sodium/crypto_hash_sha256.h>
#include <sodium/crypto_sign.h>
#include <sqlite3.h>
#include <string.h>
#include <uv.h>

typedef struct _tf_ssb_blob_wants_t tf_ssb_blob_wants_t;

// One active incoming blobs.createWants stream.  Entries form a singly linked
// list owned by tf_ssb_rpc_t.
typedef struct _tf_ssb_blob_wants_t
{
	tf_ssb_rpc_t* rpc;
	tf_ssb_connection_t* connection;
	// Request number of the peer's createWants call; replies use its negation.
	int32_t request_number;
	// Upper bound of the next window of message rowids to scan; <= 0 means
	// "not started yet" before the first scan and "finished" afterwards.
	int64_t rowid;
	tf_ssb_blob_wants_t* next;
	// Incremented once for every want sent to the peer.
	int open_count;
} tf_ssb_blob_wants_t;

typedef struct _tf_ssb_rpc_t
{
	tf_ssb_blob_wants_t* wants;
	// Signalled to schedule another step of _tf_ssb_rpc_wants_async.
	uv_async_t wants_async;
} tf_ssb_rpc_t;

const char** tf_ssb_get_following_deep(tf_ssb_t* ssb, const char** ids, int depth);

// RPC handler for blobs.has: replies "true" or "false" depending on whether
// args[0] names a row in the blobs table.
static void _tf_ssb_rpc_blob_has(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
	JSContext* context = tf_ssb_connection_get_context(connection);
	sqlite3* db = tf_ssb_connection_get_db(connection);
	sqlite3_stmt* statement;
	JSValue args_array = JS_GetPropertyStr(context, args, "args");
	JSValue blob_id_value = JS_GetPropertyUint32(context, args_array, 0);
	const char* blob_id = JS_ToCString(context, blob_id_value);
	bool have = false;

	if (sqlite3_prepare(db, "SELECT 1 FROM blobs WHERE id = $1", -1, &statement, NULL) == SQLITE_OK)
	{
		have = sqlite3_bind_text(statement, 1, blob_id, -1, NULL) == SQLITE_OK &&
			sqlite3_step(statement) == SQLITE_ROW;
		sqlite3_finalize(statement);
	}

	JS_FreeCString(context, blob_id);
	JS_FreeValue(context, blob_id_value);
	JS_FreeValue(context, args_array);

	// Only the stream bit of the request's flags is carried over to the reply.
	uint8_t send_flags = (flags & k_ssb_rpc_flag_stream) | k_ssb_rpc_flag_json | k_ssb_rpc_flag_end_error;
	const char* result = have ? "true" : "false";
	tf_ssb_connection_rpc_send(connection, send_flags, -request_number, (const uint8_t*)result, strlen(result), NULL, NULL);
}

// RPC handler for blobs.get: for every blob id in args, streams the blob's
// bytes back to the peer in blocks of at most 64 KiB.  Ids that
// tf_ssb_blob_get cannot find are skipped silently.
static void _tf_ssb_rpc_blob_get(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
	tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
	JSContext* context = tf_ssb_connection_get_context(connection);
	JSValue blob_ids = JS_GetPropertyStr(context, args, "args");
	JSValue length_value = JS_GetPropertyStr(context, blob_ids, "length");
	int32_t length = 0;
	JS_ToInt32(context, &length, length_value);
	JS_FreeValue(context, length_value);

	for (int i = 0; i < length; i++)
	{
		JSValue blob_id_value = JS_GetPropertyUint32(context, blob_ids, i);
		const char* blob_id = JS_ToCString(context, blob_id_value);
		uint8_t* blob = NULL;
		size_t blob_size = 0;

		if (tf_ssb_blob_get(ssb, blob_id, &blob, &blob_size))
		{
			static const size_t k_block_size = 64 * 1024;
			for (size_t offset = 0; offset < blob_size; offset += k_block_size)
			{
				size_t block_size = offset + k_block_size < blob_size ? k_block_size : (blob_size - offset);
				// Send the block that starts at 'offset'.  Sending from 'blob'
				// itself would repeat the first block for every chunk.
				tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_binary | k_ssb_rpc_flag_stream, -request_number, blob + offset, block_size, NULL, NULL);
			}
			free(blob);
		}

		JS_FreeCString(context, blob_id);
		JS_FreeValue(context, blob_id_value);
	}

	JS_FreeValue(context, blob_ids);
}

// State for one outgoing blobs.get request: the expected id and size, the
// running SHA-256 of the bytes received so far, and the bytes themselves.
typedef struct _tf_ssb_connection_blobs_get_t
{
	char blob_id[BLOB_ID_LEN];
	size_t n;
	size_t size;
	crypto_hash_sha256_state hash;
	uint8_t data[];
} tf_ssb_blobs_get_t;

// Response callback for blobs.get.  Appends each chunk to the buffer; once the
// expected number of bytes has arrived, verifies that the SHA-256 matches the
// requested id, stores the blob if so, removes the request and frees the state.
// A chunk that would overflow the expected size is not copied.
static void _tf_ssb_connection_on_rpc_blobs_get_response(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
	tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
	tf_ssb_blobs_get_t* get = user_data;
	if (!get)
	{
		return;
	}

	if (get->n + size <= get->size)
	{
		memcpy(get->data + get->n, message, size);
		crypto_hash_sha256_update(&get->hash, message, size);
		get->n += size;
	}

	// %zu is the conversion for size_t (%zd is for the signed counterpart).
	printf("received %zu / %zu for %s\n", get->n, get->size, get->blob_id);

	if (get->n == get->size)
	{
		uint8_t hash[crypto_hash_sha256_BYTES];
		crypto_hash_sha256_final(&get->hash, hash);

		char hash64[256];
		base64c_encode(hash, sizeof(hash), (uint8_t*)hash64, sizeof(hash64));

		char id[512];
		snprintf(id, sizeof(id), "&%s.sha256", hash64);

		if (strcmp(id, get->blob_id) == 0)
		{
			if (tf_ssb_blob_store(ssb, get->data, get->size, id, sizeof(id)))
			{
				printf("stored blob %s\n", get->blob_id);
			}
			else
			{
				printf("failed to store %s\n", get->blob_id);
			}
		}
		else
		{
			printf("blob does not match id %s vs. %s\n", id, get->blob_id);
		}

		tf_ssb_connection_remove_request(connection, -request_number);
		free(get);
	}
}

// Sends a blobs.get request for blob_id and allocates a buffer of 'size' bytes
// to receive it.  If the buffer cannot be allocated the request is not sent.
void tf_ssb_rpc_send_blobs_get(tf_ssb_connection_t* connection, const char* blob_id, size_t size)
{
	JSContext* context = tf_ssb_connection_get_context(connection);

	JSValue message = JS_NewObject(context);
	JS_SetPropertyStr(context, message, "type", JS_NewString(context, "source"));

	JSValue nameval = JS_NewArray(context);
	JS_SetPropertyUint32(context, nameval, 0, JS_NewString(context, "blobs"));
	JS_SetPropertyUint32(context, nameval, 1, JS_NewString(context, "get"));
	JS_SetPropertyStr(context, message, "name", nameval);

	JSValue argsval = JS_NewArray(context);
	JS_SetPropertyUint32(context, argsval, 0, JS_NewString(context, blob_id));
	JS_SetPropertyStr(context, message, "args", argsval);

	JSValue str = JS_JSONStringify(context, message, JS_NULL, JS_NULL);
	size_t len;
	const char* cstr = JS_ToCStringLen(context, &len, str);

	// 'size' was announced by the remote peer, so the allocation can fail.
	tf_ssb_blobs_get_t* get = malloc(sizeof(tf_ssb_blobs_get_t) + size);
	if (get)
	{
		*get = (tf_ssb_blobs_get_t) { .size = size };
		snprintf(get->blob_id, sizeof(get->blob_id), "%s", blob_id);
		crypto_hash_sha256_init(&get->hash);

		tf_ssb_connection_rpc_send(
			connection,
			k_ssb_rpc_flag_json | k_ssb_rpc_flag_stream,
			tf_ssb_connection_next_request_number(connection),
			(const uint8_t*)cstr,
			len,
			_tf_ssb_connection_on_rpc_blobs_get_response,
			get);
	}
	else
	{
		printf("failed to allocate %zu bytes for blob %s\n", size, blob_id);
	}

	JS_FreeCString(context, cstr);
	JS_FreeValue(context, str);
	JS_FreeValue(context, message);
}

// Response callback for our blobs.createWants request.  'args' is treated as
// an object whose own string keys are blob ids and whose values are integers:
//  - 0 <= value < k_ssb_blob_bytes_max: the blob is requested with that size.
//  - value < 0: if we hold the blob, we reply with {id: size}.
static void _tf_ssb_connection_on_rpc_blobs_createWants_response(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
	tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
	JSContext* context = tf_ssb_connection_get_context(connection);

	JSPropertyEnum* ptab = NULL;
	uint32_t plen = 0;
	if (JS_GetOwnPropertyNames(context, &ptab, &plen, args, JS_GPN_STRING_MASK) != 0)
	{
		// Nothing was enumerated, so ptab and plen must not be used.
		return;
	}

	for (uint32_t i = 0; i < plen; ++i)
	{
		JSPropertyDescriptor desc;
		JSValue key_value = JS_NULL;
		if (JS_GetOwnProperty(context, &desc, args, ptab[i].atom) == 1)
		{
			key_value = desc.value;
		}

		int32_t want_size;
		if (JS_ToInt32(context, &want_size, key_value) == 0)
		{
			JSValue key = JS_AtomToString(context, ptab[i].atom);
			const char* blob_id = JS_ToCString(context, key);

			if (want_size >= 0 && want_size < k_ssb_blob_bytes_max)
			{
				tf_ssb_rpc_send_blobs_get(connection, blob_id, want_size);
			}
			else if (want_size < 0)
			{
				size_t blob_size = 0;
				if (tf_ssb_blob_get(ssb, blob_id, NULL, &blob_size))
				{
					JSValue size_response = JS_NewObject(context);
					JS_SetPropertyStr(context, size_response, blob_id, JS_NewInt64(context, blob_size));
					JSValue jsonval = JS_JSONStringify(context, size_response, JS_NULL, JS_NULL);
					size_t len;
					const char* json = JS_ToCStringLen(context, &len, jsonval);
					tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_json | k_ssb_rpc_flag_stream, request_number, (uint8_t*)json, len, NULL, NULL);
					JS_FreeCString(context, json);
					JS_FreeValue(context, jsonval);
					JS_FreeValue(context, size_response);
				}
			}

			JS_FreeCString(context, blob_id);
			JS_FreeValue(context, key);
		}

		JS_FreeValue(context, key_value);
	}

	for (uint32_t i = 0; i < plen; ++i)
	{
		JS_FreeAtom(context, ptab[i].atom);
	}
	js_free(context, ptab);
}

// Opens a blobs.createWants stream to the peer.
void tf_ssb_rpc_send_blobs_createWants(tf_ssb_connection_t* connection)
{
	const char* k_createWants = "{\"name\": [\"blobs\", \"createWants\"], \"type\": \"source\", \"args\": []}";
	tf_ssb_connection_rpc_send(
		connection,
		k_ssb_rpc_flag_json | k_ssb_rpc_flag_stream,
		tf_ssb_connection_next_request_number(connection),
		(const uint8_t*)k_createWants,
		strlen(k_createWants),
		_tf_ssb_connection_on_rpc_blobs_createWants_response,
		NULL);
}

// Unlinks and frees every wants entry that belongs to 'connection'.  All
// matches are removed (not just the first) so that no entry is left pointing
// at a connection that has gone away.
static void _tf_ssb_rpc_remove_wants(tf_ssb_rpc_t* rpc, tf_ssb_connection_t* connection)
{
	tf_ssb_blob_wants_t** it = &rpc->wants;
	while (*it)
	{
		tf_ssb_blob_wants_t* wants = *it;
		if (wants->connection == connection)
		{
			*it = wants->next;
			free(wants);
		}
		else
		{
			it = &wants->next;
		}
	}
}

// Performs one step of a createWants stream: scans a window of
// k_messages_per_query message rowids (newest first) for blob ids that are
// mentioned in message content but have no stored content, and sends a
// {id: -1} want for each.  When the scan has passed rowid 0 an empty object is
// sent; otherwise the async handle is signalled to schedule another step.
static void _tf_ssb_blob_wants_update(tf_ssb_blob_wants_t* wants)
{
	static const int k_messages_per_query = 16;
	sqlite3* db = tf_ssb_connection_get_db(wants->connection);
	sqlite3_stmt* statement;

	if (wants->rowid <= 0)
	{
		// First step: start from the newest message.
		if (sqlite3_prepare(db, "SELECT MAX(messages.rowid) FROM messages", -1, &statement, NULL) == SQLITE_OK)
		{
			if (sqlite3_step(statement) == SQLITE_ROW)
			{
				wants->rowid = sqlite3_column_int64(statement, 0);
			}
			sqlite3_finalize(statement);
		}
	}

	if (sqlite3_prepare(db,
		"SELECT DISTINCT json.value FROM messages, json_tree(messages.content) AS json "
		"LEFT OUTER JOIN blobs ON json.value = blobs.id "
		"WHERE "
		" messages.rowid <= ?1 AND messages.rowid > ?2 AND "
		" json.value LIKE '&%%.sha256' AND "
		" length(json.value) = ?3 AND "
		" blobs.content IS NULL",
		-1, &statement, NULL) == SQLITE_OK)
	{
		if (sqlite3_bind_int64(statement, 1, wants->rowid) == SQLITE_OK &&
			sqlite3_bind_int64(statement, 2, wants->rowid - k_messages_per_query) == SQLITE_OK &&
			sqlite3_bind_int64(statement, 3, BLOB_ID_LEN - 1) == SQLITE_OK)
		{
			while (sqlite3_step(statement) == SQLITE_ROW)
			{
				JSContext* context = tf_ssb_connection_get_context(wants->connection);
				JSValue want = JS_NewObject(context);
				JS_SetPropertyStr(context, want, (const char*)sqlite3_column_text(statement, 0), JS_NewInt32(context, -1));
				JSValue jsonval = JS_JSONStringify(context, want, JS_NULL, JS_NULL);
				size_t len;
				const char* json = JS_ToCStringLen(context, &len, jsonval);
				++wants->open_count;
				tf_ssb_connection_rpc_send(wants->connection, k_ssb_rpc_flag_json | k_ssb_rpc_flag_stream, -wants->request_number, (const uint8_t*)json, len, NULL, NULL);
				JS_FreeCString(context, json);
				JS_FreeValue(context, jsonval);
				JS_FreeValue(context, want);
			}
			wants->rowid -= k_messages_per_query;
		}
		else
		{
			printf("bind failed: %s\n", sqlite3_errmsg(db));
		}
		sqlite3_finalize(statement);
	}
	else
	{
		printf("prepare failed: %s\n", sqlite3_errmsg(db));
	}

	if (wants->rowid <= 0)
	{
		tf_ssb_connection_rpc_send(wants->connection, k_ssb_rpc_flag_json | k_ssb_rpc_flag_stream, -wants->request_number, (const uint8_t*)"{}", 2, NULL, NULL);
	}
	else
	{
		uv_async_send(&wants->rpc->wants_async);
	}
}

// RPC handler for blobs.createWants: registers a wants entry for the
// connection at the head of the list and runs its first scan step.
// user_data is the tf_ssb_rpc_t passed at registration.
static void _tf_ssb_rpc_blobs_createWants(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
	tf_ssb_rpc_t* rpc = user_data;
	tf_ssb_blob_wants_t* wants = malloc(sizeof(tf_ssb_blob_wants_t));
	*wants = (tf_ssb_blob_wants_t)
	{
		.rpc = rpc,
		.connection = connection,
		.request_number = request_number,
		.rowid = -1,
		.next = rpc->wants,
	};
	rpc->wants = wants;
	_tf_ssb_blob_wants_update(wants);
}

// RPC handler for createHistoryStream: sends every stored message by
// args[0].id with sequence >= args[0].seq, in sequence order, then ends the
// stream.  A message whose signature does not verify is logged and not sent.
static void _tf_ssb_rpc_createHistoryStream(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
	JSContext* context = tf_ssb_connection_get_context(connection);
	sqlite3* db = tf_ssb_connection_get_db(connection);
	JSValue streamArgs = JS_GetPropertyStr(context, args, "args");
	JSValue obj = JS_GetPropertyUint32(context, streamArgs, 0);
	JSValue idval = JS_GetPropertyStr(context, obj, "id");
	const char* author = JS_ToCString(context, idval);
	int64_t seq = 0;
	sqlite3_stmt* statement;

	JSValue seqval = JS_GetPropertyStr(context, obj, "seq");
	JS_ToInt64(context, &seq, seqval);
	JS_FreeValue(context, seqval);

	const char* query = "SELECT previous, sequence, timestamp, hash, content, signature FROM messages WHERE author = $1 AND sequence >= $2 ORDER BY sequence";
	if (sqlite3_prepare(db, query, -1, &statement, NULL) == SQLITE_OK)
	{
		if (sqlite3_bind_text(statement, 1, author, -1, NULL) == SQLITE_OK &&
			sqlite3_bind_int64(statement, 2, seq) == SQLITE_OK)
		{
			while (sqlite3_step(statement) == SQLITE_ROW)
			{
				JSValue message = JS_NewObject(context);
				const char* previous = (const char*)sqlite3_column_text(statement, 0);
				JS_SetPropertyStr(context, message, "previous", previous ? JS_NewString(context, previous) : JS_NULL);
				JS_SetPropertyStr(context, message, "author", JS_NewString(context, author));
				JS_SetPropertyStr(context, message, "sequence", JS_NewInt64(context, sqlite3_column_int64(statement, 1)));
				JS_SetPropertyStr(context, message, "timestamp", JS_NewInt64(context, sqlite3_column_int64(statement, 2)));
				JS_SetPropertyStr(context, message, "hash", JS_NewString(context, (const char*)sqlite3_column_text(statement, 3)));
				const char* contentstr = (const char*)sqlite3_column_text(statement, 4);
				JSValue content = JS_ParseJSON(context, contentstr, strlen(contentstr), NULL);
				JS_SetPropertyStr(context, message, "content", content);
				JS_SetPropertyStr(context, message, "signature", JS_NewString(context, (const char*)sqlite3_column_text(statement, 5)));

				// Serialize before verifying: the JSON that is sent is the
				// form of the message that still carries its signature.
				JSValue jsonval = JS_JSONStringify(context, message, JS_NULL, JS_NULL);
				size_t len;
				const char* json = JS_ToCStringLen(context, &len, jsonval);
				if (tf_ssb_verify_and_strip_signature(context, message, NULL, 0))
				{
					tf_ssb_connection_rpc_send(connection, flags, -request_number, (const uint8_t*)json, len, NULL, NULL);
				}
				else
				{
					printf("message signature is invalid\n");
				}
				JS_FreeCString(context, json);
				JS_FreeValue(context, jsonval);
				JS_FreeValue(context, message);
			}
		}
		sqlite3_finalize(statement);
	}
	else
	{
		printf("prepare failed: %s\n", sqlite3_errmsg(db));
	}

	tf_ssb_connection_rpc_send(connection, flags | k_ssb_rpc_flag_end_error, -request_number, (const uint8_t*)"true", 4, NULL, NULL);

	JS_FreeValue(context, obj);
	JS_FreeCString(context, author);
	JS_FreeValue(context, idval);
	JS_FreeValue(context, streamArgs);
}

// Response callback for our createHistoryStream requests: computes the
// message id, verifies the signature, stores the message if it verifies, and
// signals the wants async handle.  user_data is the tf_ssb_rpc_t.
static void _tf_ssb_connection_on_rpc_createHistoryStream_response(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue val, const uint8_t* message, size_t size, void* user_data)
{
	tf_ssb_rpc_t* rpc = user_data;
	JSContext* context = tf_ssb_connection_get_context(connection);
	char signature[crypto_sign_BYTES + 128];
	char id[crypto_hash_sha256_BYTES * 2 + 1];

	// The id is calculated before the signature is stripped from val.
	tf_ssb_calculate_message_id(context, val, id, sizeof(id));
	if (tf_ssb_verify_and_strip_signature(context, val, signature, sizeof(signature)))
	{
		tf_ssb_store_message(tf_ssb_connection_get_ssb(connection), context, id, val, signature);
	}
	else
	{
		printf("failed to verify message\n");
	}

	uv_async_send(&rpc->wants_async);
}

// Sends a createHistoryStream request for the feed 'id'.  "seq" is set to the
// latest stored sequence for that author when one is known.
void tf_ssb_rpc_send_createHistoryStream(tf_ssb_connection_t* connection, const char* id)
{
	JSContext* context = tf_ssb_connection_get_context(connection);

	JSValue message = JS_NewObject(context);
	JS_SetPropertyStr(context, message, "type", JS_NewString(context, "source"));

	JSValue nameval = JS_NewArray(context);
	JS_SetPropertyUint32(context, nameval, 0, JS_NewString(context, "createHistoryStream"));
	JS_SetPropertyStr(context, message, "name", nameval);

	JSValue obj = JS_NewObject(context);
	JS_SetPropertyStr(context, obj, "id", JS_NewString(context, id));
	JS_SetPropertyStr(context, obj, "keys", JS_FALSE);

	int64_t sequence = 0;
	tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
	if (tf_ssb_get_latest_message_by_author(ssb, id, &sequence, NULL, 0))
	{
		JS_SetPropertyStr(context, obj, "seq", JS_NewInt64(context, sequence));
	}

	JSValue argsval = JS_NewArray(context);
	JS_SetPropertyUint32(context, argsval, 0, obj);
	JS_SetPropertyStr(context, message, "args", argsval);

	JSValue str = JS_JSONStringify(context, message, JS_NULL, JS_NULL);
	size_t len;
	const char* cstr = JS_ToCStringLen(context, &len, str);

	tf_ssb_rpc_t* rpc = tf_ssb_get_rpc(ssb);
	// NOTE(review): 0xa is presumably k_ssb_rpc_flag_json | k_ssb_rpc_flag_stream,
	// as used by the other requests in this file -- confirm and replace.
	tf_ssb_connection_rpc_send(
		connection,
		0xa,
		tf_ssb_connection_next_request_number(connection),
		(const uint8_t*)cstr,
		len,
		_tf_ssb_connection_on_rpc_createHistoryStream_response,
		rpc);

	JS_FreeCString(context, cstr);
	JS_FreeValue(context, str);
	JS_FreeValue(context, message);
}

// Connection-change callback: when a connection is established, requests the
// history of the peer and of every id returned by tf_ssb_get_following_deep
// for the peer's id with depth 2.
static void _tf_ssb_rpc_on_connections_changed(tf_ssb_t* ssb, tf_ssb_change_t change, tf_ssb_connection_t* connection, void* user_data)
{
	if (change != k_tf_ssb_change_connect)
	{
		return;
	}

	char id[k_id_base64_len];
	if (tf_ssb_connection_get_id(connection, id, sizeof(id)))
	{
		const char** ids = tf_ssb_get_following_deep(ssb, (const char*[]) { id, NULL }, 2);
		for (int i = 0; ids && ids[i]; i++)
		{
			tf_ssb_rpc_send_createHistoryStream(connection, ids[i]);
		}
		free(ids);
	}
}

// Appends a heap copy of 'id' to the growable array *results unless an equal
// string is already present.  A NULL id is ignored, and if memory cannot be
// allocated the id is dropped and the array is left intact.
static void _tf_ssb_add_id(const char*** results, int* results_count, int* results_capacity, const char* id)
{
	if (!id)
	{
		return;
	}

	for (int i = 0; i < *results_count; i++)
	{
		if (strcmp((*results)[i], id) == 0)
		{
			return;
		}
	}

	if (*results_count + 1 > *results_capacity)
	{
		int new_capacity = (*results_capacity + 1) * 2;
		// Keep the old pointer until realloc succeeds so that a failure does
		// not lose the existing array.
		const char** grown = realloc(*results, sizeof(const char*) * new_capacity);
		if (!grown)
		{
			return;
		}
		memset(grown + *results_count, 0, sizeof(const char*) * (new_capacity - *results_capacity));
		*results = grown;
		*results_capacity = new_capacity;
	}

	char* copy = strdup(id);
	if (copy)
	{
		(*results)[(*results_count)++] = copy;
	}
}

// Copies 'count' strings into one allocation laid out as a NULL-terminated
// pointer array followed by the string bytes, so the caller can release the
// whole result with a single free().  Always frees the input strings and the
// input array.  Returns NULL if the allocation fails.
static const char** _tf_ssb_pack_ids(const char** ids, int count)
{
	size_t table_size = (count + 1) * sizeof(const char*);
	size_t size = table_size;
	for (int i = 0; i < count; i++)
	{
		size += strlen(ids[i]) + 1;
	}

	char** packed = malloc(size);
	if (packed)
	{
		char* p = (char*)packed + table_size;
		for (int i = 0; i < count; i++)
		{
			size_t length = strlen(ids[i]) + 1;
			packed[i] = p;
			memcpy(p, ids[i], length);
			p += length;
		}
		packed[count] = NULL;
	}

	for (int i = 0; i < count; i++)
	{
		free((void*)ids[i]);
	}
	free((void*)ids);

	return (const char**)packed;
}

// Returns the ids that 'id' follows as a NULL-terminated array in a single
// allocation (release with free()), or NULL if that allocation fails.
//
// The list is seeded from a JSON cache stored in the properties table under
// ("core", id || ":following"), then brought up to date by applying contact
// messages by 'id' with a rowid greater than the cached one.  The cache is
// rewritten when the rowid has advanced.
const char** tf_ssb_get_following(tf_ssb_t* ssb, const char* id)
{
	sqlite3* db = tf_ssb_get_db(ssb);
	JSContext* context = tf_ssb_get_context(ssb);
	const int k_version = 0;
	int64_t rowid = 0;

	const char** results = NULL;
	int results_count = 0;
	int results_capacity = 0;

	sqlite3_stmt* statement;

	// Load the cached list, if one with a matching version exists.
	if (sqlite3_prepare(db, "SELECT value FROM properties WHERE id = ? AND key = (? || ':following')", -1, &statement, NULL) == SQLITE_OK)
	{
		if (sqlite3_bind_text(statement, 1, "core", -1, NULL) == SQLITE_OK &&
			sqlite3_bind_text(statement, 2, id, -1, NULL) == SQLITE_OK &&
			sqlite3_step(statement) == SQLITE_ROW)
		{
			JSValue cache = JS_ParseJSON(context, (const char*)sqlite3_column_text(statement, 0), sqlite3_column_bytes(statement, 0), NULL);
			JSValue version_value = JS_GetPropertyStr(context, cache, "version");
			int32_t version = 0;
			JS_ToInt32(context, &version, version_value);
			JS_FreeValue(context, version_value);

			if (version == k_version)
			{
				JSValue rowid_value = JS_GetPropertyStr(context, cache, "rowid");
				JS_ToInt64(context, &rowid, rowid_value);
				JS_FreeValue(context, rowid_value);

				JSValue ids = JS_GetPropertyStr(context, cache, "following");
				JSValue length_value = JS_GetPropertyStr(context, ids, "length");
				int32_t length = 0;
				JS_ToInt32(context, &length, length_value);
				for (int i = 0; i < length; i++)
				{
					JSValue entry = JS_GetPropertyUint32(context, ids, i);
					const char* entry_string = JS_ToCString(context, entry);
					_tf_ssb_add_id(&results, &results_count, &results_capacity, entry_string);
					JS_FreeCString(context, entry_string);
					JS_FreeValue(context, entry);
				}
				JS_FreeValue(context, length_value);
				JS_FreeValue(context, ids);
			}

			JS_FreeValue(context, cache);
		}
		sqlite3_finalize(statement);
	}

	int64_t loaded_rowid = rowid;

	// Apply contact messages newer than the cache.  The UNION arm adds a row
	// with NULL contact carrying MAX(rowid) over all messages, so 'rowid'
	// advances to the newest message even if there were no new contacts.
	if (sqlite3_prepare(db,
		"SELECT "
		"json_extract(content, '$.contact'), "
		"json_extract(content, '$.following'), "
		"rowid "
		"FROM messages "
		"WHERE author = ? AND "
		"rowid > ? AND "
		"json_extract(content, '$.type') = 'contact' UNION "
		"SELECT NULL, NULL, MAX(rowid) FROM messages "
		"ORDER BY rowid",
		-1, &statement, NULL) == SQLITE_OK)
	{
		if (sqlite3_bind_text(statement, 1, id, -1, NULL) == SQLITE_OK &&
			sqlite3_bind_int64(statement, 2, rowid) == SQLITE_OK)
		{
			while (sqlite3_step(statement) == SQLITE_ROW)
			{
				const char* contact = (const char*)sqlite3_column_text(statement, 0);
				if (sqlite3_column_type(statement, 0) != SQLITE_NULL)
				{
					if (sqlite3_column_int(statement, 1))
					{
						_tf_ssb_add_id(&results, &results_count, &results_capacity, contact);
					}
					else
					{
						// Unfollow: drop the entry by moving the last one into
						// its slot.
						for (int i = 0; i < results_count; i++)
						{
							if (strcmp(results[i], contact) == 0)
							{
								free((void*)results[i]);
								results[i] = results[--results_count];
							}
						}
					}
				}
				rowid = sqlite3_column_int64(statement, 2);
			}
		}
		sqlite3_finalize(statement);
	}

	// Persist the refreshed list if anything newer was seen.
	if (rowid != loaded_rowid)
	{
		JSValue cache = JS_NewObject(context);
		JS_SetPropertyStr(context, cache, "version", JS_NewInt32(context, k_version));
		JS_SetPropertyStr(context, cache, "rowid", JS_NewInt64(context, rowid));
		JSValue ids = JS_NewArray(context);
		for (int i = 0; i < results_count; i++)
		{
			JS_SetPropertyUint32(context, ids, i, JS_NewString(context, results[i]));
		}
		JS_SetPropertyStr(context, cache, "following", ids);
		JSValue json_value = JS_JSONStringify(context, cache, JS_NULL, JS_NULL);
		const char* json = JS_ToCString(context, json_value);
		JS_FreeValue(context, json_value);
		JS_FreeValue(context, cache);

		if (sqlite3_prepare(db, "INSERT OR REPLACE INTO properties (id, key, value) VALUES (?, ? || ':following', ?)", -1, &statement, NULL) == SQLITE_OK)
		{
			if (sqlite3_bind_text(statement, 1, "core", -1, NULL) == SQLITE_OK &&
				sqlite3_bind_text(statement, 2, id, -1, NULL) == SQLITE_OK &&
				sqlite3_bind_text(statement, 3, json, -1, NULL) == SQLITE_OK)
			{
				sqlite3_step(statement);
			}
			sqlite3_finalize(statement);
		}
		JS_FreeCString(context, json);
	}

	return _tf_ssb_pack_ids(results, results_count);
}

// Returns 'ids' plus everything those ids follow, expanded 'depth' levels, as
// a de-duplicated NULL-terminated array in a single allocation (release with
// free()).  Returns NULL if the result cannot be allocated.
const char** tf_ssb_get_following_deep(tf_ssb_t* ssb, const char** ids, int depth)
{
	const char** results = NULL;
	int results_count = 0;
	int results_capacity = 0;

	for (int i = 0; ids[i]; i++)
	{
		_tf_ssb_add_id(&results, &results_count, &results_capacity, ids[i]);
	}

	for (int i = 0; ids[i]; i++)
	{
		const char** following = tf_ssb_get_following(ssb, ids[i]);
		for (int j = 0; following && following[j]; j++)
		{
			_tf_ssb_add_id(&results, &results_count, &results_capacity, following[j]);
		}
		free(following);
	}

	const char** result = _tf_ssb_pack_ids(results, results_count);

	if (result && depth > 1)
	{
		// Expand one more level, using this level's result as the seed.
		const char** deeper = tf_ssb_get_following_deep(ssb, result, depth - 1);
		free(result);
		result = deeper;
	}

	return result;
}

// Connection-change callback: opens a createWants stream on connect and drops
// the connection's wants entries on removal.  user_data is the tf_ssb_rpc_t.
static void _tf_ssb_rpc_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_change_t change, tf_ssb_connection_t* connection, void* user_data)
{
	tf_ssb_rpc_t* rpc = user_data;
	if (change == k_tf_ssb_change_connect)
	{
		tf_ssb_rpc_send_blobs_createWants(connection);
	}
	else if (change == k_tf_ssb_change_remove)
	{
		_tf_ssb_rpc_remove_wants(rpc, connection);
	}
}

// Async callback: moves the head wants entry to the tail of the list and runs
// one scan step for it, so entries take turns.
static void _tf_ssb_rpc_wants_async(uv_async_t* async)
{
	tf_ssb_rpc_t* rpc = async->data;
	tf_ssb_blob_wants_t* it = rpc->wants;
	if (!it)
	{
		return;
	}

	rpc->wants = it->next;
	it->next = NULL;

	if (rpc->wants)
	{
		tf_ssb_blob_wants_t* tail = rpc->wants;
		while (tail->next)
		{
			tail = tail->next;
		}
		tail->next = it;
	}
	else
	{
		rpc->wants = it;
	}

	_tf_ssb_blob_wants_update(it);
}

// Allocates the RPC state, registers the RPC handlers and connection-change
// callbacks on 'ssb', and initializes the wants async handle.  The handle is
// unreferenced after initialization.
tf_ssb_rpc_t* tf_ssb_rpc_create(tf_ssb_t* ssb)
{
	tf_ssb_rpc_t* rpc = malloc(sizeof(tf_ssb_rpc_t));
	*rpc = (tf_ssb_rpc_t)
	{
		.wants_async.data = rpc,
	};

	tf_ssb_add_connections_changed_callback(ssb, _tf_ssb_rpc_on_connections_changed, NULL);
	tf_ssb_register_rpc(ssb, (const char*[]) { "blobs", "has", NULL }, _tf_ssb_rpc_blob_has, NULL);
	tf_ssb_register_rpc(ssb, (const char*[]) { "blobs", "get", NULL }, _tf_ssb_rpc_blob_get, NULL);
	tf_ssb_register_rpc(ssb, (const char*[]) { "blobs", "createWants", NULL }, _tf_ssb_rpc_blobs_createWants, rpc);
	tf_ssb_register_rpc(ssb, (const char*[]) { "createHistoryStream", NULL }, _tf_ssb_rpc_createHistoryStream, NULL);
	tf_ssb_add_connections_changed_callback(ssb, _tf_ssb_rpc_connections_changed_callback, rpc);

	uv_async_init(tf_ssb_get_loop(ssb), &rpc->wants_async, _tf_ssb_rpc_wants_async);
	uv_unref((uv_handle_t*)&rpc->wants_async);
	return rpc;
}

// Close callback for the async handle: handle->data is the tf_ssb_rpc_t that
// embeds the handle, so it is freed only once the handle has closed.
static void _tf_ssb_rpc_handle_closed(uv_handle_t* handle)
{
	free(handle->data);
}

// Releases all wants entries and closes the async handle.  The tf_ssb_rpc_t
// itself is freed from the handle's close callback.
void tf_ssb_rpc_destroy(tf_ssb_rpc_t* rpc)
{
	while (rpc->wants)
	{
		tf_ssb_blob_wants_t* wants = rpc->wants;
		rpc->wants = wants->next;
		free(wants);
	}
	uv_close((uv_handle_t*)&rpc->wants_async, _tf_ssb_rpc_handle_closed);
}