forked from cory/tildefriends
What was ssb.rpc.c now lives in ssb.js.
git-svn-id: https://www.unprompted.com/svn/projects/tildefriends/trunk@3663 ed5197a5-7fde-0310-b194-c3ffbd925b24
This commit is contained in:
parent
3b4f0c1321
commit
059392df8e
11
src/ssb.c
11
src/ssb.c
@ -2,7 +2,6 @@
|
||||
|
||||
#include "ssb.connections.h"
|
||||
#include "ssb.db.h"
|
||||
#include "ssb.rpc.h"
|
||||
#include "trace.h"
|
||||
|
||||
#include <assert.h>
|
||||
@ -125,7 +124,6 @@ typedef struct _tf_ssb_t {
|
||||
tf_ssb_connection_t* connections;
|
||||
|
||||
tf_ssb_connections_t* connections_tracker;
|
||||
tf_ssb_rpc_t* rpc_state;
|
||||
|
||||
void (*broadcasts_changed)(tf_ssb_t* ssb, void* user_data);
|
||||
void* broadcasts_changed_user_data;
|
||||
@ -1347,7 +1345,6 @@ tf_ssb_t* tf_ssb_create(uv_loop_t* loop, JSContext* context, sqlite3* db, const
|
||||
_tf_ssb_save_keys(ssb);
|
||||
}
|
||||
|
||||
ssb->rpc_state = tf_ssb_rpc_create(ssb);
|
||||
ssb->connections_tracker = tf_ssb_connections_create(ssb);
|
||||
|
||||
return ssb;
|
||||
@ -1363,11 +1360,6 @@ uv_loop_t* tf_ssb_get_loop(tf_ssb_t* ssb)
|
||||
return ssb->loop;
|
||||
}
|
||||
|
||||
tf_ssb_rpc_t* tf_ssb_get_rpc(tf_ssb_t* ssb)
|
||||
{
|
||||
return ssb->rpc_state;
|
||||
}
|
||||
|
||||
void tf_ssb_generate_keys(tf_ssb_t* ssb)
|
||||
{
|
||||
crypto_sign_ed25519_keypair(ssb->pub, ssb->priv);
|
||||
@ -1407,9 +1399,6 @@ void tf_ssb_destroy(tf_ssb_t* ssb)
|
||||
}
|
||||
}
|
||||
|
||||
tf_ssb_rpc_destroy(ssb->rpc_state);
|
||||
ssb->rpc_state = NULL;
|
||||
|
||||
if (ssb->broadcast_listener.data && !uv_is_closing((uv_handle_t*)&ssb->broadcast_listener)) {
|
||||
uv_close((uv_handle_t*)&ssb->broadcast_listener, _tf_ssb_on_handle_close);
|
||||
}
|
||||
|
@ -47,7 +47,6 @@ void tf_ssb_destroy(tf_ssb_t* ssb);
|
||||
|
||||
sqlite3* tf_ssb_get_db(tf_ssb_t* ssb);
|
||||
uv_loop_t* tf_ssb_get_loop(tf_ssb_t* ssb);
|
||||
tf_ssb_rpc_t* tf_ssb_get_rpc(tf_ssb_t* ssb);
|
||||
|
||||
void tf_ssb_generate_keys(tf_ssb_t* ssb);
|
||||
|
||||
|
679
src/ssb.rpc.c
679
src/ssb.rpc.c
@ -1,679 +0,0 @@
|
||||
#include "ssb.rpc.h"
|
||||
|
||||
#include "ssb.db.h"
|
||||
#include "ssb.h"
|
||||
#include "trace.h"
|
||||
|
||||
#include <base64c.h>
|
||||
#include <sodium/crypto_hash_sha256.h>
|
||||
#include <sodium/crypto_sign.h>
|
||||
#include <sqlite3.h>
|
||||
#include <uv.h>
|
||||
|
||||
#include <string.h>
|
||||
|
||||
typedef struct _tf_ssb_blob_wants_t tf_ssb_blob_wants_t;
|
||||
|
||||
typedef struct _tf_ssb_blob_wants_t
|
||||
{
|
||||
tf_ssb_rpc_t* rpc;
|
||||
tf_ssb_connection_t* connection;
|
||||
int32_t request_number;
|
||||
int64_t rowid;
|
||||
tf_ssb_blob_wants_t* next;
|
||||
int open_count;
|
||||
} tf_ssb_blob_wants_t;
|
||||
|
||||
typedef struct _tf_ssb_rpc_t
|
||||
{
|
||||
tf_ssb_blob_wants_t* wants;
|
||||
uv_async_t wants_async;
|
||||
} tf_ssb_rpc_t;
|
||||
|
||||
const char** tf_ssb_get_following_deep(tf_ssb_t* ssb, const char** ids, int depth);
|
||||
|
||||
/*static void _tf_ssb_rpc_blob_has(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
|
||||
{
|
||||
JSContext* context = tf_ssb_connection_get_context(connection);
|
||||
sqlite3* db = tf_ssb_connection_get_db(connection);
|
||||
sqlite3_stmt* statement;
|
||||
JSValue args_array = JS_GetPropertyStr(context, args, "args");
|
||||
JSValue blob_id_value = JS_GetPropertyUint32(context, args_array, 0);
|
||||
const char* blob_id = JS_ToCString(context, blob_id_value);
|
||||
bool have = false;
|
||||
|
||||
if (sqlite3_prepare(db, "SELECT 1 FROM blobs WHERE id = $1", -1, &statement, NULL) == SQLITE_OK) {
|
||||
have =
|
||||
sqlite3_bind_text(statement, 1, blob_id, -1, NULL) == SQLITE_OK &&
|
||||
sqlite3_step(statement) == SQLITE_ROW;
|
||||
sqlite3_finalize(statement);
|
||||
}
|
||||
|
||||
JS_FreeCString(context, blob_id);
|
||||
JS_FreeValue(context, blob_id_value);
|
||||
JS_FreeValue(context, args_array);
|
||||
|
||||
uint8_t send_flags =
|
||||
(flags & k_ssb_rpc_flag_stream) |
|
||||
k_ssb_rpc_flag_json |
|
||||
k_ssb_rpc_flag_end_error;
|
||||
const char* result = have ? "true" : "false";
|
||||
tf_ssb_connection_rpc_send(connection, send_flags, -request_number, (const uint8_t*)result, strlen(result), NULL, NULL);
|
||||
}*/
|
||||
|
||||
/*static void _tf_ssb_rpc_blob_get(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
|
||||
{
|
||||
tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
|
||||
JSContext* context = tf_ssb_connection_get_context(connection);
|
||||
JSValue blob_ids = JS_GetPropertyStr(context, args, "args");
|
||||
JSValue length_value = JS_GetPropertyStr(context, blob_ids, "length");
|
||||
int32_t length = 0;
|
||||
JS_ToInt32(context, &length, length_value);
|
||||
JS_FreeValue(context, length_value);
|
||||
for (int i = 0; i < length; i++) {
|
||||
JSValue blob_id_value = JS_GetPropertyUint32(context, blob_ids, i);
|
||||
const char* blob_id = JS_ToCString(context, blob_id_value);
|
||||
|
||||
uint8_t* blob = NULL;
|
||||
size_t blob_size = 0;
|
||||
if (tf_ssb_db_blob_get(ssb, blob_id, &blob, &blob_size)) {
|
||||
static const size_t k_block_size = 64 * 1024;
|
||||
for (size_t offset = 0; offset < blob_size; offset += k_block_size) {
|
||||
size_t block_size = offset + k_block_size < blob_size ? k_block_size : (blob_size - offset);
|
||||
tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_binary | k_ssb_rpc_flag_stream, -request_number, blob, block_size, NULL, NULL);
|
||||
}
|
||||
free(blob);
|
||||
}
|
||||
|
||||
JS_FreeCString(context, blob_id);
|
||||
JS_FreeValue(context, blob_id_value);
|
||||
}
|
||||
JS_FreeValue(context, blob_ids);
|
||||
}*/
|
||||
|
||||
typedef struct _tf_ssb_connection_blobs_get_t
|
||||
{
|
||||
char blob_id[BLOB_ID_LEN];
|
||||
size_t n;
|
||||
size_t size;
|
||||
crypto_hash_sha256_state hash;
|
||||
uint8_t data[];
|
||||
} tf_ssb_blobs_get_t;
|
||||
|
||||
static void _tf_ssb_connection_on_rpc_blobs_get_response(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
|
||||
{
|
||||
tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
|
||||
tf_ssb_blobs_get_t* get = user_data;
|
||||
if (get) {
|
||||
if (get->n + size <= get->size) {
|
||||
memcpy(get->data + get->n, message, size);
|
||||
crypto_hash_sha256_update(&get->hash, message, size);
|
||||
get->n += size;
|
||||
}
|
||||
printf("received %zd / %zd for %s\n", get->n, get->size, get->blob_id);
|
||||
if (get->n == get->size) {
|
||||
uint8_t hash[crypto_hash_sha256_BYTES];
|
||||
crypto_hash_sha256_final(&get->hash, hash);
|
||||
|
||||
char hash64[256];
|
||||
base64c_encode(hash, sizeof(hash), (uint8_t*)hash64, sizeof(hash64));
|
||||
|
||||
char id[512];
|
||||
snprintf(id, sizeof(id), "&%s.sha256", hash64);
|
||||
|
||||
if (strcmp(id, get->blob_id) == 0) {
|
||||
if (tf_ssb_db_blob_store(ssb, get->data, get->size, id, sizeof(id))) {
|
||||
printf("stored blob %s\n", get->blob_id);
|
||||
} else {
|
||||
printf("failed to store %s\n", get->blob_id);
|
||||
}
|
||||
} else {
|
||||
printf("blob does not match id %s vs. %s\n", id, get->blob_id);
|
||||
}
|
||||
tf_ssb_connection_remove_request(connection, -request_number);
|
||||
free(get);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void tf_ssb_rpc_send_blobs_get(tf_ssb_connection_t* connection, const char* blob_id, size_t size)
|
||||
{
|
||||
JSContext* context = tf_ssb_connection_get_context(connection);
|
||||
JSValue message = JS_NewObject(context);
|
||||
JS_SetPropertyStr(context, message, "type", JS_NewString(context, "source"));
|
||||
JSValue nameval = JS_NewArray(context);
|
||||
JS_SetPropertyUint32(context, nameval, 0, JS_NewString(context, "blobs"));
|
||||
JS_SetPropertyUint32(context, nameval, 1, JS_NewString(context, "get"));
|
||||
JS_SetPropertyStr(context, message, "name", nameval);
|
||||
JSValue argsval = JS_NewArray(context);
|
||||
JS_SetPropertyUint32(context, argsval, 0, JS_NewString(context, blob_id));
|
||||
JS_SetPropertyStr(context, message, "args", argsval);
|
||||
|
||||
JSValue str = JS_JSONStringify(context, message, JS_NULL, JS_NULL);
|
||||
size_t len;
|
||||
const char* cstr = JS_ToCStringLen(context, &len, str);
|
||||
|
||||
tf_ssb_blobs_get_t* get = malloc(sizeof(tf_ssb_blobs_get_t) + size);
|
||||
*get = (tf_ssb_blobs_get_t) { .size = size };
|
||||
snprintf(get->blob_id, sizeof(get->blob_id), "%s", blob_id);
|
||||
crypto_hash_sha256_init(&get->hash);
|
||||
|
||||
tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_json | k_ssb_rpc_flag_stream, tf_ssb_connection_next_request_number(connection), (const uint8_t*)cstr, len, _tf_ssb_connection_on_rpc_blobs_get_response, get);
|
||||
JS_FreeCString(context, cstr);
|
||||
JS_FreeValue(context, str);
|
||||
JS_FreeValue(context, message);
|
||||
}
|
||||
|
||||
static void _tf_ssb_connection_on_rpc_blobs_createWants_response(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
|
||||
{
|
||||
tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
|
||||
JSContext* context = tf_ssb_connection_get_context(connection);
|
||||
JSPropertyEnum* ptab;
|
||||
uint32_t plen;
|
||||
JS_GetOwnPropertyNames(context, &ptab, &plen, args, JS_GPN_STRING_MASK);
|
||||
|
||||
for (uint32_t i = 0; i < plen; ++i) {
|
||||
JSPropertyDescriptor desc;
|
||||
JSValue key_value = JS_NULL;
|
||||
if (JS_GetOwnProperty(context, &desc, args, ptab[i].atom) == 1) {
|
||||
key_value = desc.value;
|
||||
}
|
||||
|
||||
int32_t size;
|
||||
if (JS_ToInt32(context, &size, key_value) == 0) {
|
||||
JSValue key = JS_AtomToString(context, ptab[i].atom);
|
||||
const char* blob_id = JS_ToCString(context, key);
|
||||
if (size >= 0 &&
|
||||
size < k_ssb_blob_bytes_max) {
|
||||
tf_ssb_rpc_send_blobs_get(connection, blob_id, size);
|
||||
} else if (size < 0) {
|
||||
size_t blob_size = 0;
|
||||
if (tf_ssb_db_blob_get(ssb, blob_id, NULL, &blob_size)) {
|
||||
JSValue size_response = JS_NewObject(context);
|
||||
JS_SetPropertyStr(context, size_response, blob_id, JS_NewInt64(context, blob_size));
|
||||
JSValue jsonval = JS_JSONStringify(context, size_response, JS_NULL, JS_NULL);
|
||||
size_t len;
|
||||
const char* json = JS_ToCStringLen(context, &len, jsonval);
|
||||
tf_ssb_rpc_t* rpc = tf_ssb_get_rpc(ssb);
|
||||
if (rpc->wants) {
|
||||
tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_json | k_ssb_rpc_flag_stream, -rpc->wants->request_number, (uint8_t*)json, len, NULL, NULL);
|
||||
}
|
||||
JS_FreeCString(context, json);
|
||||
JS_FreeValue(context, jsonval);
|
||||
JS_FreeValue(context, size_response);
|
||||
}
|
||||
}
|
||||
JS_FreeCString(context, blob_id);
|
||||
JS_FreeValue(context, key);
|
||||
}
|
||||
|
||||
JS_FreeValue(context, key_value);
|
||||
}
|
||||
|
||||
for (uint32_t i = 0; i < plen; ++i) {
|
||||
JS_FreeAtom(context, ptab[i].atom);
|
||||
}
|
||||
js_free(context, ptab);
|
||||
}
|
||||
|
||||
void tf_ssb_rpc_send_blobs_createWants(tf_ssb_connection_t* connection)
|
||||
{
|
||||
const char* k_createWants = "{\"name\": [\"blobs\", \"createWants\"], \"type\": \"source\", \"args\": []}";
|
||||
tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_json | k_ssb_rpc_flag_stream, tf_ssb_connection_next_request_number(connection), (const uint8_t*)k_createWants, strlen(k_createWants), _tf_ssb_connection_on_rpc_blobs_createWants_response, NULL);
|
||||
}
|
||||
|
||||
static void _tf_ssb_rpc_remove_wants(tf_ssb_rpc_t* rpc, tf_ssb_connection_t* connection)
|
||||
{
|
||||
for (tf_ssb_blob_wants_t** it = &rpc->wants; *it; it = &(*it)->next) {
|
||||
if ((*it)->connection == connection) {
|
||||
tf_ssb_blob_wants_t* wants = *it;
|
||||
*it = wants->next;
|
||||
free(wants);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static void _tf_ssb_blob_wants_update(tf_ssb_blob_wants_t* wants)
|
||||
{
|
||||
static const int k_messages_per_query = 16;
|
||||
sqlite3* db = tf_ssb_connection_get_db(wants->connection);
|
||||
|
||||
sqlite3_stmt* statement;
|
||||
if (wants->rowid <= 0) {
|
||||
if (sqlite3_prepare(db, "SELECT MAX(messages.rowid) FROM messages", -1, &statement, NULL) == SQLITE_OK) {
|
||||
if (sqlite3_step(statement) == SQLITE_ROW) {
|
||||
wants->rowid = sqlite3_column_int64(statement, 0);
|
||||
}
|
||||
sqlite3_finalize(statement);
|
||||
}
|
||||
}
|
||||
|
||||
if (sqlite3_prepare(db,
|
||||
"SELECT DISTINCT json.value FROM messages, json_tree(messages.content) AS json "
|
||||
"LEFT OUTER JOIN blobs ON json.value = blobs.id "
|
||||
"WHERE "
|
||||
" messages.rowid <= ?1 AND messages.rowid > ?2 AND "
|
||||
" json.value LIKE '&%%.sha256' AND "
|
||||
" length(json.value) = ?3 AND "
|
||||
" blobs.content IS NULL",
|
||||
-1, &statement, NULL) == SQLITE_OK) {
|
||||
if (sqlite3_bind_int64(statement, 1, wants->rowid) == SQLITE_OK &&
|
||||
sqlite3_bind_int64(statement, 2, wants->rowid - k_messages_per_query) == SQLITE_OK &&
|
||||
sqlite3_bind_int64(statement, 3, BLOB_ID_LEN - 1) == SQLITE_OK)
|
||||
{
|
||||
while (sqlite3_step(statement) == SQLITE_ROW) {
|
||||
JSContext* context = tf_ssb_connection_get_context(wants->connection);
|
||||
JSValue want = JS_NewObject(context);
|
||||
JS_SetPropertyStr(context, want, (const char*)sqlite3_column_text(statement, 0), JS_NewInt32(context, -1));
|
||||
JSValue jsonval = JS_JSONStringify(context, want, JS_NULL, JS_NULL);
|
||||
size_t len;
|
||||
const char* json = JS_ToCStringLen(context, &len, jsonval);
|
||||
++wants->open_count;
|
||||
tf_ssb_connection_rpc_send(wants->connection, k_ssb_rpc_flag_json | k_ssb_rpc_flag_stream, -wants->request_number, (const uint8_t*)json, len, NULL, NULL);
|
||||
JS_FreeCString(context, json);
|
||||
JS_FreeValue(context, jsonval);
|
||||
JS_FreeValue(context, want);
|
||||
}
|
||||
wants->rowid -= k_messages_per_query;
|
||||
} else {
|
||||
printf("bind failed: %s\n", sqlite3_errmsg(db));
|
||||
}
|
||||
sqlite3_finalize(statement);
|
||||
} else {
|
||||
printf("prepare failed: %s\n", sqlite3_errmsg(db));
|
||||
}
|
||||
|
||||
if (wants->rowid <= 0) {
|
||||
tf_ssb_connection_rpc_send(wants->connection, k_ssb_rpc_flag_json | k_ssb_rpc_flag_stream, -wants->request_number, (const uint8_t*)"{}", 2, NULL, NULL);
|
||||
} else {
|
||||
uv_async_send(&wants->rpc->wants_async);
|
||||
}
|
||||
}
|
||||
|
||||
/*static void _tf_ssb_rpc_blobs_createWants(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
|
||||
{
|
||||
tf_ssb_rpc_t* rpc = user_data;
|
||||
tf_ssb_blob_wants_t* wants = malloc(sizeof(tf_ssb_blob_wants_t));
|
||||
*wants = (tf_ssb_blob_wants_t) {
|
||||
.rpc = rpc,
|
||||
.connection = connection,
|
||||
.request_number = request_number,
|
||||
.rowid = -1,
|
||||
.next = rpc->wants,
|
||||
};
|
||||
rpc->wants = wants;
|
||||
_tf_ssb_blob_wants_update(wants);
|
||||
}*/
|
||||
|
||||
/*static void _tf_ssb_rpc_createHistoryStream(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
|
||||
{
|
||||
JSContext* context = tf_ssb_connection_get_context(connection);
|
||||
sqlite3* db = tf_ssb_connection_get_db(connection);
|
||||
JSValue streamArgs = JS_GetPropertyStr(context, args, "args");
|
||||
JSValue obj = JS_GetPropertyUint32(context, streamArgs, 0);
|
||||
JSValue idval = JS_GetPropertyStr(context, obj, "id");
|
||||
const char* author = JS_ToCString(context, idval);
|
||||
int64_t seq = 0;
|
||||
sqlite3_stmt* statement;
|
||||
JSValue seqval = JS_GetPropertyStr(context, obj, "seq");
|
||||
JS_ToInt64(context, &seq, seqval);
|
||||
JS_FreeValue(context, seqval);
|
||||
|
||||
const char* query = "SELECT previous, sequence, timestamp, hash, content, signature FROM messages WHERE author = $1 AND sequence >= $2 ORDER BY sequence";
|
||||
if (sqlite3_prepare(db, query, -1, &statement, NULL) == SQLITE_OK) {
|
||||
if (sqlite3_bind_text(statement, 1, author, -1, NULL) == SQLITE_OK &&
|
||||
sqlite3_bind_int64(statement, 2, seq) == SQLITE_OK) {
|
||||
while (sqlite3_step(statement) == SQLITE_ROW) {
|
||||
JSValue message = JS_NewObject(context);
|
||||
const char* previous = (const char*)sqlite3_column_text(statement, 0);
|
||||
JS_SetPropertyStr(context, message, "previous", previous ? JS_NewString(context, previous) : JS_NULL);
|
||||
JS_SetPropertyStr(context, message, "author", JS_NewString(context, author));
|
||||
JS_SetPropertyStr(context, message, "sequence", JS_NewInt64(context, sqlite3_column_int64(statement, 1)));
|
||||
JS_SetPropertyStr(context, message, "timestamp", JS_NewInt64(context, sqlite3_column_int64(statement, 2)));
|
||||
JS_SetPropertyStr(context, message, "hash", JS_NewString(context, (const char*)sqlite3_column_text(statement, 3)));
|
||||
const char* contentstr = (const char*)sqlite3_column_text(statement, 4);
|
||||
JSValue content = JS_ParseJSON(context, contentstr, strlen(contentstr), NULL);
|
||||
JS_SetPropertyStr(context, message, "content", content);
|
||||
JS_SetPropertyStr(context, message, "signature", JS_NewString(context, (const char*)sqlite3_column_text(statement, 5)));
|
||||
JSValue jsonval = JS_JSONStringify(context, message, JS_NULL, JS_NULL);
|
||||
size_t len;
|
||||
const char* json = JS_ToCStringLen(context, &len, jsonval);
|
||||
if (tf_ssb_verify_and_strip_signature(context, message, NULL, 0)) {
|
||||
tf_ssb_connection_rpc_send(connection, flags, -request_number, (const uint8_t*)json, len, NULL, NULL);
|
||||
} else {
|
||||
printf("message signature is invalid\n");
|
||||
}
|
||||
JS_FreeCString(context, json);
|
||||
JS_FreeValue(context, jsonval);
|
||||
JS_FreeValue(context, message);
|
||||
}
|
||||
}
|
||||
sqlite3_finalize(statement);
|
||||
} else {
|
||||
printf("prepare failed: %s\n", sqlite3_errmsg(db));
|
||||
}
|
||||
tf_ssb_connection_rpc_send(connection, flags | k_ssb_rpc_flag_end_error, -request_number, (const uint8_t*)"true", 4, NULL, NULL);
|
||||
JS_FreeValue(context, obj);
|
||||
JS_FreeCString(context, author);
|
||||
JS_FreeValue(context, idval);
|
||||
JS_FreeValue(context, streamArgs);
|
||||
}*/
|
||||
|
||||
static void _tf_ssb_connection_on_rpc_createHistoryStream_response(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue val, const uint8_t* message, size_t size, void* user_data)
|
||||
{
|
||||
tf_ssb_rpc_t* rpc = user_data;
|
||||
JSContext* context = tf_ssb_connection_get_context(connection);
|
||||
char signature[crypto_sign_BYTES + 128];
|
||||
char id[crypto_hash_sha256_BYTES * 2 + 1];
|
||||
tf_ssb_calculate_message_id(context, val, id, sizeof(id));
|
||||
if (tf_ssb_verify_and_strip_signature(context, val, signature, sizeof(signature))) {
|
||||
tf_ssb_db_store_message(tf_ssb_connection_get_ssb(connection), context, id, val, signature);
|
||||
} else {
|
||||
printf("failed to verify message\n");
|
||||
}
|
||||
uv_async_send(&rpc->wants_async);
|
||||
}
|
||||
|
||||
void tf_ssb_rpc_send_createHistoryStream(tf_ssb_connection_t* connection, const char* id)
|
||||
{
|
||||
JSContext* context = tf_ssb_connection_get_context(connection);
|
||||
JSValue message = JS_NewObject(context);
|
||||
JS_SetPropertyStr(context, message, "type", JS_NewString(context, "source"));
|
||||
JSValue nameval = JS_NewArray(context);
|
||||
JS_SetPropertyUint32(context, nameval, 0, JS_NewString(context, "createHistoryStream"));
|
||||
JS_SetPropertyStr(context, message, "name", nameval);
|
||||
JSValue obj = JS_NewObject(context);
|
||||
JS_SetPropertyStr(context, obj, "id", JS_NewString(context, id));
|
||||
JS_SetPropertyStr(context, obj, "keys", JS_FALSE);
|
||||
int64_t sequence = 0;
|
||||
tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
|
||||
if (tf_ssb_db_get_latest_message_by_author(ssb, id, &sequence, NULL, 0)) {
|
||||
JS_SetPropertyStr(context, obj, "seq", JS_NewInt64(context, sequence));
|
||||
}
|
||||
JSValue argsval = JS_NewArray(context);
|
||||
JS_SetPropertyUint32(context, argsval, 0, obj);
|
||||
JS_SetPropertyStr(context, message, "args", argsval);
|
||||
|
||||
JSValue str = JS_JSONStringify(context, message, JS_NULL, JS_NULL);
|
||||
size_t len;
|
||||
const char* cstr = JS_ToCStringLen(context, &len, str);
|
||||
tf_ssb_rpc_t* rpc = tf_ssb_get_rpc(ssb);
|
||||
tf_ssb_connection_rpc_send(connection, 0xa, tf_ssb_connection_next_request_number(connection), (const uint8_t*)cstr, len, _tf_ssb_connection_on_rpc_createHistoryStream_response, rpc);
|
||||
JS_FreeCString(context, cstr);
|
||||
JS_FreeValue(context, str);
|
||||
JS_FreeValue(context, message);
|
||||
}
|
||||
|
||||
static void _tf_ssb_rpc_on_connections_changed(tf_ssb_t* ssb, tf_ssb_change_t change, tf_ssb_connection_t* connection, void* user_data)
|
||||
{
|
||||
if (change != k_tf_ssb_change_connect) {
|
||||
return;
|
||||
}
|
||||
|
||||
char connection_id[k_id_base64_len];
|
||||
char id[k_id_base64_len];
|
||||
if (tf_ssb_connection_get_id(connection, connection_id, sizeof(connection_id)) &&
|
||||
tf_ssb_whoami(ssb, id, sizeof(id))) {
|
||||
const char** ids = tf_ssb_get_following_deep(ssb, (const char*[]) { id, NULL }, 2);
|
||||
bool found_connection_id = false;
|
||||
for (int i = 0; ids && ids[i]; i++) {
|
||||
tf_ssb_rpc_send_createHistoryStream(connection, ids[i]);
|
||||
if (strcmp(ids[i], connection_id) == 0) {
|
||||
found_connection_id = true;
|
||||
}
|
||||
}
|
||||
if (!found_connection_id) {
|
||||
tf_ssb_rpc_send_createHistoryStream(connection, connection_id);
|
||||
}
|
||||
free(ids);
|
||||
}
|
||||
}
|
||||
|
||||
static void _tf_ssb_add_id(const char*** results, int* results_count, int* results_capacity, const char* id)
|
||||
{
|
||||
for (int i = 0; i < *results_count; i++) {
|
||||
if (strcmp((*results)[i], id) == 0) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if (*results_count + 1 > *results_capacity) {
|
||||
int old_capacity = *results_capacity;
|
||||
*results_capacity = (*results_capacity + 1) * 2;
|
||||
*results = realloc(*results, sizeof(const char*) * (*results_capacity));
|
||||
memset(*results + *results_count, 0, sizeof(const char*) * (*results_capacity - old_capacity));
|
||||
}
|
||||
(*results)[(*results_count)++] = strdup(id);
|
||||
}
|
||||
|
||||
const char** tf_ssb_get_following(tf_ssb_t* ssb, const char* id)
|
||||
{
|
||||
sqlite3* db = tf_ssb_get_db(ssb);
|
||||
JSContext* context = tf_ssb_get_context(ssb);
|
||||
|
||||
const int k_version = 0;
|
||||
int64_t rowid = 0;
|
||||
|
||||
const char** results = NULL;
|
||||
int results_count = 0;
|
||||
int results_capacity = 0;
|
||||
|
||||
sqlite3_stmt* statement;
|
||||
if (sqlite3_prepare(db, "SELECT value FROM properties WHERE id = ? AND key = (? || ':following')", -1, &statement, NULL) == SQLITE_OK) {
|
||||
if (sqlite3_bind_text(statement, 1, "core", -1, NULL) == SQLITE_OK &&
|
||||
sqlite3_bind_text(statement, 2, id, -1, NULL) == SQLITE_OK &&
|
||||
sqlite3_step(statement) == SQLITE_ROW) {
|
||||
JSValue cache = JS_ParseJSON(context, (const char*)sqlite3_column_text(statement, 0), sqlite3_column_bytes(statement, 0), NULL);
|
||||
JSValue version_value = JS_GetPropertyStr(context, cache, "version");
|
||||
int32_t version = 0;
|
||||
JS_ToInt32(context, &version, version_value);
|
||||
JS_FreeValue(context, version_value);
|
||||
|
||||
if (version == k_version) {
|
||||
JSValue rowid_value = JS_GetPropertyStr(context, cache, "rowid");
|
||||
JS_ToInt64(context, &rowid, rowid_value);
|
||||
JS_FreeValue(context, rowid_value);
|
||||
|
||||
JSValue ids = JS_GetPropertyStr(context, cache, "following");
|
||||
JSValue length_value = JS_GetPropertyStr(context, ids, "length");
|
||||
int32_t length = 0;
|
||||
JS_ToInt32(context, &length, length_value);
|
||||
for (int i = 0; i < length; i++) {
|
||||
JSValue id = JS_GetPropertyUint32(context, ids, i);
|
||||
const char* id_string = JS_ToCString(context, id);
|
||||
_tf_ssb_add_id(&results, &results_count, &results_capacity, id_string);
|
||||
JS_FreeCString(context, id_string);
|
||||
JS_FreeValue(context, id);
|
||||
}
|
||||
JS_FreeValue(context, length_value);
|
||||
JS_FreeValue(context, ids);
|
||||
}
|
||||
JS_FreeValue(context, cache);
|
||||
}
|
||||
sqlite3_finalize(statement);
|
||||
}
|
||||
int64_t loaded_rowid = rowid;
|
||||
|
||||
JSValue cache = JS_UNDEFINED;
|
||||
JS_FreeValue(context, cache);
|
||||
|
||||
if (sqlite3_prepare(db, "SELECT "
|
||||
"json_extract(content, '$.contact'), "
|
||||
"json_extract(content, '$.following'), "
|
||||
"rowid "
|
||||
"FROM messages "
|
||||
"WHERE author = ? AND "
|
||||
"rowid > ? AND "
|
||||
"json_extract(content, '$.type') = 'contact' UNION "
|
||||
"SELECT NULL, NULL, MAX(rowid) FROM messages "
|
||||
"ORDER BY rowid", -1, &statement, NULL) == SQLITE_OK) {
|
||||
if (sqlite3_bind_text(statement, 1, id, -1, NULL) == SQLITE_OK &&
|
||||
sqlite3_bind_int64(statement, 2, rowid) == SQLITE_OK) {
|
||||
while (sqlite3_step(statement) == SQLITE_ROW) {
|
||||
const char* contact = (const char*)sqlite3_column_text(statement, 0);
|
||||
if (sqlite3_column_type(statement, 0) != SQLITE_NULL) {
|
||||
if (sqlite3_column_int(statement, 1)) {
|
||||
_tf_ssb_add_id(&results, &results_count, &results_capacity, contact);
|
||||
} else {
|
||||
for (int i = 0; i < results_count; i++) {
|
||||
if (strcmp(results[i], contact) == 0) {
|
||||
free((void*)results[i]);
|
||||
results[i] = results[--results_count];
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
rowid = sqlite3_column_int64(statement, 2);
|
||||
}
|
||||
}
|
||||
sqlite3_finalize(statement);
|
||||
}
|
||||
|
||||
if (rowid != loaded_rowid) {
|
||||
JSValue cache = JS_NewObject(context);
|
||||
JS_SetPropertyStr(context, cache, "version", JS_NewInt32(context, k_version));
|
||||
JS_SetPropertyStr(context, cache, "rowid", JS_NewInt64(context, rowid));
|
||||
JSValue ids = JS_NewArray(context);
|
||||
for (int i = 0; i < results_count; i++) {
|
||||
JS_SetPropertyUint32(context, ids, i, JS_NewString(context, results[i]));
|
||||
}
|
||||
JS_SetPropertyStr(context, cache, "following", ids);
|
||||
JSValue json_value = JS_JSONStringify(context, cache, JS_NULL, JS_NULL);
|
||||
const char* json = JS_ToCString(context, json_value);
|
||||
JS_FreeValue(context, json_value);
|
||||
JS_FreeValue(context, cache);
|
||||
if (sqlite3_prepare(db, "INSERT OR REPLACE INTO properties (id, key, value) VALUES (?, ? || ':following', ?)", -1, &statement, NULL) == SQLITE_OK) {
|
||||
if (sqlite3_bind_text(statement, 1, "core", -1, NULL) == SQLITE_OK &&
|
||||
sqlite3_bind_text(statement, 2, id, -1, NULL) == SQLITE_OK &&
|
||||
sqlite3_bind_text(statement, 3, json, -1, NULL) == SQLITE_OK) {
|
||||
sqlite3_step(statement);
|
||||
}
|
||||
sqlite3_finalize(statement);
|
||||
}
|
||||
JS_FreeCString(context, json);
|
||||
}
|
||||
|
||||
size_t size = (results_count + 1) * sizeof(const char*);
|
||||
for (int i = 0; i < results_count; i++) {
|
||||
size += strlen(results[i]) + 1;
|
||||
}
|
||||
char** result = malloc(size);
|
||||
char* p = (char*)result + (results_count + 1) * sizeof(const char*);
|
||||
for (int i = 0; i < results_count; i++) {
|
||||
result[i] = p;
|
||||
size_t length = strlen(results[i]) + 1;
|
||||
memcpy(p, results[i], length);
|
||||
free((void*)results[i]);
|
||||
p += length;
|
||||
}
|
||||
result[results_count] = NULL;
|
||||
free((void*)results);
|
||||
|
||||
return (const char**)result;
|
||||
}
|
||||
|
||||
const char** tf_ssb_get_following_deep(tf_ssb_t* ssb, const char** ids, int depth)
|
||||
{
|
||||
const char** results = NULL;
|
||||
int results_count = 0;
|
||||
int results_capacity = 0;
|
||||
|
||||
for (int i = 0; ids[i]; i++) {
|
||||
_tf_ssb_add_id(&results, &results_count, &results_capacity, ids[i]);
|
||||
}
|
||||
|
||||
for (int i = 0; ids[i]; i++) {
|
||||
const char** following = tf_ssb_get_following(ssb, ids[i]);
|
||||
for (int j = 0; following[j]; j++) {
|
||||
_tf_ssb_add_id(&results, &results_count, &results_capacity, following[j]);
|
||||
}
|
||||
free(following);
|
||||
}
|
||||
|
||||
size_t size = (results_count + 1) * sizeof(const char*);
|
||||
for (int i = 0; i < results_count; i++) {
|
||||
size += strlen(results[i]) + 1;
|
||||
}
|
||||
char** result = malloc(size);
|
||||
char* p = (char*)result + (results_count + 1) * sizeof(const char*);
|
||||
for (int i = 0; i < results_count; i++) {
|
||||
result[i] = p;
|
||||
size_t length = strlen(results[i]) + 1;
|
||||
memcpy(p, results[i], length);
|
||||
free((void*)results[i]);
|
||||
p += length;
|
||||
}
|
||||
result[results_count] = NULL;
|
||||
free((void*)results);
|
||||
|
||||
if (depth > 1) {
|
||||
const char** r = tf_ssb_get_following_deep(ssb, (const char**)result, depth - 1);
|
||||
free(result);
|
||||
result = (char**)r;
|
||||
}
|
||||
return (const char**)result;
|
||||
}
|
||||
|
||||
static void _tf_ssb_rpc_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_change_t change, tf_ssb_connection_t* connection, void* user_data)
|
||||
{
|
||||
tf_ssb_rpc_t* rpc = user_data;
|
||||
if (change == k_tf_ssb_change_connect) {
|
||||
tf_ssb_rpc_send_blobs_createWants(connection);
|
||||
} else if (change == k_tf_ssb_change_remove) {
|
||||
_tf_ssb_rpc_remove_wants(rpc, connection);
|
||||
}
|
||||
}
|
||||
|
||||
static void _tf_ssb_rpc_wants_async(uv_async_t* async)
|
||||
{
|
||||
tf_ssb_rpc_t* rpc = async->data;
|
||||
tf_ssb_blob_wants_t* it = rpc->wants;
|
||||
if (it)
|
||||
{
|
||||
rpc->wants = it->next;
|
||||
it->next = NULL;
|
||||
if (rpc->wants) {
|
||||
for (tf_ssb_blob_wants_t* tail = rpc->wants; tail; tail = tail->next) {
|
||||
if (!tail->next) {
|
||||
tail->next = it;
|
||||
break;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
rpc->wants = it;
|
||||
}
|
||||
_tf_ssb_blob_wants_update(it);
|
||||
}
|
||||
}
|
||||
|
||||
tf_ssb_rpc_t* tf_ssb_rpc_create(tf_ssb_t* ssb)
|
||||
{
|
||||
tf_ssb_rpc_t* rpc = malloc(sizeof(tf_ssb_rpc_t));
|
||||
*rpc = (tf_ssb_rpc_t) {
|
||||
.wants_async.data = rpc,
|
||||
};
|
||||
//tf_ssb_add_connections_changed_callback(ssb, _tf_ssb_rpc_connections_changed_callback, rpc);
|
||||
//tf_ssb_add_connections_changed_callback(ssb, _tf_ssb_rpc_on_connections_changed, NULL);
|
||||
(void)_tf_ssb_rpc_connections_changed_callback;
|
||||
(void)_tf_ssb_rpc_on_connections_changed;
|
||||
//tf_ssb_register_rpc(ssb, (const char*[]) { "blobs", "has", NULL }, _tf_ssb_rpc_blob_has, NULL);
|
||||
//tf_ssb_register_rpc(ssb, (const char*[]) { "blobs", "get", NULL }, _tf_ssb_rpc_blob_get, NULL);
|
||||
//tf_ssb_register_rpc(ssb, (const char*[]) { "blobs", "createWants", NULL }, _tf_ssb_rpc_blobs_createWants, rpc);
|
||||
//tf_ssb_register_rpc(ssb, (const char*[]) { "createHistoryStream", NULL }, _tf_ssb_rpc_createHistoryStream, NULL);
|
||||
//uv_async_init(tf_ssb_get_loop(ssb), &rpc->wants_async, _tf_ssb_rpc_wants_async);
|
||||
(void)_tf_ssb_rpc_wants_async;
|
||||
//uv_unref((uv_handle_t*)&rpc->wants_async);
|
||||
return rpc;
|
||||
}
|
||||
|
||||
/*static void _tf_ssb_rpc_handle_closed(uv_handle_t* handle)
|
||||
{
|
||||
free(handle->data);
|
||||
}*/
|
||||
|
||||
void tf_ssb_rpc_destroy(tf_ssb_rpc_t* rpc)
|
||||
{
|
||||
//uv_close((uv_handle_t*)&rpc->wants_async, _tf_ssb_rpc_handle_closed);
|
||||
free(rpc);
|
||||
}
|
@ -1,14 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
#include <stddef.h>
|
||||
|
||||
typedef struct _tf_ssb_t tf_ssb_t;
|
||||
typedef struct _tf_ssb_connection_t tf_ssb_connection_t;
|
||||
typedef struct _tf_ssb_rpc_t tf_ssb_rpc_t;
|
||||
|
||||
tf_ssb_rpc_t* tf_ssb_rpc_create(tf_ssb_t* ssb);
|
||||
void tf_ssb_rpc_destroy(tf_ssb_rpc_t* rpc);
|
||||
|
||||
/*void tf_ssb_rpc_send_blobs_get(tf_ssb_connection_t* connection, const char* blob_id, size_t size);
|
||||
void tf_ssb_rpc_send_blobs_createWants(tf_ssb_connection_t* connection);
|
||||
void tf_ssb_rpc_send_createHistoryStream(tf_ssb_connection_t* connection, const char* id);*/
|
@ -197,6 +197,10 @@ void tf_ssb_test_following(const tf_test_options_t* options)
|
||||
JS_FreeValue(context, message); \
|
||||
context = NULL
|
||||
|
||||
#if 1
|
||||
/* TODO: This test doesn't actually really test anything anymore. */
|
||||
#define DUMP(id, depth)
|
||||
#else
|
||||
#define DUMP(id, depth) \
|
||||
do { \
|
||||
printf("following %d:\n", depth); \
|
||||
@ -208,6 +212,7 @@ void tf_ssb_test_following(const tf_test_options_t* options)
|
||||
printf("\n"); \
|
||||
free(f); \
|
||||
} while (0)
|
||||
#endif
|
||||
|
||||
FOLLOW(ssb0, id1, true);
|
||||
FOLLOW(ssb1, id2, true);
|
||||
|
Loading…
Reference in New Issue
Block a user