Store blobs from the worker threads. Let's see if this is a good idea.
git-svn-id: https://www.unprompted.com/svn/projects/tildefriends/trunk@4326 ed5197a5-7fde-0310-b194-c3ffbd925b24
This commit is contained in:
		@@ -3236,6 +3236,10 @@ void tf_ssb_notify_message_added(tf_ssb_t* ssb, const char* id)
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
		for (tf_ssb_connection_t* connection = ssb->connections; connection; connection = connection->next)
 | 
							for (tf_ssb_connection_t* connection = ssb->connections; connection; connection = connection->next)
 | 
				
			||||||
		{
 | 
							{
 | 
				
			||||||
 | 
								if (!connection->message_requests_count)
 | 
				
			||||||
 | 
								{
 | 
				
			||||||
 | 
									continue;
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
			tf_ssb_connection_message_request_t* message_request =
 | 
								tf_ssb_connection_message_request_t* message_request =
 | 
				
			||||||
				bsearch(
 | 
									bsearch(
 | 
				
			||||||
					author_string,
 | 
										author_string,
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -1,11 +1,13 @@
 | 
				
			|||||||
#include "ssb.rpc.h"
 | 
					#include "ssb.rpc.h"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					#include "log.h"
 | 
				
			||||||
#include "mem.h"
 | 
					#include "mem.h"
 | 
				
			||||||
#include "ssb.h"
 | 
					#include "ssb.h"
 | 
				
			||||||
#include "ssb.db.h"
 | 
					#include "ssb.db.h"
 | 
				
			||||||
#include "util.js.h"
 | 
					#include "util.js.h"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
#include "sqlite3.h"
 | 
					#include "sqlite3.h"
 | 
				
			||||||
 | 
					#include "uv.h"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
#include <inttypes.h>
 | 
					#include <inttypes.h>
 | 
				
			||||||
#include <string.h>
 | 
					#include <string.h>
 | 
				
			||||||
@@ -420,9 +422,31 @@ typedef struct _blobs_get_t
 | 
				
			|||||||
	char id[k_blob_id_len];
 | 
						char id[k_blob_id_len];
 | 
				
			||||||
	size_t received;
 | 
						size_t received;
 | 
				
			||||||
	size_t expected_size;
 | 
						size_t expected_size;
 | 
				
			||||||
	char buffer[];
 | 
						bool done;
 | 
				
			||||||
 | 
						tf_ssb_t* ssb;
 | 
				
			||||||
 | 
						uv_work_t work;
 | 
				
			||||||
 | 
						uint8_t buffer[];
 | 
				
			||||||
} blobs_get_t;
 | 
					} blobs_get_t;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					static void _tf_ssb_rpc_blob_store_work(uv_work_t* work)
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
						blobs_get_t* get = work->data;
 | 
				
			||||||
 | 
						tf_ssb_db_blob_store(get->ssb, get->buffer, get->received, NULL, 0, NULL);
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					static void _tf_ssb_rpc_blob_store_after_work(uv_work_t* work, int status)
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
						blobs_get_t* get = work->data;
 | 
				
			||||||
 | 
						if (status != 0)
 | 
				
			||||||
 | 
						{
 | 
				
			||||||
 | 
							tf_printf("uv_queue_work failed: %s\n", uv_strerror(status));
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if (get->done)
 | 
				
			||||||
 | 
						{
 | 
				
			||||||
 | 
							tf_free(get);
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
static void _tf_ssb_rpc_connection_blobs_get_callback(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
 | 
					static void _tf_ssb_rpc_connection_blobs_get_callback(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
	tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
 | 
						tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
 | 
				
			||||||
@@ -438,8 +462,17 @@ static void _tf_ssb_rpc_connection_blobs_get_callback(tf_ssb_connection_t* conne
 | 
				
			|||||||
		bool stored = false;
 | 
							bool stored = false;
 | 
				
			||||||
		if (JS_ToBool(context, args))
 | 
							if (JS_ToBool(context, args))
 | 
				
			||||||
		{
 | 
							{
 | 
				
			||||||
			char id[256];
 | 
								get->work.data = get;
 | 
				
			||||||
			stored = tf_ssb_db_blob_store(ssb, (uint8_t*)get->buffer, get->expected_size, id, sizeof(id), NULL);
 | 
								int r = uv_queue_work(tf_ssb_get_loop(ssb), &get->work, _tf_ssb_rpc_blob_store_work, _tf_ssb_rpc_blob_store_after_work);
 | 
				
			||||||
 | 
								if (r)
 | 
				
			||||||
 | 
								{
 | 
				
			||||||
 | 
									tf_printf("uv_queue_work failed: %s\n", uv_strerror(r));
 | 
				
			||||||
 | 
									get->work.data = NULL;
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								else
 | 
				
			||||||
 | 
								{
 | 
				
			||||||
 | 
									stored = true;
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		tf_ssb_connection_rpc_send(
 | 
							tf_ssb_connection_rpc_send(
 | 
				
			||||||
			connection,
 | 
								connection,
 | 
				
			||||||
@@ -455,13 +488,18 @@ static void _tf_ssb_rpc_connection_blobs_get_callback(tf_ssb_connection_t* conne
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
static void _tf_ssb_rpc_connection_blobs_get_cleanup(tf_ssb_t* ssb, void* user_data)
 | 
					static void _tf_ssb_rpc_connection_blobs_get_cleanup(tf_ssb_t* ssb, void* user_data)
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
	tf_free(user_data);
 | 
						blobs_get_t* get = user_data;
 | 
				
			||||||
 | 
						get->done = true;
 | 
				
			||||||
 | 
						if (!get->work.data)
 | 
				
			||||||
 | 
						{
 | 
				
			||||||
 | 
							tf_free(get);
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
static void _tf_ssb_rpc_connection_blobs_get(tf_ssb_connection_t* connection, const char* blob_id, size_t size)
 | 
					static void _tf_ssb_rpc_connection_blobs_get(tf_ssb_connection_t* connection, const char* blob_id, size_t size)
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
	blobs_get_t* get = tf_malloc(sizeof(blobs_get_t) + size);
 | 
						blobs_get_t* get = tf_malloc(sizeof(blobs_get_t) + size);
 | 
				
			||||||
	*get = (blobs_get_t) { .expected_size = size };
 | 
						*get = (blobs_get_t) { .ssb = tf_ssb_connection_get_ssb(connection), .expected_size = size };
 | 
				
			||||||
	snprintf(get->id, sizeof(get->id), "%s", blob_id);
 | 
						snprintf(get->id, sizeof(get->id), "%s", blob_id);
 | 
				
			||||||
	memset(get->buffer, 0, size);
 | 
						memset(get->buffer, 0, size);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user