ssb: Attempt to request more feeds as more contact messages come in. #83
	
		
			
	
		
	
	
		
	
		
			Some checks failed
		
		
	
	
		
			
				
	
				Build Tilde Friends / Build-All (push) Has been cancelled
				
			
		
		
	
	
				
					
				
			
		
			Some checks failed
		
		
	
	Build Tilde Friends / Build-All (push) Has been cancelled
				
			This commit is contained in:
		| @@ -816,8 +816,7 @@ static void _tf_ssb_connection_send_history_stream_work(tf_ssb_connection_t* con | |||||||
| 	const int k_max = 32; | 	const int k_max = 32; | ||||||
| 	if (sqlite3_prepare(db, | 	if (sqlite3_prepare(db, | ||||||
| 			"SELECT previous, author, id, sequence, timestamp, hash, json(content), signature, flags FROM messages WHERE author = ?1 AND sequence > ?2 AND " | 			"SELECT previous, author, id, sequence, timestamp, hash, json(content), signature, flags FROM messages WHERE author = ?1 AND sequence > ?2 AND " | ||||||
| 			"sequence " | 			"sequence < ?3 ORDER BY sequence", | ||||||
| 			"< ?3 ORDER BY sequence", |  | ||||||
| 			-1, &statement, NULL) == SQLITE_OK) | 			-1, &statement, NULL) == SQLITE_OK) | ||||||
| 	{ | 	{ | ||||||
| 		if (sqlite3_bind_text(statement, 1, request->author, -1, NULL) == SQLITE_OK && sqlite3_bind_int64(statement, 2, request->sequence) == SQLITE_OK && | 		if (sqlite3_bind_text(statement, 1, request->author, -1, NULL) == SQLITE_OK && sqlite3_bind_int64(statement, 2, request->sequence) == SQLITE_OK && | ||||||
| @@ -828,7 +827,8 @@ static void _tf_ssb_connection_send_history_stream_work(tf_ssb_connection_t* con | |||||||
| 			JSRuntime* runtime = JS_NewRuntime2(&funcs, NULL); | 			JSRuntime* runtime = JS_NewRuntime2(&funcs, NULL); | ||||||
| 			JSContext* context = JS_NewContext(runtime); | 			JSContext* context = JS_NewContext(runtime); | ||||||
|  |  | ||||||
| 			while (sqlite3_step(statement) == SQLITE_ROW) | 			int r = SQLITE_OK; | ||||||
|  | 			while ((r = sqlite3_step(statement)) == SQLITE_ROW) | ||||||
| 			{ | 			{ | ||||||
| 				JSValue message = JS_UNDEFINED; | 				JSValue message = JS_UNDEFINED; | ||||||
| 				request->out_max_sequence_seen = sqlite3_column_int64(statement, 3); | 				request->out_max_sequence_seen = sqlite3_column_int64(statement, 3); | ||||||
| @@ -1173,6 +1173,20 @@ static void _tf_ssb_rpc_ebt_replicate_store_callback(const char* id, bool verifi | |||||||
| 	tf_ssb_connection_adjust_read_backpressure(connection, -1); | 	tf_ssb_connection_adjust_read_backpressure(connection, -1); | ||||||
| } | } | ||||||
|  |  | ||||||
|  | typedef struct _resend_clock_t | ||||||
|  | { | ||||||
|  | 	tf_ssb_connection_t* connection; | ||||||
|  | 	int32_t request_number; | ||||||
|  | } resend_clock_t; | ||||||
|  |  | ||||||
|  | static void _tf_ssb_rpc_ebt_replicate_resend_clock(tf_ssb_connection_t* connection, void* user_data) | ||||||
|  | { | ||||||
|  | 	resend_clock_t* resend = user_data; | ||||||
|  | 	_tf_ssb_rpc_ebt_replicate_send_clock(resend->connection, resend->request_number, JS_UNDEFINED); | ||||||
|  | 	tf_ssb_connection_set_sent_clock(resend->connection, true); | ||||||
|  | 	tf_free(user_data); | ||||||
|  | } | ||||||
|  |  | ||||||
| static void _tf_ssb_rpc_ebt_replicate(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) | static void _tf_ssb_rpc_ebt_replicate(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) | ||||||
| { | { | ||||||
| 	tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); | 	tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); | ||||||
| @@ -1198,6 +1212,17 @@ static void _tf_ssb_rpc_ebt_replicate(tf_ssb_connection_t* connection, uint8_t f | |||||||
| 		/* Looks like a message. */ | 		/* Looks like a message. */ | ||||||
| 		tf_ssb_connection_adjust_read_backpressure(connection, 1); | 		tf_ssb_connection_adjust_read_backpressure(connection, 1); | ||||||
| 		tf_ssb_verify_strip_and_store_message(ssb, args, _tf_ssb_rpc_ebt_replicate_store_callback, connection); | 		tf_ssb_verify_strip_and_store_message(ssb, args, _tf_ssb_rpc_ebt_replicate_store_callback, connection); | ||||||
|  |  | ||||||
|  | 		if (tf_ssb_connection_get_sent_clock(connection)) | ||||||
|  | 		{ | ||||||
|  | 			tf_ssb_connection_set_sent_clock(connection, false); | ||||||
|  | 			resend_clock_t* resend = tf_malloc(sizeof(resend_clock_t)); | ||||||
|  | 			*resend = (resend_clock_t) { | ||||||
|  | 				.connection = connection, | ||||||
|  | 				.request_number = request_number, | ||||||
|  | 			}; | ||||||
|  | 			tf_ssb_connection_schedule_idle(connection, _tf_ssb_rpc_ebt_replicate_resend_clock, resend); | ||||||
|  | 		} | ||||||
| 	} | 	} | ||||||
| 	else | 	else | ||||||
| 	{ | 	{ | ||||||
|   | |||||||
							
								
								
									
										183
									
								
								src/ssb.tests.c
									
									
									
									
									
								
							
							
						
						
									
										183
									
								
								src/ssb.tests.c
									
									
									
									
									
								
							| @@ -16,6 +16,8 @@ | |||||||
| #include <time.h> | #include <time.h> | ||||||
| #include <unistd.h> | #include <unistd.h> | ||||||
|  |  | ||||||
|  | #include "sodium/crypto_sign.h" | ||||||
|  |  | ||||||
| #if !defined(_WIN32) | #if !defined(_WIN32) | ||||||
| #include <sys/wait.h> | #include <sys/wait.h> | ||||||
| #endif | #endif | ||||||
| @@ -1039,4 +1041,185 @@ void tf_ssb_test_publish(const tf_test_options_t* options) | |||||||
| 	uv_loop_close(&loop); | 	uv_loop_close(&loop); | ||||||
| } | } | ||||||
|  |  | ||||||
|  | static void _test_print_identity(const char* identity, void* user_data) | ||||||
|  | { | ||||||
|  | 	tf_ssb_t* ssb = user_data; | ||||||
|  | 	int64_t sequence = -1; | ||||||
|  | 	char id[k_id_base64_len] = { 0 }; | ||||||
|  | 	snprintf(id, sizeof(id), "@%s", identity); | ||||||
|  | 	tf_ssb_db_get_latest_message_by_author(ssb, id, &sequence, NULL, 0); | ||||||
|  | 	tf_printf("IDENTITY %s: %d\n", id, (int)sequence); | ||||||
|  | } | ||||||
|  |  | ||||||
|  | void tf_ssb_test_replicate(const tf_test_options_t* options) | ||||||
|  | { | ||||||
|  | 	tf_printf("Testing replication.\n"); | ||||||
|  |  | ||||||
|  | 	uv_loop_t loop = { 0 }; | ||||||
|  | 	uv_loop_init(&loop); | ||||||
|  |  | ||||||
|  | 	unlink("out/test_db0.sqlite"); | ||||||
|  | 	tf_ssb_t* ssb0 = tf_ssb_create(&loop, NULL, "file:out/test_db0.sqlite", NULL); | ||||||
|  | 	tf_ssb_register(tf_ssb_get_context(ssb0), ssb0); | ||||||
|  | 	unlink("out/test_db1.sqlite"); | ||||||
|  | 	tf_ssb_t* ssb1 = tf_ssb_create(&loop, NULL, "file:out/test_db1.sqlite", NULL); | ||||||
|  | 	tf_ssb_register(tf_ssb_get_context(ssb1), ssb1); | ||||||
|  |  | ||||||
|  | 	uv_idle_t idle0 = { .data = ssb0 }; | ||||||
|  | 	uv_idle_init(&loop, &idle0); | ||||||
|  | 	uv_idle_start(&idle0, _ssb_test_idle); | ||||||
|  |  | ||||||
|  | 	uv_idle_t idle1 = { .data = ssb1 }; | ||||||
|  | 	uv_idle_init(&loop, &idle1); | ||||||
|  | 	uv_idle_start(&idle1, _ssb_test_idle); | ||||||
|  |  | ||||||
|  | 	test_t test = { | ||||||
|  | 		.ssb0 = ssb0, | ||||||
|  | 		.ssb1 = ssb1, | ||||||
|  | 	}; | ||||||
|  |  | ||||||
|  | 	tf_ssb_add_connections_changed_callback(ssb0, _ssb_test_connections_changed, NULL, &test); | ||||||
|  | 	tf_ssb_add_connections_changed_callback(ssb1, _ssb_test_connections_changed, NULL, &test); | ||||||
|  |  | ||||||
|  | 	tf_ssb_generate_keys(ssb0); | ||||||
|  | 	tf_ssb_generate_keys(ssb1); | ||||||
|  |  | ||||||
|  | 	uint8_t priv0[crypto_sign_SECRETKEYBYTES] = { 0 }; | ||||||
|  | 	uint8_t priv1[crypto_sign_SECRETKEYBYTES] = { 0 }; | ||||||
|  | 	tf_ssb_get_private_key(ssb0, priv0, sizeof(priv0)); | ||||||
|  | 	tf_ssb_get_private_key(ssb1, priv1, sizeof(priv1)); | ||||||
|  |  | ||||||
|  | 	char id0[k_id_base64_len] = { 0 }; | ||||||
|  | 	char id1[k_id_base64_len] = { 0 }; | ||||||
|  | 	bool b = tf_ssb_whoami(ssb0, id0, sizeof(id0)); | ||||||
|  | 	(void)b; | ||||||
|  | 	assert(b); | ||||||
|  | 	b = tf_ssb_whoami(ssb1, id1, sizeof(id1)); | ||||||
|  | 	assert(b); | ||||||
|  | 	tf_printf("ID %s and %s\n", id0, id1); | ||||||
|  |  | ||||||
|  | 	char priv0_str[512] = { 0 }; | ||||||
|  | 	char priv1_str[512] = { 0 }; | ||||||
|  | 	tf_base64_encode(priv0, sizeof(priv0), priv0_str, sizeof(priv0_str)); | ||||||
|  | 	tf_base64_encode(priv1, sizeof(priv0), priv1_str, sizeof(priv1_str)); | ||||||
|  | 	tf_ssb_db_identity_add(ssb0, "test", id0 + 1, priv0_str); | ||||||
|  | 	tf_ssb_db_identity_add(ssb1, "test", id1 + 1, priv1_str); | ||||||
|  |  | ||||||
|  | 	static const int k_key_count = 5; | ||||||
|  | 	char public[k_key_count][k_id_base64_len - 1]; | ||||||
|  | 	char private[k_key_count][512]; | ||||||
|  | 	for (int i = 0; i < k_key_count; i++) | ||||||
|  | 	{ | ||||||
|  | 		tf_ssb_generate_keys_buffer(public[i], sizeof(public[i]), private[i], sizeof(private[i])); | ||||||
|  | 		bool added = tf_ssb_db_identity_add(ssb0, "test", public[i], private[i]); | ||||||
|  | 		tf_printf("%s user %d = %s private=%s\n", added ? "added" : "failed", i, public[i], private[i]); | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	JSContext* context0 = tf_ssb_get_context(ssb0); | ||||||
|  | 	for (int i = 0; i < k_key_count - 1; i++) | ||||||
|  | 	{ | ||||||
|  | 		JSValue obj = JS_NewObject(context0); | ||||||
|  | 		JS_SetPropertyStr(context0, obj, "type", JS_NewString(context0, "contact")); | ||||||
|  | 		char self[k_id_base64_len]; | ||||||
|  | 		snprintf(self, sizeof(self), "@%s", public[i]); | ||||||
|  | 		char contact[k_id_base64_len]; | ||||||
|  | 		snprintf(contact, sizeof(contact), "@%s", public[i + 1]); | ||||||
|  | 		JS_SetPropertyStr(context0, obj, "contact", JS_NewString(context0, contact)); | ||||||
|  | 		JS_SetPropertyStr(context0, obj, "following", JS_TRUE); | ||||||
|  | 		bool stored = false; | ||||||
|  | 		uint8_t private_bin[512] = { 0 }; | ||||||
|  | 		tf_base64_decode(private[i], strlen(private[i]) - strlen(".ed25519"), private_bin, sizeof(private_bin)); | ||||||
|  | 		tf_printf("ssb0 %s following %s\n", self, contact); | ||||||
|  | 		JSValue signed_message = tf_ssb_sign_message(ssb0, self, private_bin, obj, NULL, 0); | ||||||
|  | 		tf_ssb_verify_strip_and_store_message(ssb0, signed_message, _message_stored, &stored); | ||||||
|  | 		JS_FreeValue(context0, signed_message); | ||||||
|  | 		_wait_stored(ssb0, &stored); | ||||||
|  | 		JS_FreeValue(context0, obj); | ||||||
|  |  | ||||||
|  | 		obj = JS_NewObject(context0); | ||||||
|  | 		JS_SetPropertyStr(context0, obj, "type", JS_NewString(context0, "post")); | ||||||
|  | 		JS_SetPropertyStr(context0, obj, "text", JS_NewString(context0, "Hello, world!")); | ||||||
|  | 		stored = false; | ||||||
|  | 		signed_message = tf_ssb_sign_message(ssb0, self, private_bin, obj, NULL, 0); | ||||||
|  | 		tf_ssb_verify_strip_and_store_message(ssb0, signed_message, _message_stored, &stored); | ||||||
|  | 		JS_FreeValue(context0, signed_message); | ||||||
|  | 		_wait_stored(ssb0, &stored); | ||||||
|  | 		JS_FreeValue(context0, obj); | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	JSContext* context1 = tf_ssb_get_context(ssb1); | ||||||
|  | 	{ | ||||||
|  | 		JSValue obj = JS_NewObject(context1); | ||||||
|  | 		JS_SetPropertyStr(context1, obj, "type", JS_NewString(context1, "contact")); | ||||||
|  | 		char self[k_id_base64_len]; | ||||||
|  | 		snprintf(self, sizeof(self), "%s", id1); | ||||||
|  | 		char contact[k_id_base64_len]; | ||||||
|  | 		snprintf(contact, sizeof(contact), "@%s", public[0]); | ||||||
|  | 		JS_SetPropertyStr(context1, obj, "contact", JS_NewString(context1, contact)); | ||||||
|  | 		JS_SetPropertyStr(context1, obj, "following", JS_TRUE); | ||||||
|  | 		bool stored = false; | ||||||
|  | 		tf_printf("ssb1 %s following %s\n", self, contact); | ||||||
|  | 		JSValue signed_message = tf_ssb_sign_message(ssb1, self, priv1, obj, NULL, 0); | ||||||
|  | 		tf_ssb_verify_strip_and_store_message(ssb1, signed_message, _message_stored, &stored); | ||||||
|  | 		JS_FreeValue(context1, signed_message); | ||||||
|  | 		_wait_stored(ssb1, &stored); | ||||||
|  | 		JS_FreeValue(context1, obj); | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	tf_printf("ssb0\n"); | ||||||
|  | 	tf_ssb_db_identity_visit_all(ssb0, _test_print_identity, ssb0); | ||||||
|  | 	tf_printf("ssb1\n"); | ||||||
|  | 	tf_ssb_db_identity_visit_all(ssb1, _test_print_identity, ssb1); | ||||||
|  |  | ||||||
|  | 	tf_ssb_server_open(ssb0, 12347); | ||||||
|  |  | ||||||
|  | 	uint8_t id0bin[k_id_bin_len]; | ||||||
|  | 	tf_ssb_id_str_to_bin(id0bin, id0); | ||||||
|  | 	tf_ssb_connect(ssb1, "127.0.0.1", 12347, id0bin, 0, NULL, NULL); | ||||||
|  |  | ||||||
|  | 	tf_printf("Waiting for connection.\n"); | ||||||
|  | 	while (test.connection_count0 != 1 || test.connection_count1 != 1) | ||||||
|  | 	{ | ||||||
|  | 		tf_ssb_set_main_thread(ssb0, true); | ||||||
|  | 		tf_ssb_set_main_thread(ssb1, true); | ||||||
|  | 		uv_run(&loop, UV_RUN_ONCE); | ||||||
|  | 		tf_ssb_set_main_thread(ssb0, false); | ||||||
|  | 		tf_ssb_set_main_thread(ssb1, false); | ||||||
|  | 	} | ||||||
|  | 	tf_ssb_server_close(ssb0); | ||||||
|  |  | ||||||
|  | 	int count1 = 0; | ||||||
|  | 	tf_ssb_add_message_added_callback(ssb1, _message_added, NULL, &count1); | ||||||
|  | 	tf_printf("Waiting for message from other.\n"); | ||||||
|  | 	while (count1 != 4) | ||||||
|  | 	{ | ||||||
|  | 		tf_ssb_set_main_thread(ssb1, true); | ||||||
|  | 		uv_run(&loop, UV_RUN_ONCE); | ||||||
|  | 		tf_ssb_set_main_thread(ssb1, false); | ||||||
|  | 	} | ||||||
|  | 	tf_ssb_remove_message_added_callback(ssb1, _message_added, &count1); | ||||||
|  | 	tf_printf("done\n"); | ||||||
|  |  | ||||||
|  | 	tf_ssb_send_close(ssb1); | ||||||
|  |  | ||||||
|  | 	uv_close((uv_handle_t*)&idle0, NULL); | ||||||
|  | 	uv_close((uv_handle_t*)&idle1, NULL); | ||||||
|  |  | ||||||
|  | 	tf_printf("final run\n"); | ||||||
|  | 	tf_ssb_set_main_thread(ssb0, true); | ||||||
|  | 	tf_ssb_set_main_thread(ssb1, true); | ||||||
|  | 	uv_run(&loop, UV_RUN_DEFAULT); | ||||||
|  | 	tf_ssb_set_main_thread(ssb0, false); | ||||||
|  | 	tf_ssb_set_main_thread(ssb1, false); | ||||||
|  | 	tf_printf("done\n"); | ||||||
|  |  | ||||||
|  | 	tf_printf("destroy 0\n"); | ||||||
|  | 	tf_ssb_destroy(ssb0); | ||||||
|  | 	tf_printf("destroy 1\n"); | ||||||
|  | 	tf_ssb_destroy(ssb1); | ||||||
|  |  | ||||||
|  | 	tf_printf("close\n"); | ||||||
|  | 	uv_loop_close(&loop); | ||||||
|  | } | ||||||
|  |  | ||||||
| #endif | #endif | ||||||
|   | |||||||
| @@ -65,4 +65,10 @@ void tf_ssb_test_peer_exchange(const tf_test_options_t* options); | |||||||
| */ | */ | ||||||
| void tf_ssb_test_publish(const tf_test_options_t* options); | void tf_ssb_test_publish(const tf_test_options_t* options); | ||||||
|  |  | ||||||
|  | /** | ||||||
|  | ** Test replication. | ||||||
|  | ** @param options The test options. | ||||||
|  | */ | ||||||
|  | void tf_ssb_test_replicate(const tf_test_options_t* options); | ||||||
|  |  | ||||||
| /** @} */ | /** @} */ | ||||||
|   | |||||||
| @@ -1062,6 +1062,7 @@ void tf_tests(const tf_test_options_t* options) | |||||||
| 	_tf_test_run(options, "encrypt", tf_ssb_test_encrypt, false); | 	_tf_test_run(options, "encrypt", tf_ssb_test_encrypt, false); | ||||||
| 	_tf_test_run(options, "peer_exchange", tf_ssb_test_peer_exchange, false); | 	_tf_test_run(options, "peer_exchange", tf_ssb_test_peer_exchange, false); | ||||||
| 	_tf_test_run(options, "publish", tf_ssb_test_publish, false); | 	_tf_test_run(options, "publish", tf_ssb_test_publish, false); | ||||||
|  | 	_tf_test_run(options, "replicate", tf_ssb_test_replicate, false); | ||||||
| 	tf_printf("Tests completed.\n"); | 	tf_printf("Tests completed.\n"); | ||||||
| #endif | #endif | ||||||
| } | } | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user