246 lines
5.2 KiB
JavaScript

import * as core from './core.js';
// Next candidate id for an outgoing tfrpc call; bumped every time an id is
// taken (see App.prototype.makeFunction).
let g_next_id = 1;
// Outstanding tfrpc calls keyed by call id. Each entry holds the
// {resolve, reject} pair of the promise handed back to the caller and is
// removed once a matching reply arrives.
let g_calls = {};
// Counter backing makeSessionId(); incremented once per id produced.
let gSessionIndex = 0;
/**
 * Produces the next session identifier.
 *
 * Reads the module-level session counter, advances it by one, and returns
 * the value that was read, rendered as a decimal string.
 *
 * @returns {string} The counter value before it was advanced.
 */
function makeSessionId() {
  const issued = gSessionIndex;
  gSessionIndex = issued + 1;
  return String(issued);
}
/**
 * Outgoing message endpoint.
 *
 * Until an output callback is registered, messages passed to send() are
 * held in an internal queue. The first send() after a callback exists
 * flushes that queue and retires it; later messages go straight to the
 * callback.
 *
 * @returns {App} The initialised instance.
 */
function App() {
  Object.assign(this, {
    _on_output: null,
    _send_queue: [],
  });
  return this;
}

/**
 * Registers the callback that receives outgoing messages.
 *
 * Registering does not flush anything by itself; queued messages are
 * delivered on the next send() call.
 *
 * @param {*} callback Function invoked with each outgoing message.
 */
App.prototype.readOutput = function (callback) {
  this._on_output = callback;
};

/**
 * Builds a function that issues a 'tfrpc' call through this app.
 *
 * Each invocation of the returned function reserves a call id, records the
 * promise's resolve/reject pair in the module-level call table under that
 * id, sends the request message, and returns the promise.
 *
 * @param {*} api Descriptor whose first element is the remote method name.
 * @returns {Function} Function named after api[0] that returns a promise.
 */
App.prototype.makeFunction = function (api) {
  const app = this;
  const remoteCall = function (...args) {
    // Skip ids that are falsy (0) or still have a call waiting on them.
    let callId;
    do {
      callId = g_next_id++;
    } while (!callId || g_calls[callId]);

    const pending = new Promise((resolve, reject) => {
      g_calls[callId] = {resolve, reject};
    });

    app.send({
      message: 'tfrpc',
      method: api[0],
      params: args,
      id: callId,
    });
    return pending;
  };
  Object.defineProperty(remoteCall, 'name', {value: api[0], writable: false});
  return remoteCall;
};

/**
 * Delivers a message to the output callback, or queues it if none exists.
 *
 * May be called without a message purely to flush the queue.
 *
 * @param {*} message Message to deliver; falsy values are not delivered.
 */
App.prototype.send = function (message) {
  if (!this._on_output) {
    // Nobody is listening yet: hold the message while the queue still exists.
    if (this._send_queue && message) {
      this._send_queue.push(message);
    }
    return;
  }

  if (this._send_queue) {
    // First delivery since a listener appeared: drain the backlog in order,
    // then retire the queue for good.
    this._send_queue.forEach((queued) => this._on_output(queued));
    this._send_queue = null;
  }

  if (message && this._on_output) {
    this._on_output(message);
  }
};
/**
 * Serves one WebSocket connection for an app session.
 *
 * Installs `onClose` and `onMessage` handlers on `response` and then
 * upgrades the connection. Data frames carry JSON messages:
 *   - `action: 'hello'` resolves the requested path to a blob id, replies
 *     with a `session` message, and (unless `edit_only`) starts the session
 *     process via `core.getSessionProcessBlob`.
 *   - `action: 'enableStats'` / `'resetPermission'` are forwarded to the
 *     running process, if any.
 *   - `message: 'tfrpc'` settles the pending call with the matching id.
 *   - anything else is passed to the process's `message` event handler.
 *
 * `httpd`, `Database` and `print` are not defined in this file; they are
 * presumably globals supplied by the host runtime.
 *
 * @param {*} request Incoming request; only `request.headers` is read.
 * @param {*} response Connection object; must provide `send(data, opCode)`
 *   and `upgrade(...)`. `onClose` and `onMessage` are assigned on it.
 * @param {*} client Not used by this function.
 */
