Working toward less aggressive blob and feed fetching.
git-svn-id: https://www.unprompted.com/svn/projects/tildefriends/trunk@3710 ed5197a5-7fde-0310-b194-c3ffbd925b24
This commit is contained in:
		
							
								
								
									
										45
									
								
								core/ssb.js
									
									
									
									
									
								
							
							
						
						
									
										45
									
								
								core/ssb.js
									
									
									
									
									
								
							| @@ -1,6 +1,8 @@ | ||||
| "use strict"; | ||||
| var g_wants_requests = {}; | ||||
| var g_database = new Database('core'); | ||||
| const k_use_create_history_stream = false; | ||||
| const k_blobs_concurrent_target = 8; | ||||
|  | ||||
| function following(db, id) { | ||||
| 	var o = db.get(id + ":following"); | ||||
| @@ -90,9 +92,12 @@ ssb.addEventListener('connections', function(change, connection) { | ||||
| 				connection.send_json({"name": ["ebt", "replicate"], "args": [{"version": 3, "format": "classic"}], "type": "duplex"}, ebtReplicateClient); | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| 		connection.active_blob_wants = {}; | ||||
| 		connection.send_json({'name': ['blobs', 'createWants'], 'type': 'source', 'args': []}, function(message) { | ||||
| 			Object.keys(message.message).forEach(function(id) { | ||||
| 				if (message.message[id] < 0) { | ||||
| 					delete connection.active_blob_wants[id]; | ||||
| 					var blob = ssb.blobGet(id); | ||||
| 					if (blob) { | ||||
| 						var out_message = {}; | ||||
| @@ -104,12 +109,19 @@ ssb.addEventListener('connections', function(change, connection) { | ||||
| 					var expected_bytes = message.message[id]; | ||||
| 					var buffer = new Uint8Array(expected_bytes); | ||||
| 					connection.send_json({'name': ['blobs', 'get'], 'type': 'source', 'args': [id]}, function(message) { | ||||
| 						if (message.flags & 0x4 /* end */) { | ||||
| 							delete connection.active_blob_wants[id]; | ||||
| 						} else { | ||||
| 							buffer.set(new Uint8Array(message.message, 0, message.message.byteLength), received_bytes); | ||||
| 							received_bytes += message.message.byteLength; | ||||
| 							if (received_bytes == expected_bytes) { | ||||
| 								ssb.blobStore(buffer); | ||||
| 							} | ||||
| 						} | ||||
| 					}); | ||||
| 					if (Object.keys(connection.active_blob_wants).length < k_blobs_concurrent_target) { | ||||
| 						requestMoreBlobs(g_wants_requests[connection.id]); | ||||
| 					} | ||||
| 				} | ||||
| 			}); | ||||
| 		}); | ||||
| @@ -121,18 +133,27 @@ ssb.addEventListener('connections', function(change, connection) { | ||||
| 	} | ||||
| }); | ||||
|  | ||||
| ssb.addRpc(['blobs', 'createWants'], function(request) { | ||||
| 	g_wants_requests[request.connection.id] = request; | ||||
| 	function blob_want_discovered(id) { | ||||
| function blob_want_discovered(request, id) { | ||||
| 	if (Object.keys(request.connection.active_blob_wants).length > k_blobs_concurrent_target) { | ||||
| 		return; | ||||
| 	} | ||||
| 	var message = {}; | ||||
| 	message[id] = -1; | ||||
| 	request.send_json(message); | ||||
| 	request.connection.active_blob_wants[id] = true; | ||||
| } | ||||
| 	ssb.addEventListener('blob_want_added', blob_want_discovered); | ||||
|  | ||||
| function requestMoreBlobs(request) { | ||||
| 	ssb.sqlStream( | ||||
| 		'SELECT id FROM blob_wants', | ||||
| 		'SELECT id FROM blob_wants LIMIT ' + k_blobs_concurrent_target, | ||||
| 		[], | ||||
| 		row => blob_want_discovered(row.id)); | ||||
| 		row => blob_want_discovered(request, row.id)); | ||||
| } | ||||
|  | ||||
| ssb.addRpc(['blobs', 'createWants'], function(request) { | ||||
| 	g_wants_requests[request.connection.id] = request; | ||||
| 	ssb.addEventListener('blob_want_added', id => blob_want_discovered(request, id)); | ||||
| 	requestMoreBlobs(request); | ||||
| }); | ||||
|  | ||||
| ssb.addRpc(['blobs', 'has'], function(request) { | ||||
| @@ -166,10 +187,16 @@ ssb.addRpc(['tunnel', 'isRoom'], function(request) { | ||||
| function ebtReplicateSendClock(request, have) { | ||||
| 	var me = ssb.whoami(); | ||||
| 	var message = {}; | ||||
| 	var ids = followingDeep(g_database, [me], 2).concat([request.connection.id]).concat(Object.keys(have)); | ||||
| 	var ids = followingDeep(g_database, [me], 2).concat([request.connection.id]); | ||||
| 	for (let id of ids) { | ||||
| 		message[id] = get_latest_sequence_for_author(id); | ||||
| 	} | ||||
| 	for (let id of Object.keys(have)) { | ||||
| 		if (message[id] === undefined) { | ||||
| 			var sequence = get_latest_sequence_for_author(id); | ||||
| 			message[id] = sequence ? sequence : -1; | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	var last_sent = request.connection.sent_clock || {}; | ||||
| 	var to_send = {} | ||||
| @@ -177,6 +204,10 @@ function ebtReplicateSendClock(request, have) { | ||||
| 		if (last_sent[id] === undefined || message[id] > last_sent[id]) { | ||||
| 			last_sent[id] = to_send[id] = message[id] === -1 ? -1 : message[id] << 1; | ||||
| 		} | ||||
| 		if (Object.keys(to_send).length >= 32) { | ||||
| 			request.send_json(to_send); | ||||
| 			to_send = {}; | ||||
| 		} | ||||
| 	} | ||||
| 	request.connection.sent_clock = last_sent; | ||||
|  | ||||
|   | ||||
		Reference in New Issue
	
	Block a user