import * as auth from './auth.js';
import * as core from './core.js';

/** Next candidate id for an outgoing tfrpc call. */
let g_next_id = 1;

/**
 * Pending outgoing tfrpc calls, keyed by call id. Each value holds the
 * `resolve` / `reject` functions of the promise handed to the caller.
 *
 * Created without a prototype: the ids used to look entries up arrive in
 * client messages, and on a plain `{}` an id such as "constructor" would find
 * an inherited member instead of a pending call.
 */
let g_calls = Object.create(null);

/** Counter backing makeSessionId(). */
let gSessionIndex = 0;

/**
 * Produces a new session identifier from a module-wide counter.
 * @returns {string} The counter value before incrementing, as a decimal string.
 */
function makeSessionId() {
	return (gSessionIndex++).toString();
}

/**
 * Message channel towards a client. Messages sent before an output callback
 * is registered are queued and delivered once one is available.
 * @returns {App} The constructed instance.
 */
function App() {
	this._on_output = null;
	this._send_queue = [];
	return this;
}

/**
 * Registers the function that receives outgoing messages. Messages already
 * queued are not delivered by this call itself; they are flushed by the next
 * call to send().
 * @param {function(*): void} callback Receives each outgoing message.
 */
App.prototype.readOutput = function (callback) {
	this._on_output = callback;
};

/**
 * Builds a function that performs a remote call over this channel.
 *
 * Each invocation allocates a call id that is not currently pending, records
 * the promise's settle functions under that id, and sends a
 * `{message: 'tfrpc', method, params, id}` message.
 * @param {Array} api Descriptor whose first element is the remote method name.
 * @returns {function(...*): Promise<*>} Function named after `api[0]`; its
 * promise is settled when a tfrpc message with a matching id is received.
 */
App.prototype.makeFunction = function (api) {
	let self = this;
	let result = function (...params) {
		let id = g_next_id++;
		// Skip 0 (falsy ids are ignored on receipt) and ids still in flight.
		while (!id || g_calls[id]) {
			id = g_next_id++;
		}
		let promise = new Promise(function (resolve, reject) {
			g_calls[id] = {resolve: resolve, reject: reject};
		});
		let message = {
			message: 'tfrpc',
			method: api[0],
			params: params,
			id: id,
		};
		self.send(message);
		return promise;
	};
	Object.defineProperty(result, 'name', {value: api[0], writable: false});
	return result;
};

/**
 * Sends a message to the output callback, or queues it until a callback has
 * been registered. Calling with no argument only flushes the queue.
 * @param {*} [message] Message to deliver; falsy values are not sent.
 */
App.prototype.send = function (message) {
	if (this._send_queue) {
		if (this._on_output) {
			// First send after a callback exists: drain the backlog in order
			// and drop the queue so later sends go straight through.
			this._send_queue.forEach((x) => this._on_output(x));
			this._send_queue = null;
		} else if (message) {
			this._send_queue.push(message);
		}
	}

	if (message && this._on_output) {
		this._on_output(message);
	}
};

/**
 * Handles one WebSocket connection: installs close and message handlers on
 * `response` and then upgrades the connection.
 *
 * Text and binary frames are parsed as JSON and dispatched:
 * - `action: 'hello'` resolves the requested path to a blob id, replies with
 *   an `action: 'session'` message, starts the session process (unless
 *   `edit_only` is set) and arms an idle ping timer.
 * - `action: 'enableStats'` / `action: 'resetPermission'` are forwarded to the
 *   session process.
 * - `message: 'tfrpc'` settles the pending call with the same id.
 * - anything else is passed to the process's 'message' event handler, if any.
 * @param {*} request Incoming request; only `request.headers` is read here.
 * @param {*} response Connection object providing `send`, `upgrade` and the
 * `onClose` / `onMessage` hooks.
 * @param {*} client Not used by this function.
 */