function socket(request, response, client) {
  let process;
  const options = {};
  const credentials = httpd.auth_query(request.headers);

  response.onClose = async function () {
    if (process && process.task) {
      process.task.kill();
    }
    if (process) {
      // A zero timeout makes the ping loop stop rescheduling itself.
      process.timeout = 0;
    }
  };

  response.onMessage = async function (event) {
    if (event.opCode === 0x1 || event.opCode === 0x2) {
      // Data frame: the payload is expected to be a JSON document.
      let message;
      try {
        message = JSON.parse(event.data);
      } catch (error) {
        print('ERROR', error, event.data, event.data.length, event.opCode);
        return;
      }
      if (message === null) {
        // "null" is valid JSON but has no fields to dispatch on; reading
        // message.action below would throw.
        print(
          'ERROR',
          'null message',
          event.data,
          event.data.length,
          event.opCode
        );
        return;
      }
      if (message.action === 'hello') {
        let packageOwner;
        let packageName;
        let blobId;
        let match;
        let parentApp;
        if (
          (match = /^\/([&%][^\.]{44}(?:\.\w+)?)(\/?.*)/.exec(message.path))
        ) {
          // Path names a blob directly.
          blobId = match[1];
        } else if ((match = /^\/\~([^\/]+)\/([^\/]+)\/$/.exec(message.path))) {
          // Path is /~owner/name/: look the blob up in the owner's database.
          packageOwner = match[1];
          packageName = match[2];
          blobId = await new Database(packageOwner).get('path:' + packageName);
          if (!blobId) {
            response.send(
              JSON.stringify({
                message: 'tfrpc',
                method: 'error',
                params: [message.path + ' not found'],
                id: -1,
              }),
              0x1
            );
            return;
          }
          if (packageOwner !== 'core') {
            // Report the same-named 'core' package alongside the session.
            const coreId = await new Database('core').get(
              'path:' + packageName
            );
            parentApp = {
              path: '/~core/' + packageName + '/',
              id: coreId,
            };
          }
        }
        response.send(
          JSON.stringify({
            action: 'session',
            credentials: credentials,
            parentApp: parentApp,
            id: blobId,
          }),
          0x1
        );
        options.api = message.api || [];
        options.credentials = credentials;
        options.packageOwner = packageOwner;
        options.packageName = packageName;
        options.url = message.url;
        const sessionId = makeSessionId();
        if (blobId) {
          if (message.edit_only) {
            response.send(
              JSON.stringify({action: 'ready', edit_only: true}),
              0x1
            );
          } else {
            process = await core.getSessionProcessBlob(
              blobId,
              sessionId,
              options
            );
          }
        }
        if (process) {
          process.app.readOutput(function (message) {
            response.send(JSON.stringify(message), 0x1);
          });
          // Argument-less send flushes anything queued before the output
          // callback was attached.
          process.app.send();
        }
        const ping = function () {
          if (!process.timeout) {
            // The timeout was zeroed (e.g. by onClose) after this tick was
            // scheduled; do not ping or kill on a finished connection.
            return;
          }
          const now = Date.now();
          let again = true;
          if (now - process.lastActive < process.timeout) {
            // Active.
          } else if (process.lastPing > process.lastActive) {
            // We lost them.
            if (process.task) {
              process.task.kill();
            }
            again = false;
          } else {
            // Idle. Ping them.
            response.send('', 0x9);
            process.lastPing = now;
          }
          if (again && process.timeout) {
            setTimeout(ping, process.timeout);
          }
        };
        if (process && process.timeout > 0) {
          setTimeout(ping, process.timeout);
        }
      } else if (message.action === 'enableStats') {
        if (process) {
          core.enableStats(process, message.enabled);
        }
      } else if (message.action === 'resetPermission') {
        if (process) {
          process.resetPermission(message.permission);
        }
      } else if (message.message === 'tfrpc') {
        // message.id comes from the peer, so only accept ids that are own
        // keys of the call table; inherited names such as "constructor"
        // would otherwise look like pending calls.
        // NOTE(review): g_calls is shared by every connection and nothing
        // here checks that the reply comes from the session the call was
        // sent to; confirm whether that is intended.
        if (
          message.id &&
          Object.prototype.hasOwnProperty.call(g_calls, message.id)
        ) {
          if (message.error !== undefined) {
            g_calls[message.id].reject(message.error);
          } else {
            g_calls[message.id].resolve(message.result);
          }
          delete g_calls[message.id];
        }
      } else {
        if (process && process.eventHandlers['message']) {
          await core.invoke(process.eventHandlers['message'], [message]);
        }
      }
    } else if (event.opCode === 0x8) {
      // Close.
      if (process && process.task) {
        process.task.kill();
      }
      response.send(event.data, 0x8);
    } else if (event.opCode === 0xa) {
      // PONG
    }
    if (process) {
      // Any handled frame counts as activity for the idle ping logic.
      process.lastActive = Date.now();
    }
  };

  response.upgrade(100, {});
}
export {socket, App};