Trying to fix blob transfers.

git-svn-id: https://www.unprompted.com/svn/projects/tildefriends/trunk@3625 ed5197a5-7fde-0310-b194-c3ffbd925b24
This commit is contained in:
Cory McWilliams 2021-01-02 22:48:33 +00:00
parent dd489de252
commit 01ff073af0
4 changed files with 13 additions and 4 deletions

View File

@ -341,7 +341,7 @@ static bool _tf_ssb_connection_get_request_callback(tf_ssb_connection_t* connect
return found; return found;
} }
static void _tf_ssb_connection_add_request(tf_ssb_connection_t* connection, int32_t request_number, tf_ssb_rpc_callback_t* callback, void* user_data) void tf_ssb_connection_add_request(tf_ssb_connection_t* connection, int32_t request_number, tf_ssb_rpc_callback_t* callback, void* user_data)
{ {
if (_tf_ssb_connection_get_request_callback(connection, request_number, NULL, NULL)) { if (_tf_ssb_connection_get_request_callback(connection, request_number, NULL, NULL)) {
return; return;
@ -371,7 +371,7 @@ void tf_ssb_connection_remove_request(tf_ssb_connection_t* connection, int32_t r
void tf_ssb_connection_rpc_send(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, const uint8_t* message, size_t size, tf_ssb_rpc_callback_t* callback, void* user_data) void tf_ssb_connection_rpc_send(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, const uint8_t* message, size_t size, tf_ssb_rpc_callback_t* callback, void* user_data)
{ {
if (request_number > 0) { if (request_number > 0) {
_tf_ssb_connection_add_request(connection, request_number, callback, user_data); tf_ssb_connection_add_request(connection, request_number, callback, user_data);
} }
uint8_t* combined = malloc(9 + size); uint8_t* combined = malloc(9 + size);
*combined = flags; *combined = flags;
@ -1881,6 +1881,7 @@ bool tf_ssb_blob_store(tf_ssb_t* ssb, const uint8_t* blob, size_t size, char* ou
char id[512]; char id[512];
snprintf(id, sizeof(id), "&%s.sha256", hash64); snprintf(id, sizeof(id), "&%s.sha256", hash64);
printf("blob store %s\n", id);
const char* query = "INSERT INTO blobs (id, content, created) VALUES ($1, $2, CAST(strftime('%s') AS INTEGER)) ON CONFLICT DO NOTHING"; const char* query = "INSERT INTO blobs (id, content, created) VALUES ($1, $2, CAST(strftime('%s') AS INTEGER)) ON CONFLICT DO NOTHING";
if (sqlite3_prepare(ssb->db, query, -1, &statement, NULL) == SQLITE_OK) { if (sqlite3_prepare(ssb->db, query, -1, &statement, NULL) == SQLITE_OK) {

View File

@ -111,4 +111,5 @@ void tf_ssb_connection_rpc_send(tf_ssb_connection_t* connection, uint8_t flags,
int32_t tf_ssb_connection_next_request_number(tf_ssb_connection_t* connection); int32_t tf_ssb_connection_next_request_number(tf_ssb_connection_t* connection);
bool tf_ssb_connection_get_id(tf_ssb_connection_t* connection, char* out_id, size_t out_id_size); bool tf_ssb_connection_get_id(tf_ssb_connection_t* connection, char* out_id, size_t out_id_size);
void tf_ssb_connection_add_request(tf_ssb_connection_t* connection, int32_t request_number, tf_ssb_rpc_callback_t* callback, void* user_data);
void tf_ssb_connection_remove_request(tf_ssb_connection_t* connection, int32_t request_number); void tf_ssb_connection_remove_request(tf_ssb_connection_t* connection, int32_t request_number);

View File

@ -193,7 +193,10 @@ static void _tf_ssb_connection_on_rpc_blobs_createWants_response(tf_ssb_connecti
JSValue jsonval = JS_JSONStringify(context, size_response, JS_NULL, JS_NULL); JSValue jsonval = JS_JSONStringify(context, size_response, JS_NULL, JS_NULL);
size_t len; size_t len;
const char* json = JS_ToCStringLen(context, &len, jsonval); const char* json = JS_ToCStringLen(context, &len, jsonval);
tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_json | k_ssb_rpc_flag_stream, request_number, (uint8_t*)json, len, NULL, NULL); tf_ssb_rpc_t* rpc = tf_ssb_get_rpc(ssb);
if (rpc->wants) {
tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_json | k_ssb_rpc_flag_stream, -rpc->wants->request_number, (uint8_t*)json, len, NULL, NULL);
}
JS_FreeCString(context, json); JS_FreeCString(context, json);
JS_FreeValue(context, jsonval); JS_FreeValue(context, jsonval);
JS_FreeValue(context, size_response); JS_FreeValue(context, size_response);
@ -640,12 +643,12 @@ tf_ssb_rpc_t* tf_ssb_rpc_create(tf_ssb_t* ssb)
*rpc = (tf_ssb_rpc_t) { *rpc = (tf_ssb_rpc_t) {
.wants_async.data = rpc, .wants_async.data = rpc,
}; };
tf_ssb_add_connections_changed_callback(ssb, _tf_ssb_rpc_connections_changed_callback, rpc);
tf_ssb_add_connections_changed_callback(ssb, _tf_ssb_rpc_on_connections_changed, NULL); tf_ssb_add_connections_changed_callback(ssb, _tf_ssb_rpc_on_connections_changed, NULL);
tf_ssb_register_rpc(ssb, (const char*[]) { "blobs", "has", NULL }, _tf_ssb_rpc_blob_has, NULL); tf_ssb_register_rpc(ssb, (const char*[]) { "blobs", "has", NULL }, _tf_ssb_rpc_blob_has, NULL);
tf_ssb_register_rpc(ssb, (const char*[]) { "blobs", "get", NULL }, _tf_ssb_rpc_blob_get, NULL); tf_ssb_register_rpc(ssb, (const char*[]) { "blobs", "get", NULL }, _tf_ssb_rpc_blob_get, NULL);
tf_ssb_register_rpc(ssb, (const char*[]) { "blobs", "createWants", NULL }, _tf_ssb_rpc_blobs_createWants, rpc); tf_ssb_register_rpc(ssb, (const char*[]) { "blobs", "createWants", NULL }, _tf_ssb_rpc_blobs_createWants, rpc);
tf_ssb_register_rpc(ssb, (const char*[]) { "createHistoryStream", NULL }, _tf_ssb_rpc_createHistoryStream, NULL); tf_ssb_register_rpc(ssb, (const char*[]) { "createHistoryStream", NULL }, _tf_ssb_rpc_createHistoryStream, NULL);
tf_ssb_add_connections_changed_callback(ssb, _tf_ssb_rpc_connections_changed_callback, rpc);
uv_async_init(tf_ssb_get_loop(ssb), &rpc->wants_async, _tf_ssb_rpc_wants_async); uv_async_init(tf_ssb_get_loop(ssb), &rpc->wants_async, _tf_ssb_rpc_wants_async);
uv_unref((uv_handle_t*)&rpc->wants_async); uv_unref((uv_handle_t*)&rpc->wants_async);
return rpc; return rpc;

View File

@ -116,6 +116,8 @@ static void _tf_ssb_test_ssb()
tf_ssb_append_message(ssb0, message); tf_ssb_append_message(ssb0, message);
JS_FreeValue(context, message); JS_FreeValue(context, message);
assert(tf_ssb_blob_get(ssb0, blob_id, NULL, NULL));
assert(!tf_ssb_blob_get(ssb1, blob_id, NULL, NULL));
tf_ssb_server_open(ssb0, 12347); tf_ssb_server_open(ssb0, 12347);
uint8_t id0bin[k_id_bin_len]; uint8_t id0bin[k_id_bin_len];
@ -132,9 +134,11 @@ static void _tf_ssb_test_ssb()
uv_run(&loop, UV_RUN_ONCE); uv_run(&loop, UV_RUN_ONCE);
} }
printf("waiting for blob\n");
while (!tf_ssb_blob_get(ssb1, blob_id, NULL, NULL)) { while (!tf_ssb_blob_get(ssb1, blob_id, NULL, NULL)) {
uv_run(&loop, UV_RUN_ONCE); uv_run(&loop, UV_RUN_ONCE);
} }
printf("done\n");
tf_ssb_send_close(ssb1); tf_ssb_send_close(ssb1);