function socket(request, response, client) {
	let process;
	let options = {};
	let credentials = auth.query(request.headers);

	response.onClose = async function () {
		if (process && process.task) {
			process.task.kill();
		}
		if (process) {
			// A zero timeout stops the ping timer from rescheduling itself.
			process.timeout = 0;
		}
	};

	response.onMessage = async function (event) {
		if (event.opCode == 0x1 || event.opCode == 0x2) {
			// Text or binary data frame.
			let message;
			try {
				message = JSON.parse(event.data);
			} catch (error) {
				print('ERROR', error, event.data, event.data.length, event.opCode);
				return;
			}

			// JSON.parse('null') succeeds, so only read fields off a non-null
			// value. A null payload then takes the same fall-through path as
			// any other payload without a recognized action.
			let hasFields = message !== null && message !== undefined;
			let action = hasFields ? message.action : undefined;
			let kind = hasFields ? message.message : undefined;

			if (action == 'hello') {
				// NOTE(review): a second 'hello' on the same socket replaces
				// `process` without killing the previous task - confirm
				// whether clients can send it more than once.
				let packageOwner;
				let packageName;
				let blobId;
				let match;
				let parentApp;
				if (
					(match = /^\/([&%][^\.]{44}(?:\.\w+)?)(\/?.*)/.exec(message.path))
				) {
					// The path names a blob directly.
					blobId = match[1];
				} else if ((match = /^\/\~([^\/]+)\/([^\/]+)\/$/.exec(message.path))) {
					// The path has the form /~owner/name/; look the blob up in
					// the owner's database.
					packageOwner = match[1];
					packageName = match[2];
					blobId = await new Database(packageOwner).get('path:' + packageName);
					if (!blobId) {
						response.send(
							JSON.stringify({
								message: 'tfrpc',
								method: 'error',
								params: [message.path + ' not found'],
								id: -1,
							}),
							0x1
						);
						return;
					}
					if (packageOwner != 'core') {
						// Report the 'core' entry of the same name as the
						// parent; its id is undefined when core has none.
						let coreId = await new Database('core').get('path:' + packageName);
						parentApp = {
							path: '/~core/' + packageName + '/',
							id: coreId,
						};
					}
				}

				response.send(
					JSON.stringify({
						action: 'session',
						credentials: credentials,
						parentApp: parentApp,
						id: blobId,
					}),
					0x1
				);

				options.api = message.api || [];
				options.credentials = credentials;
				options.packageOwner = packageOwner;
				options.packageName = packageName;
				options.url = message.url;
				let sessionId = makeSessionId();
				if (blobId) {
					if (message.edit_only) {
						response.send(
							JSON.stringify({action: 'ready', edit_only: true}),
							0x1
						);
					} else {
						process = await core.getSessionProcessBlob(
							blobId,
							sessionId,
							options
						);
					}
				}
				if (process) {
					process.app.readOutput(function (message) {
						response.send(JSON.stringify(message), 0x1);
					});
					// An argument-less send flushes whatever was queued before
					// the output callback existed.
					process.app.send();
				}

				let ping = function () {
					let now = Date.now();
					let again = true;
					if (now - process.lastActive < process.timeout) {
						// Active.
					} else if (process.lastPing > process.lastActive) {
						// We lost them: the last ping has seen no activity since.
						if (process.task) {
							process.task.kill();
						}
						again = false;
					} else {
						// Idle. Ping them.
						response.send('', 0x9);
						process.lastPing = now;
					}

					if (again && process.timeout) {
						setTimeout(ping, process.timeout);
					}
				};

				if (process && process.timeout > 0) {
					setTimeout(ping, process.timeout);
				}
			} else if (action == 'enableStats') {
				if (process) {
					core.enableStats(process, message.enabled);
				}
			} else if (action == 'resetPermission') {
				if (process) {
					process.resetPermission(message.permission);
				}
			} else if (kind == 'tfrpc') {
				// Reply to a call made through App.makeFunction. The id is
				// client-controlled, so require a real pending entry before
				// settling anything.
				let call = message.id ? g_calls[message.id] : undefined;
				if (call && typeof call.resolve === 'function') {
					if (message.error !== undefined) {
						call.reject(message.error);
					} else {
						call.resolve(message.result);
					}
					delete g_calls[message.id];
				}
			} else {
				if (process && process.eventHandlers['message']) {
					await core.invoke(process.eventHandlers['message'], [message]);
				}
			}
		} else if (event.opCode == 0x8) {
			// Close.
			if (process && process.task) {
				process.task.kill();
			}
			response.send(event.data, 0x8);
		} else if (event.opCode == 0xa) {
			// PONG
		}

		// Any frame that reaches this point counts as activity for the idle check.
		if (process) {
			process.lastActive = Date.now();
		}
	};

	// NOTE(review): the meaning of the arguments (100, {}) is defined by the
	// host's response object and is not visible in this file.
	response.upgrade(100, {});
}

export {socket, App};