Now with more WebSockets.

git-svn-id: https://www.unprompted.com/svn/projects/tildefriends/trunk@3197 ed5197a5-7fde-0310-b194-c3ffbd925b24
This commit is contained in:
2016-04-11 00:09:21 +00:00
parent e6aad326c5
commit 2c8bea27a0
5 changed files with 510 additions and 246 deletions

View File

@@ -14,9 +14,9 @@ var auth = require('auth');
var form = require('form');
function Terminal() {
this._waiting = [];
this._index = 0;
this._firstLine = 0;
this._sentIndex = -1;
this._lines = [];
this._lastRead = null;
this._lastWrite = null;
@@ -24,29 +24,31 @@ function Terminal() {
this._readLine = null;
this._selected = null;
this._corked = 0;
this._onOutput = null;
return this;
}
Terminal.kBacklog = 64;
Terminal.prototype.dispatch = function(data) {
for (var i in this._waiting) {
this.feedWaiting(this._waiting[i], data);
}
this._waiting.length = 0;
Terminal.prototype.readOutput = function(callback) {
this._onOutput = callback;
this.dispatch();
}
Terminal.prototype.feedWaiting = function(waiting, data) {
var terminal = this;
var payload = terminal._lines.slice(Math.max(0, waiting.haveIndex + 1 - terminal._firstLine));
Terminal.prototype.dispatch = function(data) {
var payload = this._lines.slice(Math.max(0, this._sentIndex + 1 - this._firstLine));
if (data) {
payload.push(data);
}
if (waiting.haveIndex < terminal._index - 1 || data) {
waiting.resolve({index: terminal._index - 1, lines: payload});
if (this._onOutput && (this._sentIndex < this._index - 1 || data)) {
this._sentIndex = this._index - 1;
this._onOutput({lines: payload});
}
}
Terminal.prototype.feedWaiting = function(waiting, data) {
}
Terminal.prototype.print = function() {
var data = arguments;
if (this._selected) {
@@ -104,8 +106,6 @@ Terminal.prototype.postMessageToIframe = function(name, message) {
}
Terminal.prototype.clear = function() {
//this._lines.length = 0;
//this._firstLine = this._index;
this.print({action: "clear"});
}
@@ -113,18 +113,6 @@ Terminal.prototype.ping = function() {
this.dispatch({action: "ping"});
}
Terminal.prototype.getOutput = function(haveIndex) {
var terminal = this;
terminal._lastRead = new Date();
return new Promise(function(resolve) {
if (haveIndex < terminal._index - 1) {
resolve({index: terminal._index - 1, lines: terminal._lines.slice(Math.max(0, haveIndex + 1 - terminal._firstLine))});
} else {
terminal._waiting.push({haveIndex: haveIndex, resolve: resolve});
}
});
}
Terminal.prototype.setEcho = function(echo) {
this._echo = echo;
}
@@ -145,10 +133,7 @@ Terminal.prototype.cork = function() {
Terminal.prototype.uncork = function() {
if (--this._corked == 0) {
for (var i = 0; i < this._waiting.length; i++) {
this.feedWaiting(this._waiting[i]);
}
this._waiting.length = 0;
this.dispatch();
}
}
@@ -162,6 +147,97 @@ function invoke(handlers, argv) {
return Promise.all(promises);
}
function socket(request, response, client) {
var process;
var options = {};
var credentials = auth.query(request.headers);
if (credentials && credentials.session) {
options.userName = credentials.session.name;
}
options.credentials = credentials;
response.onMessage = function(event) {
if (event.opCode == 0x1 || event.opCode == 0x2) {
var message;
try {
message = JSON.parse(event.data);
} catch (error) {
print("ERROR", error, event.data, event.data.length, event.opCode);
return;
}
if (message.action == "hello") {
var packageOwner;
var packageName;
var match;
if (match = /^\/\~([^\/]+)\/([^\/]+)(.*)/.exec(message.path)) {
packageOwner = match[1];
packageName = match[2];
}
response.send(JSON.stringify({action: "hello"}), 0x1);
process = getSessionProcess(packageOwner, packageName, makeSessionId(), options);
process.terminal.readOutput(function(message) {
response.send(JSON.stringify(message), 0x1);
});
var ping = function() {
var now = Date.now();
var again = true;
if (now - process.lastActive < process.timeout) {
// Active.
} else if (process.lastPing > process.lastActive) {
// We lost them.
process.task.kill();
again = false;
} else {
// Idle. Ping them.
response.send("", 0x9);
process.lastPing = now;
}
if (again) {
setTimeout(ping, process.timeout);
}
}
if (process.timeout > 0) {
setTimeout(ping, process.timeout);
}
} else if (message.action == "command") {
var command = message.command;
var eventName = 'unknown';
if (typeof command == "string") {
if (process.terminal._echo) {
process.terminal.print("> " + command);
}
if (process.terminal._readLine) {
let promise = process.terminal._readLine;
process.terminal._readLine = null;
promise[0](command);
}
eventName = 'onInput';
} else if (command.event) {
eventName = command.event;
}
return invoke(process.eventHandlers[eventName], [command]).catch(function(error) {
process.terminal.print(error);
});
}
} else if (event.opCode == 0x8) {
// Close.
process.task.kill();
response.send(event.data, 0x8);
} else if (event.opCode == 0xa) {
// PONG
}
if (process) {
process.lastActive = Date.now();
}
}
}
function handler(request, response, packageOwner, packageName, uri) {
var found = false;
@@ -226,62 +302,9 @@ function handler(request, response, packageOwner, packageName, uri) {
response.end("Problem saving: " + packageName);
}
}
} else {
var options = {};
var credentials = auth.query(request.headers);
if (credentials && credentials.session) {
options.userName = credentials.session.name;
}
options.credentials = credentials;
if (uri == "/submit") {
process = getServiceProcess(packageOwner, packageName, "submit");
} else if (uri == "/atom") {
process = getServiceProcess(packageOwner, packageName, "atom");
} else {
var sessionId = form.decodeForm(request.query).sessionId;
var isNewSession = false;
if (!getSessionProcess(packageOwner, packageName, sessionId, {create: false})) {
sessionId = makeSessionId();
isNewSession = true;
}
process = getSessionProcess(packageOwner, packageName, sessionId, options);
}
process.lastActive = Date.now();
if (uri === "/send") {
if (isNewSession) {
response.writeHead(403, {"Content-Type": "text/plain; charset=utf-8"});
response.end("Too soon.");
} else {
var command = JSON.parse(request.body);
var eventName = 'unknown';
if (typeof command == "string") {
if (process.terminal._echo) {
process.terminal.print("> " + command);
}
if (process.terminal._readLine) {
let promise = process.terminal._readLine;
process.terminal._readLine = null;
promise[0](command);
}
eventName = 'onInput';
} else if (command.event) {
eventName = command.event;
}
return invoke(process.eventHandlers[eventName], [command]).then(function() {
response.writeHead(200, {
"Content-Type": "text/plain; charset=utf-8",
"Content-Length": "0",
"Cache-Control": "no-cache, no-store, must-revalidate",
"Pragma": "no-cache",
"Expires": "0",
});
response.end("");
}).catch(function(error) {
process.terminal.print(error);
});
}
} else if (uri === "/submit") {
} else if (uri === "/submit") {
var process = getServiceProcess(packageOwner, packageName, "submit");
process.lastActive = Date.now();
return process.ready.then(function() {
var payload = form.decodeForm(request.body, form.decodeForm(request.query));
return invoke(process.eventHandlers['onSubmit'], [payload]).then(function() {
@@ -295,63 +318,26 @@ function handler(request, response, packageOwner, packageName, uri) {
return response.end("");
});
});
} else if (uri === "/atom") {
return process.ready.then(function() {
var payload = form.decodeForm(request.body, form.decodeForm(request.query));
return invoke(process.eventHandlers['onAtom'], [payload]).then(function(content) {
var atomContent = content.join();
response.writeHead(200, {
"Content-Type": "application/atom+xml; charset=utf-8",
"Content-Length": atomContent.length.toString(),
"Cache-Control": "no-cache, no-store, must-revalidate",
"Pragma": "no-cache",
"Expires": "0",
});
return response.end(atomContent);
});
});
} else if (uri === "/receive") {
if (isNewSession) {
var data = JSON.stringify({
lines: [
{
action: "session",
session: {
sessionId: sessionId,
credentials: credentials,
}
},
]
});
} else if (uri === "/atom") {
var process = getServiceProcess(packageOwner, packageName, "atom");
process.lastActive = Date.now();
return process.ready.then(function() {
var payload = form.decodeForm(request.body, form.decodeForm(request.query));
return invoke(process.eventHandlers['onAtom'], [payload]).then(function(content) {
var atomContent = content.join();
response.writeHead(200, {
"Content-Type": "text/plain; charset=utf-8",
"Content-Length": data.length.toString(),
"Content-Type": "application/atom+xml; charset=utf-8",
"Content-Length": atomContent.length.toString(),
"Cache-Control": "no-cache, no-store, must-revalidate",
"Pragma": "no-cache",
"Expires": "0",
});
process.ready.then(function() {
process.terminal.print({action: "ready", ready: true});
}).catch(function(error) {
process.terminal.print({action: "ready", error: error});
});
response.end(data);
} else {
return process.terminal.getOutput(parseInt(request.body)).then(function(output) {
var data = JSON.stringify(output);
response.writeHead(200, {
"Content-Type": "text/plain; charset=utf-8",
"Content-Length": data.length.toString(),
"Cache-Control": "no-cache, no-store, must-revalidate",
"Pragma": "no-cache",
"Expires": "0",
});
response.end(data);
});
}
}
return response.end(atomContent);
});
});
}
}
}
exports.handler = handler;
exports.socket = socket;