createHistoryStream JS -> C.
git-svn-id: https://www.unprompted.com/svn/projects/tildefriends/trunk@4110 ed5197a5-7fde-0310-b194-c3ffbd925b24
This commit is contained in:
parent
6ff33191bb
commit
53e4f4341c
42
core/ssb.js
42
core/ssb.js
@ -1,5 +1,4 @@
|
||||
"use strict";
|
||||
var g_wants_requests = {};
|
||||
var g_database = new Database('core');
|
||||
const k_use_create_history_stream = false;
|
||||
|
||||
@ -29,7 +28,7 @@ ssb.addEventListener('connections', function on_connections_changed(change, conn
|
||||
if (k_use_create_history_stream) {
|
||||
connection.send_json({'name': ['createHistoryStream'], 'type': 'source', 'args': [{'id': connection.id, 'seq': sequence, 'live': true, 'keys': false}]}, storeMessage);
|
||||
var identities = ssb.getAllIdentities();
|
||||
ssb.followingDeep(identities, 2).then(function(ids) {
|
||||
let ids = ssb.followingDeep(identities, 2);
|
||||
for (let id of ids) {
|
||||
if (identities.indexOf(id) != -1) {
|
||||
continue;
|
||||
@ -37,17 +36,11 @@ ssb.addEventListener('connections', function on_connections_changed(change, conn
|
||||
var sequence = get_latest_sequence_for_author(id);
|
||||
connection.send_json({'name': ['createHistoryStream'], 'type': 'source', 'args': [{'id': id, 'seq': sequence, 'live': true, 'keys': false}]}, storeMessage);
|
||||
}
|
||||
});
|
||||
} else {
|
||||
if (connection.is_client) {
|
||||
connection.send_json({"name": ["ebt", "replicate"], "args": [{"version": 3, "format": "classic"}], "type": "duplex"}, ebtReplicateClient);
|
||||
}
|
||||
}
|
||||
} else if (change == 'remove') {
|
||||
print('REMOVE', connection.id);
|
||||
delete g_wants_requests[connection.id];
|
||||
} else {
|
||||
print('CHANGE', change);
|
||||
}
|
||||
});
|
||||
|
||||
@ -169,36 +162,3 @@ function ebtReplicateServer(request) {
|
||||
}
|
||||
|
||||
ssb.addRpc(['ebt', 'replicate'], ebtReplicateServer);
|
||||
|
||||
ssb.addRpc(['createHistoryStream'], function(request) {
|
||||
if (!request?.args) {
|
||||
print('missing request.args in createHistoryStream');
|
||||
}
|
||||
var id = request.args[0].id;
|
||||
var seq = request.args[0].seq;
|
||||
var keys = request.args[0].keys || request.args[0].keys === undefined;
|
||||
function sendMessage(row) {
|
||||
if (keys) {
|
||||
var message = {
|
||||
key: row.id,
|
||||
value: formatMessage(row),
|
||||
timestamp: row.timestamp,
|
||||
};
|
||||
} else {
|
||||
var message = formatMessage(row);
|
||||
}
|
||||
request.send_json(message);
|
||||
}
|
||||
ssb.sqlStream(
|
||||
'SELECT previous, author, id, sequence, timestamp, hash, content, signature, sequence_before_author FROM messages WHERE author = ?1 AND sequence >= ?2 ORDER BY sequence',
|
||||
[id, seq ?? 0],
|
||||
sendMessage);
|
||||
ssb.addEventListener('message', function(message_id) {
|
||||
ssb.sqlStream(
|
||||
'SELECT previous, author, id, sequence, timestamp, hash, content, signature, sequence_before_author FROM messages WHERE id = ?1 AND author = ?2',
|
||||
[message_id, id],
|
||||
function (row) {
|
||||
sendMessage(row);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
82
src/ssb.c
82
src/ssb.c
@ -188,6 +188,13 @@ typedef struct _tf_ssb_t
|
||||
int broadcasts_changed_count;
|
||||
} tf_ssb_t;
|
||||
|
||||
typedef struct _tf_ssb_connection_message_request_t
|
||||
{
|
||||
char author[k_id_base64_len];
|
||||
int32_t request_number;
|
||||
bool keys;
|
||||
} tf_ssb_connection_message_request_t;
|
||||
|
||||
typedef struct _tf_ssb_connection_t
|
||||
{
|
||||
tf_ssb_t* ssb;
|
||||
@ -241,6 +248,9 @@ typedef struct _tf_ssb_connection_t
|
||||
tf_ssb_request_t* requests;
|
||||
int requests_count;
|
||||
const char* destroy_reason;
|
||||
|
||||
tf_ssb_connection_message_request_t* message_requests;
|
||||
int message_requests_count;
|
||||
} tf_ssb_connection_t;
|
||||
|
||||
static JSClassID _connection_class_id;
|
||||
@ -499,6 +509,34 @@ void tf_ssb_connection_add_request(tf_ssb_connection_t* connection, int32_t requ
|
||||
connection->ssb->request_count++;
|
||||
}
|
||||
|
||||
static int _message_request_compare(const void* a, const void* b)
|
||||
{
|
||||
const char* author = a;
|
||||
const tf_ssb_connection_message_request_t* rb = b;
|
||||
return strcmp(author, rb->author);
|
||||
}
|
||||
|
||||
void tf_ssb_connection_add_new_message_request(tf_ssb_connection_t* connection, const char* author, int32_t request_number, bool keys)
|
||||
{
|
||||
int index = tf_util_insert_index(author, connection->message_requests, connection->message_requests_count, sizeof(tf_ssb_connection_message_request_t), _message_request_compare);
|
||||
if (index < connection->message_requests_count && strcmp(author, connection->message_requests[index].author) == 0)
|
||||
{
|
||||
return;
|
||||
}
|
||||
connection->message_requests = tf_resize_vec(connection->message_requests, sizeof(tf_ssb_connection_message_request_t) * (connection->message_requests_count + 1));
|
||||
if (connection->message_requests_count - index)
|
||||
{
|
||||
memmove(connection->message_requests + index + 1, connection->message_requests + index, sizeof(tf_ssb_connection_message_request_t) * (connection->message_requests_count - index));
|
||||
}
|
||||
connection->message_requests[index] = (tf_ssb_connection_message_request_t)
|
||||
{
|
||||
.request_number = request_number,
|
||||
.keys = keys,
|
||||
};
|
||||
snprintf(connection->message_requests[index].author, sizeof(connection->message_requests[index].author), "%s", author);
|
||||
connection->message_requests_count++;
|
||||
}
|
||||
|
||||
static void _tf_ssb_connection_remove_request(tf_ssb_connection_t* connection, int32_t request_number)
|
||||
{
|
||||
tf_ssb_request_t* request = bsearch(&request_number, connection->requests, connection->requests_count, sizeof(tf_ssb_request_t), _request_compare);
|
||||
@ -518,7 +556,6 @@ static void _tf_ssb_connection_remove_request(tf_ssb_connection_t* connection, i
|
||||
|
||||
void tf_ssb_connection_rpc_send(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, const uint8_t* message, size_t size, tf_ssb_rpc_callback_t* callback, tf_ssb_callback_cleanup_t* cleanup, void* user_data)
|
||||
{
|
||||
printf("SEND %p\n", connection);
|
||||
if (!connection)
|
||||
{
|
||||
return;
|
||||
@ -1475,12 +1512,10 @@ void tf_ssb_append_message(tf_ssb_t* ssb, JSValue message)
|
||||
|
||||
void _tf_ssb_connection_destroy(tf_ssb_connection_t* connection, const char* reason)
|
||||
{
|
||||
printf("DESTROY %p\n", connection);
|
||||
tf_ssb_t* ssb = connection->ssb;
|
||||
if (!connection->destroy_reason)
|
||||
{
|
||||
connection->destroy_reason = reason;
|
||||
printf("destroying connection %p obj=%p: %s\n", connection, JS_VALUE_GET_PTR(connection->object), reason);
|
||||
}
|
||||
while (connection->requests)
|
||||
{
|
||||
@ -1558,6 +1593,10 @@ void _tf_ssb_connection_destroy(tf_ssb_connection_t* connection, const char* rea
|
||||
!connection->tcp.data &&
|
||||
!connection->connect.data)
|
||||
{
|
||||
tf_free(connection->message_requests);
|
||||
connection->message_requests = NULL;
|
||||
connection->message_requests_count = 0;
|
||||
|
||||
tf_free(connection);
|
||||
}
|
||||
}
|
||||
@ -2907,6 +2946,43 @@ void tf_ssb_notify_message_added(tf_ssb_t* ssb, const char* id)
|
||||
next = node->next;
|
||||
node->callback(ssb, id, node->user_data);
|
||||
}
|
||||
|
||||
JSContext* context = ssb->context;
|
||||
JSValue message_keys = tf_ssb_db_get_message_by_id(ssb, id, true);
|
||||
JSValue message = JS_GetPropertyStr(context, message_keys, "value");
|
||||
if (!JS_IsUndefined(message))
|
||||
{
|
||||
JSValue author = JS_GetPropertyStr(context, message, "author");
|
||||
const char* author_string = JS_ToCString(context, author);
|
||||
|
||||
for (tf_ssb_connection_t* connection = ssb->connections; connection; connection = connection->next)
|
||||
{
|
||||
tf_ssb_connection_message_request_t* message_request =
|
||||
bsearch(
|
||||
author_string,
|
||||
connection->message_requests,
|
||||
connection->message_requests_count,
|
||||
sizeof(tf_ssb_connection_message_request_t),
|
||||
_message_request_compare);
|
||||
if (message_request)
|
||||
{
|
||||
tf_ssb_connection_rpc_send_json(
|
||||
connection,
|
||||
k_ssb_rpc_flag_stream,
|
||||
message_request->request_number,
|
||||
message_request->keys ? message_keys : message,
|
||||
NULL,
|
||||
NULL,
|
||||
NULL);
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
JS_FreeCString(context, author_string);
|
||||
JS_FreeValue(context, author);
|
||||
}
|
||||
JS_FreeValue(context, message);
|
||||
JS_FreeValue(context, message_keys);
|
||||
}
|
||||
|
||||
void tf_ssb_add_blob_want_added_callback(tf_ssb_t* ssb, void (*callback)(tf_ssb_t* ssb, const char* id, void* user_data), void (*cleanup)(tf_ssb_t* ssb, void* user_data), void* user_data)
|
||||
|
46
src/ssb.db.c
46
src/ssb.db.c
@ -711,7 +711,7 @@ JSValue tf_ssb_db_visit_query(tf_ssb_t* ssb, const char* query, const JSValue bi
|
||||
return result;
|
||||
}
|
||||
|
||||
static JSValue _tf_ssb_format_message(JSContext* context, const char* previous, const char* author, int64_t sequence, double timestamp, const char* hash, const char* content, const char* signature, bool sequence_before_author)
|
||||
JSValue tf_ssb_format_message(JSContext* context, const char* previous, const char* author, int64_t sequence, double timestamp, const char* hash, const char* content, const char* signature, bool sequence_before_author)
|
||||
{
|
||||
JSValue value = JS_NewObject(context);
|
||||
JS_SetPropertyStr(context, value, "previous", previous ? JS_NewString(context, previous) : JS_NULL);
|
||||
@ -779,7 +779,7 @@ bool tf_ssb_db_check(sqlite3* db, const char* check_author)
|
||||
const char* content = (const char*)sqlite3_column_text(statement, 6);
|
||||
const char* signature = (const char*)sqlite3_column_text(statement, 7);
|
||||
bool sequence_before_author = sqlite3_column_int(statement, 8);
|
||||
JSValue message = _tf_ssb_format_message(context, previous, author, sequence, timestamp, hash, content, signature, sequence_before_author);
|
||||
JSValue message = tf_ssb_format_message(context, previous, author, sequence, timestamp, hash, content, signature, sequence_before_author);
|
||||
char out_signature[512];
|
||||
char actual_id[k_id_base64_len];
|
||||
bool actual_sequence_before_author = false;
|
||||
@ -1182,3 +1182,45 @@ void tf_ssb_db_private(sqlite3* db)
|
||||
sqlite3_finalize(statement);
|
||||
}
|
||||
}
|
||||
|
||||
JSValue tf_ssb_db_get_message_by_id( tf_ssb_t* ssb, const char* id, bool is_keys)
|
||||
{
|
||||
JSValue result = JS_UNDEFINED;
|
||||
JSContext* context = tf_ssb_get_context(ssb);
|
||||
sqlite3* db = tf_ssb_get_db(ssb);
|
||||
sqlite3_stmt* statement;
|
||||
if (sqlite3_prepare(db, "SELECT previous, author, id, sequence, timestamp, hash, content, signature, sequence_before_author FROM messages WHERE id = ?", -1, &statement, NULL) == SQLITE_OK)
|
||||
{
|
||||
if (sqlite3_bind_text(statement, 1, id, -1, NULL) == SQLITE_OK)
|
||||
{
|
||||
if (sqlite3_step(statement) == SQLITE_ROW)
|
||||
{
|
||||
JSValue message = JS_UNDEFINED;
|
||||
JSValue formatted = tf_ssb_format_message(
|
||||
context,
|
||||
(const char*)sqlite3_column_text(statement, 0),
|
||||
(const char*)sqlite3_column_text(statement, 1),
|
||||
sqlite3_column_int64(statement, 3),
|
||||
sqlite3_column_double(statement, 4),
|
||||
(const char*)sqlite3_column_text(statement, 5),
|
||||
(const char*)sqlite3_column_text(statement, 6),
|
||||
(const char*)sqlite3_column_text(statement, 7),
|
||||
sqlite3_column_int(statement, 8));
|
||||
if (is_keys)
|
||||
{
|
||||
message = JS_NewObject(context);
|
||||
JS_SetPropertyStr(context, message, "key", JS_NewString(context, (const char*)sqlite3_column_text(statement, 2)));
|
||||
JS_SetPropertyStr(context, message, "value", formatted);
|
||||
JS_SetPropertyStr(context, message, "timestamp", JS_NewString(context, (const char*)sqlite3_column_text(statement, 4)));
|
||||
}
|
||||
else
|
||||
{
|
||||
message = formatted;
|
||||
}
|
||||
result = message;
|
||||
}
|
||||
}
|
||||
sqlite3_finalize(statement);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
11
src/ssb.db.h
11
src/ssb.db.h
@ -12,6 +12,7 @@ bool tf_ssb_db_blob_has(tf_ssb_t* ssb, const char* id);
|
||||
bool tf_ssb_db_blob_get(tf_ssb_t* ssb, const char* id, uint8_t** out_blob, size_t* out_size);
|
||||
bool tf_ssb_db_blob_store(tf_ssb_t* ssb, const uint8_t* blob, size_t size, char* out_id, size_t out_id_size, bool* out_new);
|
||||
|
||||
JSValue tf_ssb_db_get_message_by_id( tf_ssb_t* ssb, const char* id, bool is_keys);
|
||||
bool tf_ssb_db_get_message_by_author_and_sequence(tf_ssb_t* ssb, const char* author, int64_t sequence, char* out_message_id, size_t out_message_id_size, double* out_timestamp, char** out_content);
|
||||
bool tf_ssb_db_get_latest_message_by_author(tf_ssb_t* ssb, const char* author, int64_t* out_sequence, char* out_message_id, size_t out_message_id_size);
|
||||
JSValue tf_ssb_db_visit_query(tf_ssb_t* ssb, const char* query, const JSValue binds, void (*callback)(JSValue row, void* user_data), void* user_data);
|
||||
@ -26,6 +27,16 @@ void tf_ssb_db_identity_visit(tf_ssb_t* ssb, const char* user, void (*callback)(
|
||||
void tf_ssb_db_identity_visit_all(tf_ssb_t* ssb, void (*callback)(const char* identity, void* user_data), void* user_data);
|
||||
bool tf_ssb_db_identity_get_private_key(tf_ssb_t* ssb, const char* user, const char* public_key, uint8_t* out_private_key, size_t private_key_size);
|
||||
|
||||
JSValue tf_ssb_format_message(
|
||||
JSContext* context,
|
||||
const char* previous,
|
||||
const char* author,
|
||||
int64_t sequence,
|
||||
double timestamp,
|
||||
const char* hash,
|
||||
const char* content,
|
||||
const char* signature,
|
||||
bool sequence_before_author);
|
||||
const char** tf_ssb_db_following_deep(tf_ssb_t* ssb, const char** ids, int count, int depth);
|
||||
|
||||
void tf_ssb_db_private(sqlite3* db);
|
||||
|
@ -151,6 +151,8 @@ void tf_ssb_connection_rpc_send_error(tf_ssb_connection_t* connection, uint8_t f
|
||||
void tf_ssb_connection_rpc_send_error_method_not_allowed(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number);
|
||||
void tf_ssb_connection_add_request(tf_ssb_connection_t* connection, int32_t request_number, tf_ssb_rpc_callback_t* callback, tf_ssb_callback_cleanup_t* cleanup, void* user_data, tf_ssb_connection_t* dependent_connection);
|
||||
|
||||
void tf_ssb_connection_add_new_message_request(tf_ssb_connection_t* connection, const char* author, int32_t request_number, bool keys);
|
||||
|
||||
bool tf_ssb_connection_is_attendant(tf_ssb_connection_t* connection);
|
||||
int32_t tf_ssb_connection_get_attendant_request_number(tf_ssb_connection_t* connection);
|
||||
void tf_ssb_connection_set_attendant(tf_ssb_connection_t* connection, bool attendant, int request_number);
|
||||
|
@ -678,6 +678,78 @@ static void _tf_ssb_rpc_connection_tunnel_isRoom_callback(tf_ssb_connection_t* c
|
||||
}
|
||||
}
|
||||
|
||||
static void _tf_ssb_rpc_createHistoryStream(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
|
||||
{
|
||||
tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
|
||||
JSContext* context = tf_ssb_get_context(ssb);
|
||||
JSValue arg_array = JS_GetPropertyStr(context, args, "args");
|
||||
JSValue arg = JS_GetPropertyUint32(context, arg_array, 0);
|
||||
if (JS_IsUndefined(arg))
|
||||
{
|
||||
tf_ssb_connection_rpc_send_error(connection, flags, -request_number, "Missing request.args in createHistoryStream.");
|
||||
}
|
||||
JSValue id = JS_GetPropertyStr(context, arg, "id");
|
||||
JSValue seq = JS_GetPropertyStr(context, arg, "seq");
|
||||
JSValue keys = JS_GetPropertyStr(context, arg, "keys");
|
||||
JSValue live = JS_GetPropertyStr(context, arg, "live");
|
||||
bool is_keys = JS_IsUndefined(keys) || JS_ToBool(context, keys) > 0;
|
||||
bool is_live = JS_ToBool(context, live) > 0;
|
||||
int64_t sequence = 0;
|
||||
JS_ToInt64(context, &sequence, seq);
|
||||
const char* author = JS_ToCString(context, id);
|
||||
|
||||
sqlite3* db = tf_ssb_get_db(ssb);
|
||||
sqlite3_stmt* statement;
|
||||
if (sqlite3_prepare(db, "SELECT previous, author, id, sequence, timestamp, hash, content, signature, sequence_before_author FROM messages WHERE author = ?1 AND sequence >= ?2 ORDER BY sequence", -1, &statement, NULL) == SQLITE_OK)
|
||||
{
|
||||
if (sqlite3_bind_text(statement, 1, author, -1, NULL) == SQLITE_OK &&
|
||||
sqlite3_bind_int64(statement, 2, sequence) == SQLITE_OK)
|
||||
{
|
||||
while (sqlite3_step(statement) == SQLITE_ROW)
|
||||
{
|
||||
JSValue message = JS_UNDEFINED;
|
||||
|
||||
JSValue formatted = tf_ssb_format_message(
|
||||
context,
|
||||
(const char*)sqlite3_column_text(statement, 0),
|
||||
(const char*)sqlite3_column_text(statement, 1),
|
||||
sqlite3_column_int64(statement, 3),
|
||||
sqlite3_column_double(statement, 4),
|
||||
(const char*)sqlite3_column_text(statement, 5),
|
||||
(const char*)sqlite3_column_text(statement, 6),
|
||||
(const char*)sqlite3_column_text(statement, 7),
|
||||
sqlite3_column_int(statement, 8));
|
||||
if (is_keys)
|
||||
{
|
||||
message = JS_NewObject(context);
|
||||
JS_SetPropertyStr(context, message, "key", JS_NewString(context, (const char*)sqlite3_column_text(statement, 2)));
|
||||
JS_SetPropertyStr(context, message, "value", formatted);
|
||||
JS_SetPropertyStr(context, message, "timestamp", JS_NewString(context, (const char*)sqlite3_column_text(statement, 4)));
|
||||
}
|
||||
else
|
||||
{
|
||||
message = formatted;
|
||||
}
|
||||
tf_ssb_connection_rpc_send_json(connection, flags, -request_number, message, NULL, NULL, NULL);
|
||||
JS_FreeValue(context, message);
|
||||
}
|
||||
}
|
||||
sqlite3_finalize(statement);
|
||||
}
|
||||
|
||||
if (is_live)
|
||||
{
|
||||
tf_ssb_connection_add_new_message_request(connection, author, -request_number, is_keys);
|
||||
}
|
||||
|
||||
JS_FreeCString(context, author);
|
||||
JS_FreeValue(context, id);
|
||||
JS_FreeValue(context, seq);
|
||||
JS_FreeValue(context, keys);
|
||||
JS_FreeValue(context, arg);
|
||||
JS_FreeValue(context, arg_array);
|
||||
}
|
||||
|
||||
static void _tf_ssb_rpc_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_change_t change, tf_ssb_connection_t* connection, void* user_data)
|
||||
{
|
||||
JSContext* context = tf_ssb_get_context(ssb);
|
||||
@ -760,4 +832,5 @@ void tf_ssb_rpc_register(tf_ssb_t* ssb)
|
||||
tf_ssb_add_rpc_callback(ssb, (const char*[]) { "tunnel", "connect", NULL }, _tf_ssb_rpc_tunnel_connect, NULL, NULL);
|
||||
tf_ssb_add_rpc_callback(ssb, (const char*[]) { "tunnel", "isRoom", NULL }, _tf_ssb_rpc_tunnel_is_room, NULL, NULL);
|
||||
tf_ssb_add_rpc_callback(ssb, (const char*[]) { "room", "attendants", NULL }, _tf_ssb_rpc_room_attendants, NULL, NULL);
|
||||
tf_ssb_add_rpc_callback(ssb, (const char*[]) { "createHistoryStream", NULL }, _tf_ssb_rpc_createHistoryStream, NULL, NULL);
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user