aboutsummaryrefslogtreecommitdiffstats
path: root/net
diff options
context:
space:
mode:
Diffstat (limited to 'net')
-rw-r--r--net/adns.lua21
-rw-r--r--net/connlisteners.lua15
-rw-r--r--net/dns.lua33
-rw-r--r--net/http.lua97
-rw-r--r--net/http/codes.lua12
-rw-r--r--net/http/server.lua82
-rw-r--r--net/httpserver.lua11
-rw-r--r--net/server.lua2
-rw-r--r--net/server_event.lua1309
-rw-r--r--net/server_select.lua109
-rw-r--r--net/websocket.lua272
-rw-r--r--net/websocket/frames.lua219
12 files changed, 1363 insertions, 819 deletions
diff --git a/net/adns.lua b/net/adns.lua
index 3fc958f4..0b7247ed 100644
--- a/net/adns.lua
+++ b/net/adns.lua
@@ -1,7 +1,7 @@
-- Prosody IM
-- Copyright (C) 2008-2010 Matthew Wild
-- Copyright (C) 2008-2010 Waqas Hussain
---
+--
-- This project is MIT/X11 licensed. Please see the
-- COPYING file in the source package for more information.
--
@@ -11,14 +11,13 @@ local dns = require "net.dns";
local log = require "util.logger".init("adns");
-local t_insert, t_remove = table.insert, table.remove;
local coroutine, tostring, pcall = coroutine, tostring, pcall;
local function dummy_send(sock, data, i, j) return (j-i)+1; end
-module "adns"
+local _ENV = nil;
-function lookup(handler, qname, qtype, qclass)
+local function lookup(handler, qname, qtype, qclass)
return coroutine.wrap(function (peek)
if peek then
log("debug", "Records for %s already cached, using those...", qname);
@@ -43,12 +42,12 @@ function lookup(handler, qname, qtype, qclass)
end)(dns.peek(qname, qtype, qclass));
end
-function cancel(handle, call_handler, reason)
+local function cancel(handle, call_handler, reason)
log("warn", "Cancelling DNS lookup for %s", tostring(handle[3]));
dns.cancel(handle[1], handle[2], handle[3], handle[4], call_handler);
end
-function new_async_socket(sock, resolver)
+local function new_async_socket(sock, resolver)
local peername = "<unknown>";
local listener = {};
local handler = {};
@@ -65,7 +64,7 @@ function new_async_socket(sock, resolver)
if resolver.socketset[conn] == resolver.best_server and resolver.best_server == #servers then
log("error", "Exhausted all %d configured DNS servers, next lookup will try %s again", #servers, servers[1]);
end
-
+
resolver:servfail(conn); -- Let the magic commence
end
end
@@ -73,7 +72,7 @@ function new_async_socket(sock, resolver)
if not handler then
return nil, err;
end
-
+
handler.settimeout = function () end
handler.setsockname = function (_, ...) return sock:setsockname(...); end
handler.setpeername = function (_, ...) peername = (...); local ret, err = sock:setpeername(...); _:set_send(dummy_send); return ret, err; end
@@ -88,4 +87,8 @@ end
dns.socket_wrapper_set(new_async_socket);
-return _M;
+return {
+ lookup = lookup;
+ cancel = cancel;
+ new_async_socket = new_async_socket;
+};
diff --git a/net/connlisteners.lua b/net/connlisteners.lua
index 99ddc720..000bfa63 100644
--- a/net/connlisteners.lua
+++ b/net/connlisteners.lua
@@ -2,14 +2,17 @@
local log = require "util.logger".init("net.connlisteners");
local traceback = debug.traceback;
-module "httpserver"
+local _ENV = nil;
-function fail()
+local function fail()
log("error", "Attempt to use legacy connlisteners API. For more info see http://prosody.im/doc/developers/network");
log("error", "Legacy connlisteners API usage, %s", traceback("", 2));
end
-register, deregister = fail, fail;
-get, start = fail, fail, epic_fail;
-
-return _M;
+return {
+ register = fail;
+ register = fail;
+ get = fail;
+ start = fail;
+ -- epic fail
+};
diff --git a/net/dns.lua b/net/dns.lua
index d123731c..726b2b80 100644
--- a/net/dns.lua
+++ b/net/dns.lua
@@ -22,8 +22,8 @@ local is_windows = (_ and windows) or os.getenv("WINDIR");
local coroutine, io, math, string, table =
coroutine, io, math, string, table;
-local ipairs, next, pairs, print, setmetatable, tostring, assert, error, unpack, select, type=
- ipairs, next, pairs, print, setmetatable, tostring, assert, error, unpack, select, type;
+local ipairs, next, pairs, print, setmetatable, tostring, assert, error, select, type, unpack=
+ ipairs, next, pairs, print, setmetatable, tostring, assert, error, select, type, table.unpack or unpack;
local ztact = { -- public domain 20080404 lua@ztact.com
get = function(parent, ...)
@@ -71,8 +71,8 @@ local get, set = ztact.get, ztact.set;
local default_timeout = 15;
-------------------------------------------------- module dns
-module('dns')
-local dns = _M;
+local _ENV = nil;
+local dns = {};
-- dns type & class codes ------------------------------ dns type & class codes
@@ -190,7 +190,7 @@ end
local rrs_metatable = {}; -- - - - - - - - - - - - - - - - - - rrs_metatable
function rrs_metatable.__tostring(rrs)
local t = {};
- for i,rr in ipairs(rrs) do
+ for _, rr in ipairs(rrs) do
append(t, tostring(rr)..'\n');
end
return table.concat(t);
@@ -213,15 +213,6 @@ function cache_metatable.__tostring(cache)
end
-function resolver:new() -- - - - - - - - - - - - - - - - - - - - - resolver
- local r = { active = {}, cache = {}, unsorted = {} };
- setmetatable(r, resolver);
- setmetatable(r.cache, cache_metatable);
- setmetatable(r.unsorted, { __mode = 'kv' });
- return r;
-end
-
-
-- packet layer -------------------------------------------------- packet layer
@@ -629,7 +620,7 @@ function resolver:getsocket(servernum) -- - - - - - - - - - - - - getsocket
if peer:find(":") then
sock, err = socket.udp6();
else
- sock, err = socket.udp();
+ sock, err = (socket.udp4 or socket.udp)();
end
if sock and self.socket_wrapper then sock, err = self.socket_wrapper(sock, self); end
if not sock then
@@ -850,7 +841,7 @@ function resolver:receive(rset) -- - - - - - - - - - - - - - - - - receive
rset = rset or self.socket;
local response;
- for i,sock in pairs(rset) do
+ for _, sock in pairs(rset) do
if self.socketset[sock] then
local packet = sock:receive();
@@ -861,7 +852,7 @@ function resolver:receive(rset) -- - - - - - - - - - - - - - - - - receive
--print('received response');
--self.print(response);
- for j,rr in pairs(response.answer) do
+ for _, rr in pairs(response.answer) do
if rr.name:sub(-#response.question[1].name, -1) == response.question[1].name then
self:remember(rr, response.question[1].type)
end
@@ -903,7 +894,7 @@ function resolver:feed(sock, packet, force)
--print('received response');
--self.print(response);
- for j,rr in pairs(response.answer) do
+ for _, rr in pairs(response.answer) do
self:remember(rr, response.question[1].type);
end
@@ -1020,7 +1011,7 @@ end
function resolver.print(response) -- - - - - - - - - - - - - resolver.print
- for s,s in pairs { 'id', 'qr', 'opcode', 'aa', 'tc', 'rd', 'ra', 'z',
+ for _, s in pairs { 'id', 'qr', 'opcode', 'aa', 'tc', 'rd', 'ra', 'z',
'rcode', 'qdcount', 'ancount', 'nscount', 'arcount' } do
print( string.format('%-30s', 'header.'..s), response.header[s], hint(response.header, s) );
end
@@ -1033,7 +1024,7 @@ function resolver.print(response) -- - - - - - - - - - - - - resolver.print
local common = { name=1, type=1, class=1, ttl=1, rdlength=1, rdata=1 };
local tmp;
- for s,s in pairs({'answer', 'authority', 'additional'}) do
+ for _, s in pairs({'answer', 'authority', 'additional'}) do
for i,rr in pairs(response[s]) do
for j,t in pairs({ 'name', 'type', 'class', 'ttl', 'rdlength' }) do
tmp = string.format('%s[%i].%s', s, i, t);
@@ -1054,8 +1045,6 @@ end
function dns.resolver () -- - - - - - - - - - - - - - - - - - - - - resolver
- -- this function seems to be redundant with resolver.new ()
-
local r = { active = {}, cache = {}, unsorted = {}, wanted = {}, best_server = 1 };
setmetatable (r, resolver);
setmetatable (r.cache, cache_metatable);
diff --git a/net/http.lua b/net/http.lua
index 8ce47494..669fa9a5 100644
--- a/net/http.lua
+++ b/net/http.lua
@@ -1,12 +1,11 @@
-- Prosody IM
-- Copyright (C) 2008-2010 Matthew Wild
-- Copyright (C) 2008-2010 Waqas Hussain
---
+--
-- This project is MIT/X11 licensed. Please see the
-- COPYING file in the source package for more information.
--
-local socket = require "socket"
local b64 = require "util.encodings".base64.encode;
local url = require "socket.url"
local httpstream_new = require "net.http.parser".new;
@@ -24,10 +23,12 @@ local assert, error = assert, error
local log = require "util.logger".init("http");
-module "http"
+local _ENV = nil;
local requests = {}; -- Open requests
+local function make_id(req) return (tostring(req):match("%x+$")); end
+
local listener = { default_port = 80, default_mode = "*a" };
function listener.onconnect(conn)
@@ -37,7 +38,7 @@ function listener.onconnect(conn)
if req.query then
t_insert(request_line, 4, "?"..req.query);
end
-
+
conn:write(t_concat(request_line));
local t = { [2] = ": ", [4] = "\r\n" };
for k, v in pairs(req.headers) do
@@ -45,7 +46,7 @@ function listener.onconnect(conn)
conn:write(t_concat(t));
end
conn:write("\r\n");
-
+
if req.body then
conn:write(req.body);
end
@@ -76,6 +77,13 @@ function listener.ondetach(conn)
requests[conn] = nil;
end
+local function destroy_request(request)
+ if request.conn then
+ request.conn = nil;
+ request.handler:close()
+ end
+end
+
local function request_reader(request, data, err)
if not request.parser then
local function error_cb(reason)
@@ -85,12 +93,12 @@ local function request_reader(request, data, err)
end
destroy_request(request);
end
-
+
if not data then
error_cb(err);
return;
end
-
+
local function success_cb(r)
if request.callback then
request.callback(r.body, r.code, r, request);
@@ -107,20 +115,29 @@ local function request_reader(request, data, err)
end
local function handleerr(err) log("error", "Traceback[http]: %s", traceback(tostring(err), 2)); end
-function request(u, ex, callback)
+local function log_if_failed(id, ret, ...)
+ if not ret then
+ log("error", "Request %s: error in callback: %s", id, tostring((...)));
+ end
+ return ...;
+end
+
+local function request(u, ex, callback)
local req = url.parse(u);
-
+
if not (req and req.host) then
callback(nil, 0, req);
return nil, "invalid-url";
end
-
+
if not req.path then
req.path = "/";
end
-
+
+ req.id = ex and ex.id or make_id(req);
+
local method, headers, body;
-
+
local host, port = req.host, req.port;
local host_header = host;
if (port == "80" and req.scheme == "http")
@@ -134,7 +151,7 @@ function request(u, ex, callback)
["Host"] = host_header;
["User-Agent"] = "Prosody XMPP Server";
};
-
+
if req.userinfo then
headers["Authorization"] = "Basic "..b64(req.userinfo);
end
@@ -154,34 +171,35 @@ function request(u, ex, callback)
end
end
end
-
+
+ log("debug", "Making %s %s request %s to %s", req.scheme, method or "GET", req.id, (ex and ex.suppress_url and host_header) or u);
+
-- Attach to request object
req.method, req.headers, req.body = method, headers, body;
-
+
local using_https = req.scheme == "https";
if using_https and not ssl_available then
error("SSL not available, unable to contact https URL");
end
local port_number = port and tonumber(port) or (using_https and 443 or 80);
-
- -- Connect the socket, and wrap it with net.server
- local conn = socket.tcp();
- conn:settimeout(10);
- local ok, err = conn:connect(host, port_number);
- if not ok and err ~= "timeout" then
- callback(nil, 0, req);
- return nil, err;
- end
-
+
local sslctx = false;
if using_https then
sslctx = ex and ex.sslctx or { mode = "client", protocol = "sslv23", options = { "no_sslv2", "no_sslv3" } };
end
- req.handler, req.conn = assert(server.wrapclient(conn, host, port_number, listener, "*a", sslctx));
+ local handler, conn = server.addclient(host, port_number, listener, "*a", sslctx)
+ if not handler then
+ callback(nil, 0, req);
+ return nil, conn;
+ end
+ req.handler, req.conn = handler, conn
req.write = function (...) return req.handler:write(...); end
-
- req.callback = function (content, code, request, response) log("debug", "Calling callback, status %s", code or "---"); return select(2, xpcall(function () return callback(content, code, request, response) end, handleerr)); end
+
+ req.callback = function (content, code, request, response)
+ log("debug", "request %s: Calling callback, status %s", req.id, code or "---");
+ return log_if_failed(req.id, xpcall(function () return callback(content, code, request, response) end, handleerr));
+ end
req.reader = request_reader;
req.state = "status";
@@ -189,17 +207,12 @@ function request(u, ex, callback)
return req;
end
-function destroy_request(request)
- if request.conn then
- request.conn = nil;
- request.handler:close()
- end
-end
-
-local urlencode, urldecode = util_http.urlencode, util_http.urldecode;
-local formencode, formdecode = util_http.formencode, util_http.formdecode;
-
-_M.urlencode, _M.urldecode = urlencode, urldecode;
-_M.formencode, _M.formdecode = formencode, formdecode;
-
-return _M;
+return {
+ request = request;
+
+ -- COMPAT
+ urlencode = util_http.urlencode;
+ urldecode = util_http.urldecode;
+ formencode = util_http.formencode;
+ formdecode = util_http.formdecode;
+};
diff --git a/net/http/codes.lua b/net/http/codes.lua
index 0cadd079..bc31c7dd 100644
--- a/net/http/codes.lua
+++ b/net/http/codes.lua
@@ -25,6 +25,7 @@ local response_codes = {
[305] = "Use Proxy";
-- The 306 status code was used in a previous version of [RFC2616], is no longer used, and the code is reserved.
[307] = "Temporary Redirect";
+ [308] = "Permanent Redirect";
[400] = "Bad Request";
[401] = "Unauthorized";
@@ -39,17 +40,21 @@ local response_codes = {
[410] = "Gone";
[411] = "Length Required";
[412] = "Precondition Failed";
- [413] = "Request Entity Too Large";
- [414] = "Request-URI Too Long";
+ [413] = "Payload Too Large";
+ [414] = "URI Too Long";
[415] = "Unsupported Media Type";
- [416] = "Requested Range Not Satisfiable";
+ [416] = "Range Not Satisfiable";
[417] = "Expectation Failed";
[418] = "I'm a teapot";
+ [421] = "Misdirected Request";
[422] = "Unprocessable Entity";
[423] = "Locked";
[424] = "Failed Dependency";
-- The 425 status code is reserved for the WebDAV advanced collections expired proposal [RFC2817]
[426] = "Upgrade Required";
+ [428] = "Precondition Required";
+ [429] = "Too Many Requests";
+ [431] = "Request Header Fields Too Large";
[500] = "Internal Server Error";
[501] = "Not Implemented";
@@ -61,6 +66,7 @@ local response_codes = {
[507] = "Insufficient Storage";
[508] = "Loop Detected";
[510] = "Not Extended";
+ [511] = "Network Authentication Required";
};
for k,v in pairs(response_codes) do response_codes[k] = k.." "..v; end
diff --git a/net/http/server.lua b/net/http/server.lua
index f091595c..bc39767f 100644
--- a/net/http/server.lua
+++ b/net/http/server.lua
@@ -11,11 +11,14 @@ local setmetatable = setmetatable;
local xpcall = xpcall;
local traceback = debug.traceback;
local tostring = tostring;
+local cache = require "util.cache";
local codes = require "net.http.codes";
+local blocksize = require "socket".BLOCKSIZE or 2048;
local _M = {};
local sessions = {};
+local incomplete = {};
local listener = {};
local hosts = {};
local default_host;
@@ -27,7 +30,10 @@ local function is_wildcard_match(wildcard_event, event)
return wildcard_event:sub(1, -2) == event:sub(1, #wildcard_event-1);
end
-local recent_wildcard_events, max_cached_wildcard_events = {}, 10000;
+local _handlers = events._handlers;
+local recent_wildcard_events = cache.new(10000, function (key, value)
+ rawset(_handlers, key, nil);
+end);
local event_map = events._event_map;
setmetatable(events._handlers, {
@@ -62,10 +68,7 @@ setmetatable(events._handlers, {
end
rawset(handlers, curr_event, handlers_array);
if not event_map[curr_event] then -- Only wildcard handlers match, if any
- table.insert(recent_wildcard_events, curr_event);
- if #recent_wildcard_events > max_cached_wildcard_events then
- rawset(handlers, table.remove(recent_wildcard_events, 1), nil);
- end
+ recent_wildcard_events:set(curr_event, true);
end
return handlers_array;
end;
@@ -139,17 +142,26 @@ function listener.ondisconnect(conn)
open_response.finished = true;
open_response:on_destroy();
end
+ incomplete[conn] = nil;
sessions[conn] = nil;
end
function listener.ondetach(conn)
sessions[conn] = nil;
+ incomplete[conn] = nil;
end
function listener.onincoming(conn, data)
sessions[conn]:feed(data);
end
+function listener.ondrain(conn)
+ local response = incomplete[conn];
+ if response and response._send_more then
+ response._send_more();
+ end
+end
+
local headerfix = setmetatable({}, {
__index = function(t, k)
local v = "\r\n"..k:gsub("_", "-"):gsub("%f[%w].", s_upper)..": ";
@@ -189,6 +201,8 @@ function handle_request(conn, request, finish_cb)
persistent = persistent;
conn = conn;
send = _M.send_response;
+ send_file = _M.send_file;
+ done = _M.finish_response;
finish_cb = finish_cb;
};
conn._http_open_response = response;
@@ -208,7 +222,7 @@ function handle_request(conn, request, finish_cb)
err_code, err = 400, "Missing or invalid 'Host' header";
end
end
-
+
if err then
response.status_code = err_code;
response:send(events.fire_event("http-error", { code = err_code, message = err }));
@@ -250,24 +264,60 @@ function handle_request(conn, request, finish_cb)
response.status_code = 404;
response:send(events.fire_event("http-error", { code = 404 }));
end
-function _M.send_response(response, body)
- if response.finished then return; end
- response.finished = true;
- response.conn._http_open_response = nil;
-
+local function prepare_header(response)
local status_line = "HTTP/"..response.request.httpversion.." "..(response.status or codes[response.status_code]);
local headers = response.headers;
- body = body or response.body or "";
- headers.content_length = #body;
-
local output = { status_line };
for k,v in pairs(headers) do
t_insert(output, headerfix[k]..v);
end
t_insert(output, "\r\n\r\n");
+ return output;
+end
+_M.prepare_header = prepare_header;
+function _M.send_response(response, body)
+ if response.finished then return; end
+ body = body or response.body or "";
+ response.headers.content_length = #body;
+ local output = prepare_header(response);
t_insert(output, body);
-
response.conn:write(t_concat(output));
+ response:done();
+end
+function _M.send_file(response, f)
+ if response.finished then return; end
+ local chunked = not response.headers.content_length;
+ if chunked then response.headers.transfer_encoding = "chunked"; end
+ incomplete[response.conn] = response;
+ response._send_more = function ()
+ if response.finished then
+ incomplete[response.conn] = nil;
+ return;
+ end
+ local chunk = f:read(blocksize);
+ if chunk then
+ if chunked then
+ chunk = ("%x\r\n%s\r\n"):format(#chunk, chunk);
+ end
+ -- io.write("."); io.flush();
+ response.conn:write(chunk);
+ else
+ if chunked then
+ response.conn:write("0\r\n\r\n");
+ end
+ -- io.write("\n");
+ if f.close then f:close(); end
+ incomplete[response.conn] = nil;
+ return response:done();
+ end
+ end
+ response.conn:write(t_concat(prepare_header(response)));
+ return true;
+end
+function _M.finish_response(response)
+ if response.finished then return; end
+ response.finished = true;
+ response.conn._http_open_response = nil;
if response.on_destroy then
response:on_destroy();
response.on_destroy = nil;
@@ -286,7 +336,7 @@ function _M.remove_handler(event, handler)
end
function _M.listen_on(port, interface, ssl)
- addserver(interface or "*", port, listener, "*a", ssl);
+ return addserver(interface or "*", port, listener, "*a", ssl);
end
function _M.add_host(host)
hosts[host] = true;
diff --git a/net/httpserver.lua b/net/httpserver.lua
index 7d574788..6e2e31b9 100644
--- a/net/httpserver.lua
+++ b/net/httpserver.lua
@@ -2,14 +2,15 @@
local log = require "util.logger".init("net.httpserver");
local traceback = debug.traceback;
-module "httpserver"
+local _ENV = nil;
function fail()
log("error", "Attempt to use legacy HTTP API. For more info see http://prosody.im/doc/developers/legacy_http");
log("error", "Legacy HTTP API usage, %s", traceback("", 2));
end
-new, new_from_config = fail, fail;
-set_default_handler = fail;
-
-return _M;
+return {
+ new = fail;
+ new_from_config = fail;
+ set_default_handler = fail;
+};
diff --git a/net/server.lua b/net/server.lua
index 9b0d27e1..41e180fa 100644
--- a/net/server.lua
+++ b/net/server.lua
@@ -1,7 +1,7 @@
-- Prosody IM
-- Copyright (C) 2008-2010 Matthew Wild
-- Copyright (C) 2008-2010 Waqas Hussain
---
+--
-- This project is MIT/X11 licensed. Please see the
-- COPYING file in the source package for more information.
--
diff --git a/net/server_event.lua b/net/server_event.lua
index 882d10ed..2cb45553 100644
--- a/net/server_event.lua
+++ b/net/server_event.lua
@@ -11,6 +11,7 @@
-- when using luasec, there are 4 cases of timeout errors: wantread or wantwrite during reading or writing
--]]
+-- luacheck: ignore 212/self 431/err 211/ret
local SCRIPT_NAME = "server_event.lua"
local SCRIPT_VERSION = "0.05"
@@ -29,30 +30,36 @@ local cfg = {
WRITE_TIMEOUT = 180, -- timeout in seconds for write data on socket
CONNECT_TIMEOUT = 20, -- timeout in seconds for connection attempts
CLEAR_DELAY = 5, -- seconds to wait for clearing interface list (and calling ondisconnect listeners)
+ READ_RETRY_DELAY = 1e-06, -- if, after reading, there is still data in buffer, wait this long and continue reading
DEBUG = true, -- show debug messages
}
-local function use(x) return rawget(_G, x); end
-local ipairs = use "ipairs"
-local string = use "string"
-local select = use "select"
-local require = use "require"
-local tostring = use "tostring"
-local coroutine = use "coroutine"
-local setmetatable = use "setmetatable"
+local pairs = pairs
+local select = select
+local require = require
+local tostring = tostring
+local setmetatable = setmetatable
local t_insert = table.insert
local t_concat = table.concat
+local s_sub = string.sub
-local ssl = use "ssl"
-local socket = use "socket" or require "socket"
+local coroutine_wrap = coroutine.wrap
+local coroutine_yield = coroutine.yield
+
+local has_luasec, ssl = pcall ( require , "ssl" )
+local socket = require "socket"
+local levent = require "luaevent.core"
+
+local socket_gettime = socket.gettime
+local getaddrinfo = socket.dns.getaddrinfo
local log = require ("util.logger").init("socket")
local function debug(...)
return log("debug", ("%s "):rep(select('#', ...)), ...)
end
-local vdebug = debug;
+-- local vdebug = debug;
local bitor = ( function( ) -- thx Rici Lake
local hasbit = function( x, p )
@@ -72,741 +79,680 @@ local bitor = ( function( ) -- thx Rici Lake
end
end )( )
-local event = require "luaevent.core"
-local base = event.new( )
-local EV_READ = event.EV_READ
-local EV_WRITE = event.EV_WRITE
-local EV_TIMEOUT = event.EV_TIMEOUT
-local EV_SIGNAL = event.EV_SIGNAL
+local base = levent.new( )
+local addevent = base.addevent
+local EV_READ = levent.EV_READ
+local EV_WRITE = levent.EV_WRITE
+local EV_TIMEOUT = levent.EV_TIMEOUT
+local EV_SIGNAL = levent.EV_SIGNAL
local EV_READWRITE = bitor( EV_READ, EV_WRITE )
-local interfacelist = ( function( ) -- holds the interfaces for sockets
- local array = { }
- local len = 0
- return function( method, arg )
- if "add" == method then
- len = len + 1
- array[ len ] = arg
- arg:_position( len )
- return len
- elseif "delete" == method then
- if len <= 0 then
- return nil, "array is already empty"
+local interfacelist = { }
+
+-- Client interface methods
+local interface_mt = {}; interface_mt.__index = interface_mt;
+
+-- Private methods
+function interface_mt:_close()
+ return self:_destroy();
+end
+
+function interface_mt:_start_connection(plainssl) -- called from wrapclient
+ local callback = function( event )
+ if EV_TIMEOUT == event then -- timeout during connection
+ self.fatalerror = "connection timeout"
+ self:ontimeout() -- call timeout listener
+ self:_close()
+ debug( "new connection failed. id:", self.id, "error:", self.fatalerror )
+ else
+ if plainssl and has_luasec then -- start ssl session
+ self:starttls(self._sslctx, true)
+ else -- normal connection
+ self:_start_session(true)
end
- local position = arg:_position() -- get position in array
- if position ~= len then
- local interface = array[ len ] -- get last interface
- array[ position ] = interface -- copy it into free position
- array[ len ] = nil -- free last position
- interface:_position( position ) -- set new position in array
- else -- free last position
- array[ len ] = nil
+ debug( "new connection established. id:", self.id )
+ end
+ self.eventconnect = nil
+ return -1
+ end
+ self.eventconnect = addevent( base, self.conn, EV_WRITE, callback, cfg.CONNECT_TIMEOUT )
+ return true
+end
+function interface_mt:_start_session(call_onconnect) -- new session, for example after startssl
+ if self.type == "client" then
+ local callback = function( )
+ self:_lock( false, false, false )
+ --vdebug( "start listening on client socket with id:", self.id )
+ self.eventread = addevent( base, self.conn, EV_READ, self.readcallback, cfg.READ_TIMEOUT ); -- register callback
+ if call_onconnect then
+ self:onconnect()
end
- len = len - 1
- return len
- else
- return array
+ self.eventsession = nil
+ return -1
end
+ self.eventsession = addevent( base, nil, EV_TIMEOUT, callback, 0 )
+ else
+ self:_lock( false )
+ --vdebug( "start listening on server socket with id:", self.id )
+ self.eventread = addevent( base, self.conn, EV_READ, self.readcallback ) -- register callback
end
-end )( )
-
--- Client interface methods
-local interface_mt
-do
- interface_mt = {}; interface_mt.__index = interface_mt;
-
- local addevent = base.addevent
- local coroutine_wrap, coroutine_yield = coroutine.wrap,coroutine.yield
-
- -- Private methods
- function interface_mt:_position(new_position)
- self.position = new_position or self.position
- return self.position;
- end
- function interface_mt:_close()
- return self:_destroy();
- end
-
- function interface_mt:_start_connection(plainssl) -- should be called from addclient
- local callback = function( event )
- if EV_TIMEOUT == event then -- timeout during connection
- self.fatalerror = "connection timeout"
- self:ontimeout() -- call timeout listener
- self:_close()
- debug( "new connection failed. id:", self.id, "error:", self.fatalerror )
- else
- if plainssl and ssl then -- start ssl session
- self:starttls(self._sslctx, true)
- else -- normal connection
- self:_start_session(true)
+ return true
+end
+function interface_mt:_start_ssl(call_onconnect) -- old socket will be destroyed, therefore we have to close read/write events first
+ --vdebug( "starting ssl session with client id:", self.id )
+ local _
+ _ = self.eventread and self.eventread:close( ) -- close events; this must be called outside of the event callbacks!
+ _ = self.eventwrite and self.eventwrite:close( )
+ self.eventread, self.eventwrite = nil, nil
+ local err
+ self.conn, err = ssl.wrap( self.conn, self._sslctx )
+ if err then
+ self.fatalerror = err
+ self.conn = nil -- cannot be used anymore
+ if call_onconnect then
+ self.ondisconnect = nil -- dont call this when client isnt really connected
+ end
+ self:_close()
+ debug( "fatal error while ssl wrapping:", err )
+ return false
+ end
+ self.conn:settimeout( 0 ) -- set non blocking
+ local handshakecallback = coroutine_wrap(function( event )
+ local _, err
+ local attempt = 0
+ local maxattempt = cfg.MAX_HANDSHAKE_ATTEMPTS
+ while attempt < maxattempt do -- no endless loop
+ attempt = attempt + 1
+ debug( "ssl handshake of client with id:"..tostring(self)..", attempt:"..attempt )
+ if attempt > maxattempt then
+ self.fatalerror = "max handshake attempts exceeded"
+ elseif EV_TIMEOUT == event then
+ self.fatalerror = "timeout during handshake"
+ else
+ _, err = self.conn:dohandshake( )
+ if not err then
+ self:_lock( false, false, false ) -- unlock the interface; sending, closing etc allowed
+ self.send = self.conn.send -- caching table lookups with new client object
+ self.receive = self.conn.receive
+ if not call_onconnect then -- trigger listener
+ self:onstatus("ssl-handshake-complete");
end
- debug( "new connection established. id:", self.id )
+ self:_start_session( call_onconnect )
+ debug( "ssl handshake done" )
+ self.eventhandshake = nil
+ return -1
end
- self.eventconnect = nil
- return -1
- end
- self.eventconnect = addevent( base, self.conn, EV_WRITE, callback, cfg.CONNECT_TIMEOUT )
- return true
- end
- function interface_mt:_start_session(call_onconnect) -- new session, for example after startssl
- if self.type == "client" then
- local callback = function( )
- self:_lock( false, false, false )
- --vdebug( "start listening on client socket with id:", self.id )
- self.eventread = addevent( base, self.conn, EV_READ, self.readcallback, cfg.READ_TIMEOUT ); -- register callback
- if call_onconnect then
- self:onconnect()
+ if err == "wantwrite" then
+ event = EV_WRITE
+ elseif err == "wantread" then
+ event = EV_READ
+ else
+ debug( "ssl handshake error:", err )
+ self.fatalerror = err
end
- self.eventsession = nil
- return -1
end
- self.eventsession = addevent( base, nil, EV_TIMEOUT, callback, 0 )
- else
- self:_lock( false )
- --vdebug( "start listening on server socket with id:", self.id )
- self.eventread = addevent( base, self.conn, EV_READ, self.readcallback ) -- register callback
- end
- return true
- end
- function interface_mt:_start_ssl(call_onconnect) -- old socket will be destroyed, therefore we have to close read/write events first
- --vdebug( "starting ssl session with client id:", self.id )
- local _
- _ = self.eventread and self.eventread:close( ) -- close events; this must be called outside of the event callbacks!
- _ = self.eventwrite and self.eventwrite:close( )
- self.eventread, self.eventwrite = nil, nil
- local err
- self.conn, err = ssl.wrap( self.conn, self._sslctx )
- if err then
- self.fatalerror = err
- self.conn = nil -- cannot be used anymore
+ if self.fatalerror then
if call_onconnect then
self.ondisconnect = nil -- dont call this when client isnt really connected
end
self:_close()
- debug( "fatal error while ssl wrapping:", err )
- return false
- end
- self.conn:settimeout( 0 ) -- set non blocking
- local handshakecallback = coroutine_wrap(
- function( event )
- local _, err
- local attempt = 0
- local maxattempt = cfg.MAX_HANDSHAKE_ATTEMPTS
- while attempt < maxattempt do -- no endless loop
- attempt = attempt + 1
- debug( "ssl handshake of client with id:"..tostring(self)..", attempt:"..attempt )
- if attempt > maxattempt then
- self.fatalerror = "max handshake attempts exceeded"
- elseif EV_TIMEOUT == event then
- self.fatalerror = "timeout during handshake"
- else
- _, err = self.conn:dohandshake( )
- if not err then
- self:_lock( false, false, false ) -- unlock the interface; sending, closing etc allowed
- self.send = self.conn.send -- caching table lookups with new client object
- self.receive = self.conn.receive
- if not call_onconnect then -- trigger listener
- self:onstatus("ssl-handshake-complete");
- end
- self:_start_session( call_onconnect )
- debug( "ssl handshake done" )
- self.eventhandshake = nil
- return -1
- end
- if err == "wantwrite" then
- event = EV_WRITE
- elseif err == "wantread" then
- event = EV_READ
- else
- debug( "ssl handshake error:", err )
- self.fatalerror = err
- end
- end
- if self.fatalerror then
- if call_onconnect then
- self.ondisconnect = nil -- dont call this when client isnt really connected
- end
- self:_close()
- debug( "handshake failed because:", self.fatalerror )
- self.eventhandshake = nil
- return -1
- end
- event = coroutine_yield( event, cfg.HANDSHAKE_TIMEOUT ) -- yield this monster...
- end
- end
- )
- debug "starting handshake..."
- self:_lock( false, true, true ) -- unlock read/write events, but keep interface locked
- self.eventhandshake = addevent( base, self.conn, EV_READWRITE, handshakecallback, cfg.HANDSHAKE_TIMEOUT )
- return true
- end
- function interface_mt:_destroy() -- close this interface + events and call last listener
- debug( "closing client with id:", self.id, self.fatalerror )
- self:_lock( true, true, true ) -- first of all, lock the interface to avoid further actions
- local _
- _ = self.eventread and self.eventread:close( )
- if self.type == "client" then
- _ = self.eventwrite and self.eventwrite:close( )
- _ = self.eventhandshake and self.eventhandshake:close( )
- _ = self.eventstarthandshake and self.eventstarthandshake:close( )
- _ = self.eventconnect and self.eventconnect:close( )
- _ = self.eventsession and self.eventsession:close( )
- _ = self.eventwritetimeout and self.eventwritetimeout:close( )
- _ = self.eventreadtimeout and self.eventreadtimeout:close( )
- _ = self.ondisconnect and self:ondisconnect( self.fatalerror ~= "client to close" and self.fatalerror) -- call ondisconnect listener (wont be the case if handshake failed on connect)
- _ = self.conn and self.conn:close( ) -- close connection
- _ = self._server and self._server:counter(-1);
- self.eventread, self.eventwrite = nil, nil
- self.eventstarthandshake, self.eventhandshake, self.eventclose = nil, nil, nil
- self.readcallback, self.writecallback = nil, nil
- else
- self.conn:close( )
- self.eventread, self.eventclose = nil, nil
- self.interface, self.readcallback = nil, nil
+ debug( "handshake failed because:", self.fatalerror )
+ self.eventhandshake = nil
+ return -1
end
- interfacelist( "delete", self )
- return true
- end
-
- function interface_mt:_lock(nointerface, noreading, nowriting) -- lock or unlock this interface or events
- self.nointerface, self.noreading, self.nowriting = nointerface, noreading, nowriting
- return nointerface, noreading, nowriting
- end
-
- --TODO: Deprecate
- function interface_mt:lock_read(switch)
- if switch then
- return self:pause();
- else
- return self:resume();
+ event = coroutine_yield( event, cfg.HANDSHAKE_TIMEOUT ) -- yield this monster...
end
end
+ )
+ debug "starting handshake..."
+ self:_lock( false, true, true ) -- unlock read/write events, but keep interface locked
+ self.eventhandshake = addevent( base, self.conn, EV_READWRITE, handshakecallback, cfg.HANDSHAKE_TIMEOUT )
+ return true
+end
+function interface_mt:_destroy() -- close this interface + events and call last listener
+ debug( "closing client with id:", self.id, self.fatalerror )
+ self:_lock( true, true, true ) -- first of all, lock the interface to avoid further actions
+ local _
+ _ = self.eventread and self.eventread:close( )
+ if self.type == "client" then
+ _ = self.eventwrite and self.eventwrite:close( )
+ _ = self.eventhandshake and self.eventhandshake:close( )
+ _ = self.eventstarthandshake and self.eventstarthandshake:close( )
+ _ = self.eventconnect and self.eventconnect:close( )
+ _ = self.eventsession and self.eventsession:close( )
+ _ = self.eventwritetimeout and self.eventwritetimeout:close( )
+ _ = self.eventreadtimeout and self.eventreadtimeout:close( )
+ _ = self.ondisconnect and self:ondisconnect( self.fatalerror ~= "client to close" and self.fatalerror) -- call ondisconnect listener (wont be the case if handshake failed on connect)
+ _ = self.conn and self.conn:close( ) -- close connection
+ _ = self._server and self._server:counter(-1);
+ self.eventread, self.eventwrite = nil, nil
+ self.eventstarthandshake, self.eventhandshake, self.eventclose = nil, nil, nil
+ self.readcallback, self.writecallback = nil, nil
+ else
+ self.conn:close( )
+ self.eventread, self.eventclose = nil, nil
+ self.interface, self.readcallback = nil, nil
+ end
+ interfacelist[ self ] = nil
+ return true
+end
- function interface_mt:pause()
- return self:_lock(self.nointerface, true, self.nowriting);
- end
+function interface_mt:_lock(nointerface, noreading, nowriting) -- lock or unlock this interface or events
+ self.nointerface, self.noreading, self.nowriting = nointerface, noreading, nowriting
+ return nointerface, noreading, nowriting
+end
- function interface_mt:resume()
- self:_lock(self.nointerface, false, self.nowriting);
- if self.readcallback and not self.eventread then
- self.eventread = addevent( base, self.conn, EV_READ, self.readcallback, cfg.READ_TIMEOUT ); -- register callback
- return true;
- end
+--TODO: Deprecate
+function interface_mt:lock_read(switch)
+ if switch then
+ return self:pause();
+ else
+ return self:resume();
end
+end
- function interface_mt:counter(c)
- if c then
- self._connections = self._connections + c
- end
- return self._connections
- end
-
- -- Public methods
- function interface_mt:write(data)
- if self.nowriting then return nil, "locked" end
- --vdebug( "try to send data to client, id/data:", self.id, data )
- data = tostring( data )
- local len = #data
- local total = len + self.writebufferlen
- if total > cfg.MAX_SEND_LENGTH then -- check buffer length
- local err = "send buffer exceeded"
- debug( "error:", err ) -- to much, check your app
- return nil, err
- end
- t_insert(self.writebuffer, data) -- new buffer
- self.writebufferlen = total
- if not self.eventwrite then -- register new write event
- --vdebug( "register new write event" )
- self.eventwrite = addevent( base, self.conn, EV_WRITE, self.writecallback, cfg.WRITE_TIMEOUT )
- end
- return true
- end
- function interface_mt:close()
- if self.nointerface then return nil, "locked"; end
- debug( "try to close client connection with id:", self.id )
- if self.type == "client" then
- self.fatalerror = "client to close"
- if self.eventwrite then -- wait for incomplete write request
- self:_lock( true, true, false )
- debug "closing delayed until writebuffer is empty"
- return nil, "writebuffer not empty, waiting"
- else -- close now
- self:_lock( true, true, true )
- self:_close()
- return true
- end
- else
- debug( "try to close server with id:", tostring(self.id))
- self.fatalerror = "server to close"
- self:_lock( true )
- self:_close( 0 )
- return true
- end
- end
-
- function interface_mt:socket()
- return self.conn
- end
-
- function interface_mt:server()
- return self._server or self;
- end
-
- function interface_mt:port()
- return self._port
- end
-
- function interface_mt:serverport()
- return self._serverport
- end
-
- function interface_mt:ip()
- return self._ip
- end
-
- function interface_mt:ssl()
- return self._usingssl
- end
- interface_mt.clientport = interface_mt.port -- COMPAT server_select
+function interface_mt:pause()
+ return self:_lock(self.nointerface, true, self.nowriting);
+end
- function interface_mt:type()
- return self._type or "client"
+function interface_mt:resume()
+ self:_lock(self.nointerface, false, self.nowriting);
+ if self.readcallback and not self.eventread then
+ self.eventread = addevent( base, self.conn, EV_READ, self.readcallback, cfg.READ_TIMEOUT ); -- register callback
+ return true;
end
-
- function interface_mt:connections()
- return self._connections
- end
-
- function interface_mt:address()
- return self.addr
- end
-
- function interface_mt:set_sslctx(sslctx)
- self._sslctx = sslctx;
- if sslctx then
- self.starttls = nil; -- use starttls() of interface_mt
- else
- self.starttls = false; -- prevent starttls()
- end
+end
+
+function interface_mt:counter(c)
+ if c then
+ self._connections = self._connections + c
end
+ return self._connections
+end
- function interface_mt:set_mode(pattern)
- if pattern then
- self._pattern = pattern;
- end
- return self._pattern;
- end
-
- function interface_mt:set_send(new_send)
- -- No-op, we always use the underlying connection's send
- end
-
- function interface_mt:starttls(sslctx, call_onconnect)
- debug( "try to start ssl at client id:", self.id )
- local err
- self._sslctx = sslctx;
- if self._usingssl then -- startssl was already called
- err = "ssl already active"
- end
- if err then
- debug( "error:", err )
- return nil, err
- end
- self._usingssl = true
- self.startsslcallback = function( ) -- we have to start the handshake outside of a read/write event
- self.startsslcallback = nil
- self:_start_ssl(call_onconnect);
- self.eventstarthandshake = nil
- return -1
- end
- if not self.eventwrite then
- self:_lock( true, true, true ) -- lock the interface, to not disturb the handshake
- self.eventstarthandshake = addevent( base, nil, EV_TIMEOUT, self.startsslcallback, 0 ) -- add event to start handshake
- else -- wait until writebuffer is empty
+-- Public methods
+function interface_mt:write(data)
+ if self.nowriting then return nil, "locked" end
+ --vdebug( "try to send data to client, id/data:", self.id, data )
+ data = tostring( data )
+ local len = #data
+ local total = len + self.writebufferlen
+ if total > cfg.MAX_SEND_LENGTH then -- check buffer length
+ local err = "send buffer exceeded"
+ debug( "error:", err ) -- to much, check your app
+ return nil, err
+ end
+ t_insert(self.writebuffer, data) -- new buffer
+ self.writebufferlen = total
+ if not self.eventwrite then -- register new write event
+ --vdebug( "register new write event" )
+ self.eventwrite = addevent( base, self.conn, EV_WRITE, self.writecallback, cfg.WRITE_TIMEOUT )
+ end
+ return true
+end
+function interface_mt:close()
+ if self.nointerface then return nil, "locked"; end
+ debug( "try to close client connection with id:", self.id )
+ if self.type == "client" then
+ self.fatalerror = "client to close"
+ if self.eventwrite then -- wait for incomplete write request
self:_lock( true, true, false )
- debug "ssl session delayed until writebuffer is empty..."
+ debug "closing delayed until writebuffer is empty"
+ return nil, "writebuffer not empty, waiting"
+ else -- close now
+ self:_lock( true, true, true )
+ self:_close()
+ return true
end
- self.starttls = false;
+ else
+ debug( "try to close server with id:", tostring(self.id))
+ self.fatalerror = "server to close"
+ self:_lock( true )
+ self:_close( 0 )
return true
end
-
- function interface_mt:setoption(option, value)
- if self.conn.setoption then
- return self.conn:setoption(option, value);
- end
- return false, "setoption not implemented";
- end
-
- function interface_mt:setlistener(listener)
- self:ondetach(); -- Notify listener that it is no longer responsible for this connection
- self.onconnect, self.ondisconnect, self.onincoming,
- self.ontimeout, self.onstatus, self.ondetach
- = listener.onconnect, listener.ondisconnect, listener.onincoming,
- listener.ontimeout, listener.onstatus, listener.ondetach;
- end
-
- -- Stub handlers
- function interface_mt:onconnect()
- end
- function interface_mt:onincoming()
- end
- function interface_mt:ondisconnect()
- end
- function interface_mt:ontimeout()
- end
- function interface_mt:ondrain()
+end
+
+function interface_mt:socket()
+ return self.conn
+end
+
+function interface_mt:server()
+ return self._server or self;
+end
+
+function interface_mt:port()
+ return self._port
+end
+
+function interface_mt:serverport()
+ return self._serverport
+end
+
+function interface_mt:ip()
+ return self._ip
+end
+
+function interface_mt:ssl()
+ return self._usingssl
+end
+interface_mt.clientport = interface_mt.port -- COMPAT server_select
+
+function interface_mt:type()
+ return self._type or "client"
+end
+
+function interface_mt:connections()
+ return self._connections
+end
+
+function interface_mt:address()
+ return self.addr
+end
+
+function interface_mt:set_sslctx(sslctx)
+ self._sslctx = sslctx;
+ if sslctx then
+ self.starttls = nil; -- use starttls() of interface_mt
+ else
+ self.starttls = false; -- prevent starttls()
end
- function interface_mt:ondetach()
+end
+
+function interface_mt:set_mode(pattern)
+ if pattern then
+ self._pattern = pattern;
end
- function interface_mt:onstatus()
+ return self._pattern;
+end
+
+function interface_mt:set_send(new_send) -- luacheck: ignore 212
+ -- No-op, we always use the underlying connection's send
+end
+
+function interface_mt:starttls(sslctx, call_onconnect)
+ debug( "try to start ssl at client id:", self.id )
+ local err
+ self._sslctx = sslctx;
+ if self._usingssl then -- startssl was already called
+ err = "ssl already active"
+ end
+ if err then
+ debug( "error:", err )
+ return nil, err
+ end
+ self._usingssl = true
+ self.startsslcallback = function( ) -- we have to start the handshake outside of a read/write event
+ self.startsslcallback = nil
+ self:_start_ssl(call_onconnect);
+ self.eventstarthandshake = nil
+ return -1
+ end
+ if not self.eventwrite then
+ self:_lock( true, true, true ) -- lock the interface, to not disturb the handshake
+ self.eventstarthandshake = addevent( base, nil, EV_TIMEOUT, self.startsslcallback, 0 ) -- add event to start handshake
+ else
+ -- wait until writebuffer is empty
+ self:_lock( true, true, false )
+ debug "ssl session delayed until writebuffer is empty..."
+ end
+ self.starttls = false;
+ return true
+end
+
+function interface_mt:setoption(option, value)
+ if self.conn.setoption then
+ return self.conn:setoption(option, value);
end
+ return false, "setoption not implemented";
+end
+
+function interface_mt:setlistener(listener)
+ self:ondetach(); -- Notify listener that it is no longer responsible for this connection
+ self.onconnect = listener.onconnect;
+ self.ondisconnect = listener.ondisconnect;
+ self.onincoming = listener.onincoming;
+ self.ontimeout = listener.ontimeout;
+ self.onreadtimeout = listener.onreadtimeout;
+ self.onstatus = listener.onstatus;
+ self.ondetach = listener.ondetach;
+end
+
+-- Stub handlers
+function interface_mt:onconnect()
+end
+function interface_mt:onincoming()
+end
+function interface_mt:ondisconnect()
+end
+function interface_mt:ontimeout()
+end
+function interface_mt:onreadtimeout()
+ self.fatalerror = "timeout during receiving"
+ debug( "connection failed:", self.fatalerror )
+ self:_close()
+ self.eventread = nil
+end
+function interface_mt:ondrain()
+end
+function interface_mt:ondetach()
+end
+function interface_mt:onstatus()
end
-- End of client interface methods
-local handleclient;
-do
- local string_sub = string.sub -- caching table lookups
- local addevent = base.addevent
- local socket_gettime = socket.gettime
- function handleclient( client, ip, port, server, pattern, listener, sslctx ) -- creates an client interface
- --vdebug("creating client interfacce...")
- local interface = {
- type = "client";
- conn = client;
- currenttime = socket_gettime( ); -- safe the origin
- writebuffer = {}; -- writebuffer
- writebufferlen = 0; -- length of writebuffer
- send = client.send; -- caching table lookups
- receive = client.receive;
- onconnect = listener.onconnect; -- will be called when client disconnects
- ondisconnect = listener.ondisconnect; -- will be called when client disconnects
- onincoming = listener.onincoming; -- will be called when client sends data
- ontimeout = listener.ontimeout; -- called when fatal socket timeout occurs
- ondrain = listener.ondrain; -- called when writebuffer is empty
- ondetach = listener.ondetach; -- called when disassociating this listener from this connection
- onstatus = listener.onstatus; -- called for status changes (e.g. of SSL/TLS)
- eventread = false, eventwrite = false, eventclose = false,
- eventhandshake = false, eventstarthandshake = false; -- event handler
- eventconnect = false, eventsession = false; -- more event handler...
- eventwritetimeout = false; -- even more event handler...
- eventreadtimeout = false;
- fatalerror = false; -- error message
- writecallback = false; -- will be called on write events
- readcallback = false; -- will be called on read events
- nointerface = true; -- lock/unlock parameter of this interface
- noreading = false, nowriting = false; -- locks of the read/writecallback
- startsslcallback = false; -- starting handshake callback
- position = false; -- position of client in interfacelist
-
- -- Properties
- _ip = ip, _port = port, _server = server, _pattern = pattern,
- _serverport = (server and server:port() or nil),
- _sslctx = sslctx; -- parameters
- _usingssl = false; -- client is using ssl;
- }
- if not ssl then interface.starttls = false; end
- interface.id = tostring(interface):match("%x+$");
- interface.writecallback = function( event ) -- called on write events
- --vdebug( "new client write event, id/ip/port:", interface, ip, port )
- if interface.nowriting or ( interface.fatalerror and ( "client to close" ~= interface.fatalerror ) ) then -- leave this event
- --vdebug( "leaving this event because:", interface.nowriting or interface.fatalerror )
- interface.eventwrite = false
- return -1
- end
- if EV_TIMEOUT == event then -- took too long to write some data to socket -> disconnect
- interface.fatalerror = "timeout during writing"
- debug( "writing failed:", interface.fatalerror )
- interface:_close()
- interface.eventwrite = false
- return -1
- else -- can write :)
- if interface._usingssl then -- handle luasec
- if interface.eventreadtimeout then -- we have to read first
- local ret = interface.readcallback( ) -- call readcallback
- --vdebug( "tried to read in writecallback, result:", ret )
- end
- if interface.eventwritetimeout then -- luasec only
- interface.eventwritetimeout:close( ) -- first we have to close timeout event which where regged after a wantread error
- interface.eventwritetimeout = false
- end
+local function handleclient( client, ip, port, server, pattern, listener, sslctx ) -- creates an client interface
+ --vdebug("creating client interfacce...")
+ local interface = {
+ type = "client";
+ conn = client;
+ currenttime = socket_gettime( ); -- safe the origin
+ writebuffer = {}; -- writebuffer
+ writebufferlen = 0; -- length of writebuffer
+ send = client.send; -- caching table lookups
+ receive = client.receive;
+ onconnect = listener.onconnect; -- will be called when client disconnects
+ ondisconnect = listener.ondisconnect; -- will be called when client disconnects
+ onincoming = listener.onincoming; -- will be called when client sends data
+ ontimeout = listener.ontimeout; -- called when fatal socket timeout occurs
+ onreadtimeout = listener.onreadtimeout; -- called when socket inactivity timeout occurs
+ ondrain = listener.ondrain; -- called when writebuffer is empty
+ ondetach = listener.ondetach; -- called when disassociating this listener from this connection
+ onstatus = listener.onstatus; -- called for status changes (e.g. of SSL/TLS)
+ eventread = false, eventwrite = false, eventclose = false,
+ eventhandshake = false, eventstarthandshake = false; -- event handler
+ eventconnect = false, eventsession = false; -- more event handler...
+ eventwritetimeout = false; -- even more event handler...
+ eventreadtimeout = false;
+ fatalerror = false; -- error message
+ writecallback = false; -- will be called on write events
+ readcallback = false; -- will be called on read events
+ nointerface = true; -- lock/unlock parameter of this interface
+ noreading = false, nowriting = false; -- locks of the read/writecallback
+ startsslcallback = false; -- starting handshake callback
+ position = false; -- position of client in interfacelist
+
+ -- Properties
+ _ip = ip, _port = port, _server = server, _pattern = pattern,
+ _serverport = (server and server:port() or nil),
+ _sslctx = sslctx; -- parameters
+ _usingssl = false; -- client is using ssl;
+ }
+ if not has_luasec then interface.starttls = false; end
+ interface.id = tostring(interface):match("%x+$");
+ interface.writecallback = function( event ) -- called on write events
+ --vdebug( "new client write event, id/ip/port:", interface, ip, port )
+ if interface.nowriting or ( interface.fatalerror and ( "client to close" ~= interface.fatalerror ) ) then -- leave this event
+ --vdebug( "leaving this event because:", interface.nowriting or interface.fatalerror )
+ interface.eventwrite = false
+ return -1
+ end
+ if EV_TIMEOUT == event then -- took too long to write some data to socket -> disconnect
+ interface.fatalerror = "timeout during writing"
+ debug( "writing failed:", interface.fatalerror )
+ interface:_close()
+ interface.eventwrite = false
+ return -1
+ else -- can write :)
+ if interface._usingssl then -- handle luasec
+ if interface.eventreadtimeout then -- we have to read first
+ local ret = interface.readcallback( ) -- call readcallback
+ --vdebug( "tried to read in writecallback, result:", ret )
end
- interface.writebuffer = { t_concat(interface.writebuffer) }
- local succ, err, byte = interface.conn:send( interface.writebuffer[1], 1, interface.writebufferlen )
- --vdebug( "write data:", interface.writebuffer, "error:", err, "part:", byte )
- if succ then -- writing succesful
- interface.writebuffer[1] = nil
- interface.writebufferlen = 0
- interface:ondrain();
- if interface.fatalerror then
- debug "closing client after writing"
- interface:_close() -- close interface if needed
- elseif interface.startsslcallback then -- start ssl connection if needed
- debug "starting ssl handshake after writing"
- interface.eventstarthandshake = addevent( base, nil, EV_TIMEOUT, interface.startsslcallback, 0 )
- elseif interface.writebufferlen ~= 0 then
- -- data possibly written from ondrain
- return EV_WRITE, cfg.WRITE_TIMEOUT
- elseif interface.eventreadtimeout then
- return EV_WRITE, cfg.WRITE_TIMEOUT
- end
- interface.eventwrite = nil
- return -1
- elseif byte and (err == "timeout" or err == "wantwrite") then -- want write again
- --vdebug( "writebuffer is not empty:", err )
- interface.writebuffer[1] = string_sub( interface.writebuffer[1], byte + 1, interface.writebufferlen ) -- new buffer
- interface.writebufferlen = interface.writebufferlen - byte
- if "wantread" == err then -- happens only with luasec
- local callback = function( )
- interface:_close()
- interface.eventwritetimeout = nil
- return -1;
- end
- interface.eventwritetimeout = addevent( base, nil, EV_TIMEOUT, callback, cfg.WRITE_TIMEOUT ) -- reg a new timeout event
- debug( "wantread during write attempt, reg it in readcallback but dont know what really happens next..." )
- -- hopefully this works with luasec; its simply not possible to use 2 different write events on a socket in luaevent
- return -1
- end
- return EV_WRITE, cfg.WRITE_TIMEOUT
- else -- connection was closed during writing or fatal error
- interface.fatalerror = err or "fatal error"
- debug( "connection failed in write event:", interface.fatalerror )
- interface:_close()
- interface.eventwrite = nil
- return -1
+ if interface.eventwritetimeout then -- luasec only
+ interface.eventwritetimeout:close( ) -- first we have to close timeout event which where regged after a wantread error
+ interface.eventwritetimeout = false
end
end
- end
-
- interface.readcallback = function( event ) -- called on read events
- --vdebug( "new client read event, id/ip/port:", tostring(interface.id), tostring(ip), tostring(port) )
- if interface.noreading or interface.fatalerror then -- leave this event
- --vdebug( "leaving this event because:", tostring(interface.noreading or interface.fatalerror) )
- interface.eventread = nil
- return -1
- end
- if EV_TIMEOUT == event then -- took too long to get some data from client -> disconnect
- interface.fatalerror = "timeout during receiving"
- debug( "connection failed:", interface.fatalerror )
- interface:_close()
- interface.eventread = nil
- return -1
- else -- can read
- if interface._usingssl then -- handle luasec
- if interface.eventwritetimeout then -- ok, in the past writecallback was regged
- local ret = interface.writecallback( ) -- call it
- --vdebug( "tried to write in readcallback, result:", tostring(ret) )
- end
- if interface.eventreadtimeout then
- interface.eventreadtimeout:close( )
- interface.eventreadtimeout = nil
- end
- end
- local buffer, err, part = interface.conn:receive( interface._pattern ) -- receive buffer with "pattern"
- --vdebug( "read data:", tostring(buffer), "error:", tostring(err), "part:", tostring(part) )
- buffer = buffer or part
- if buffer and #buffer > cfg.MAX_READ_LENGTH then -- check buffer length
- interface.fatalerror = "receive buffer exceeded"
- debug( "fatal error:", interface.fatalerror )
- interface:_close()
- interface.eventread = nil
- return -1
+ interface.writebuffer = { t_concat(interface.writebuffer) }
+ local succ, err, byte = interface.conn:send( interface.writebuffer[1], 1, interface.writebufferlen )
+ --vdebug( "write data:", interface.writebuffer, "error:", err, "part:", byte )
+ if succ then -- writing succesful
+ interface.writebuffer[1] = nil
+ interface.writebufferlen = 0
+ interface:ondrain();
+ if interface.fatalerror then
+ debug "closing client after writing"
+ interface:_close() -- close interface if needed
+ elseif interface.startsslcallback then -- start ssl connection if needed
+ debug "starting ssl handshake after writing"
+ interface.eventstarthandshake = addevent( base, nil, EV_TIMEOUT, interface.startsslcallback, 0 )
+ elseif interface.writebufferlen ~= 0 then
+ -- data possibly written from ondrain
+ return EV_WRITE, cfg.WRITE_TIMEOUT
+ elseif interface.eventreadtimeout then
+ return EV_WRITE, cfg.WRITE_TIMEOUT
end
- if err and ( err ~= "timeout" and err ~= "wantread" ) then
- if "wantwrite" == err then -- need to read on write event
- if not interface.eventwrite then -- register new write event if needed
- interface.eventwrite = addevent( base, interface.conn, EV_WRITE, interface.writecallback, cfg.WRITE_TIMEOUT )
- end
- interface.eventreadtimeout = addevent( base, nil, EV_TIMEOUT,
- function( )
- interface:_close()
- end, cfg.READ_TIMEOUT
- )
- debug( "wantwrite during read attempt, reg it in writecallback but dont know what really happens next..." )
- -- to be honest i dont know what happens next, if it is allowed to first read, the write etc...
- else -- connection was closed or fatal error
- interface.fatalerror = err
- debug( "connection failed in read event:", interface.fatalerror )
+ interface.eventwrite = nil
+ return -1
+ elseif byte and (err == "timeout" or err == "wantwrite") then -- want write again
+ --vdebug( "writebuffer is not empty:", err )
+ interface.writebuffer[1] = s_sub( interface.writebuffer[1], byte + 1, interface.writebufferlen ) -- new buffer
+ interface.writebufferlen = interface.writebufferlen - byte
+ if "wantread" == err then -- happens only with luasec
+ local callback = function( )
interface:_close()
- interface.eventread = nil
- return -1
+ interface.eventwritetimeout = nil
+ return -1;
end
- else
- interface.onincoming( interface, buffer, err ) -- send new data to listener
- end
- if interface.noreading then
- interface.eventread = nil;
- return -1;
+ interface.eventwritetimeout = addevent( base, nil, EV_TIMEOUT, callback, cfg.WRITE_TIMEOUT ) -- reg a new timeout event
+ debug( "wantread during write attempt, reg it in readcallback but dont know what really happens next..." )
+ -- hopefully this works with luasec; its simply not possible to use 2 different write events on a socket in luaevent
+ return -1
end
- return EV_READ, cfg.READ_TIMEOUT
+ return EV_WRITE, cfg.WRITE_TIMEOUT
+ else -- connection was closed during writing or fatal error
+ interface.fatalerror = err or "fatal error"
+ debug( "connection failed in write event:", interface.fatalerror )
+ interface:_close()
+ interface.eventwrite = nil
+ return -1
end
end
+ end
- client:settimeout( 0 ) -- set non blocking
- setmetatable(interface, interface_mt)
- interfacelist( "add", interface ) -- add to interfacelist
- return interface
- end
-end
-
-local handleserver
-do
- function handleserver( server, addr, port, pattern, listener, sslctx ) -- creates an server interface
- debug "creating server interface..."
- local interface = {
- _connections = 0;
-
- conn = server;
- onconnect = listener.onconnect; -- will be called when new client connected
- eventread = false; -- read event handler
- eventclose = false; -- close event handler
- readcallback = false; -- read event callback
- fatalerror = false; -- error message
- nointerface = true; -- lock/unlock parameter
-
- _ip = addr, _port = port, _pattern = pattern,
- _sslctx = sslctx;
- }
- interface.id = tostring(interface):match("%x+$");
- interface.readcallback = function( event ) -- server handler, called on incoming connections
- --vdebug( "server can accept, id/addr/port:", interface, addr, port )
- if interface.fatalerror then
- --vdebug( "leaving this event because:", self.fatalerror )
- interface.eventread = nil
- return -1
+ interface.readcallback = function( event ) -- called on read events
+ --vdebug( "new client read event, id/ip/port:", tostring(interface.id), tostring(ip), tostring(port) )
+ if interface.noreading or interface.fatalerror then -- leave this event
+ --vdebug( "leaving this event because:", tostring(interface.noreading or interface.fatalerror) )
+ interface.eventread = nil
+ return -1
+ end
+ if EV_TIMEOUT == event and not interface.conn:dirty() and interface:onreadtimeout() ~= true then
+ return -1 -- took too long to get some data from client -> disconnect
+ end
+ if interface._usingssl then -- handle luasec
+ if interface.eventwritetimeout then -- ok, in the past writecallback was regged
+ local ret = interface.writecallback( ) -- call it
+ --vdebug( "tried to write in readcallback, result:", tostring(ret) )
end
- local delay = cfg.ACCEPT_DELAY
- if EV_TIMEOUT == event then
- if interface._connections >= cfg.MAX_CONNECTIONS then -- check connection count
- debug( "to many connections, seconds to wait for next accept:", delay )
- return EV_TIMEOUT, delay -- timeout...
- else
- return EV_READ -- accept again
- end
+ if interface.eventreadtimeout then
+ interface.eventreadtimeout:close( )
+ interface.eventreadtimeout = nil
end
- --vdebug("max connection check ok, accepting...")
- local client, err = server:accept() -- try to accept; TODO: check err
- while client do
- if interface._connections >= cfg.MAX_CONNECTIONS then
- client:close( ) -- refuse connection
- debug( "maximal connections reached, refuse client connection; accept delay:", delay )
- return EV_TIMEOUT, delay -- delay for next accept attempt
- end
- local client_ip, client_port = client:getpeername( )
- interface._connections = interface._connections + 1 -- increase connection count
- local clientinterface = handleclient( client, client_ip, client_port, interface, pattern, listener, sslctx )
- --vdebug( "client id:", clientinterface, "startssl:", startssl )
- if ssl and sslctx then
- clientinterface:starttls(sslctx, true)
- else
- clientinterface:_start_session( true )
+ end
+ local buffer, err, part = interface.conn:receive( interface._pattern ) -- receive buffer with "pattern"
+ --vdebug( "read data:", tostring(buffer), "error:", tostring(err), "part:", tostring(part) )
+ buffer = buffer or part
+ if buffer and #buffer > cfg.MAX_READ_LENGTH then -- check buffer length
+ interface.fatalerror = "receive buffer exceeded"
+ debug( "fatal error:", interface.fatalerror )
+ interface:_close()
+ interface.eventread = nil
+ return -1
+ end
+ if err and ( err ~= "timeout" and err ~= "wantread" ) then
+ if "wantwrite" == err then -- need to read on write event
+ if not interface.eventwrite then -- register new write event if needed
+ interface.eventwrite = addevent( base, interface.conn, EV_WRITE, interface.writecallback, cfg.WRITE_TIMEOUT )
end
- debug( "accepted incoming client connection from:", client_ip or "<unknown IP>", client_port or "<unknown port>", "to", port or "<unknown port>");
-
- client, err = server:accept() -- try to accept again
+ interface.eventreadtimeout = addevent( base, nil, EV_TIMEOUT,
+ function( ) interface:_close() end, cfg.READ_TIMEOUT)
+ debug( "wantwrite during read attempt, reg it in writecallback but dont know what really happens next..." )
+ -- to be honest i dont know what happens next, if it is allowed to first read, the write etc...
+ else -- connection was closed or fatal error
+ interface.fatalerror = err
+ debug( "connection failed in read event:", interface.fatalerror )
+ interface:_close()
+ interface.eventread = nil
+ return -1
end
- return EV_READ
+ else
+ interface.onincoming( interface, buffer, err ) -- send new data to listener
end
-
- server:settimeout( 0 )
- setmetatable(interface, interface_mt)
- interfacelist( "add", interface )
- interface:_start_session()
- return interface
- end
-end
-
-local addserver = ( function( )
- return function( addr, port, listener, pattern, sslcfg, startssl ) -- TODO: check arguments
- --vdebug( "creating new tcp server with following parameters:", addr or "nil", port or "nil", sslcfg or "nil", startssl or "nil")
- local server, err = socket.bind( addr, port, cfg.ACCEPT_QUEUE ) -- create server socket
- if not server then
- debug( "creating server socket on "..addr.." port "..port.." failed:", err )
- return nil, err
+ if interface.noreading then
+ interface.eventread = nil;
+ return -1;
end
- local sslctx
- if sslcfg then
- if not ssl then
- debug "fatal error: luasec not found"
- return nil, "luasec not found"
- end
- sslctx, err = sslcfg
- if err then
- debug( "error while creating new ssl context for server socket:", err )
- return nil, err
- end
+ if interface.conn:dirty() then -- still data left in buffer
+ return EV_TIMEOUT, cfg.READ_RETRY_DELAY;
end
- local interface = handleserver( server, addr, port, pattern, listener, sslctx, startssl ) -- new server handler
- debug( "new server created with id:", tostring(interface))
- return interface
+ return EV_READ, cfg.READ_TIMEOUT
end
-end )( )
-local addclient, wrapclient
-do
- function wrapclient( client, ip, port, listeners, pattern, sslctx )
- local interface = handleclient( client, ip, port, nil, pattern, listeners, sslctx )
- interface:_start_connection(sslctx)
- return interface, client
- --function handleclient( client, ip, port, server, pattern, listener, _, sslctx ) -- creates an client interface
- end
-
- function addclient( addr, serverport, listener, pattern, localaddr, localport, sslcfg, startssl )
- local client, err = socket.tcp() -- creating new socket
- if not client then
- debug( "cannot create socket:", err )
- return nil, err
+ client:settimeout( 0 ) -- set non blocking
+ setmetatable(interface, interface_mt)
+ interfacelist[ interface ] = true -- add to interfacelist
+ return interface
+end
+
+local function handleserver( server, addr, port, pattern, listener, sslctx ) -- creates an server interface
+ debug "creating server interface..."
+ local interface = {
+ _connections = 0;
+
+ type = "server";
+ conn = server;
+ onconnect = listener.onconnect; -- will be called when new client connected
+ eventread = false; -- read event handler
+ eventclose = false; -- close event handler
+ readcallback = false; -- read event callback
+ fatalerror = false; -- error message
+ nointerface = true; -- lock/unlock parameter
+
+ _ip = addr, _port = port, _pattern = pattern,
+ _sslctx = sslctx;
+ }
+ interface.id = tostring(interface):match("%x+$");
+ interface.readcallback = function( event ) -- server handler, called on incoming connections
+ --vdebug( "server can accept, id/addr/port:", interface, addr, port )
+ if interface.fatalerror then
+ --vdebug( "leaving this event because:", self.fatalerror )
+ interface.eventread = nil
+ return -1
end
- client:settimeout( 0 ) -- set nonblocking
- if localaddr then
- local res, err = client:bind( localaddr, localport, -1 )
- if not res then
- debug( "cannot bind client:", err )
- return nil, err
+ local delay = cfg.ACCEPT_DELAY
+ if EV_TIMEOUT == event then
+ if interface._connections >= cfg.MAX_CONNECTIONS then -- check connection count
+ debug( "to many connections, seconds to wait for next accept:", delay )
+ return EV_TIMEOUT, delay -- timeout...
+ else
+ return EV_READ -- accept again
end
end
- local sslctx
- if sslcfg then -- handle ssl/new context
- if not ssl then
- debug "need luasec, but not available"
- return nil, "luasec not found"
+ --vdebug("max connection check ok, accepting...")
+ local client, err = server:accept() -- try to accept; TODO: check err
+ while client do
+ if interface._connections >= cfg.MAX_CONNECTIONS then
+ client:close( ) -- refuse connection
+ debug( "maximal connections reached, refuse client connection; accept delay:", delay )
+ return EV_TIMEOUT, delay -- delay for next accept attempt
end
- sslctx, err = sslcfg
- if err then
- debug( "cannot create new ssl context:", err )
- return nil, err
+ local client_ip, client_port = client:getpeername( )
+ interface._connections = interface._connections + 1 -- increase connection count
+ local clientinterface = handleclient( client, client_ip, client_port, interface, pattern, listener, sslctx )
+ --vdebug( "client id:", clientinterface, "startssl:", startssl )
+ if has_luasec and sslctx then
+ clientinterface:starttls(sslctx, true)
+ else
+ clientinterface:_start_session( true )
end
+ debug( "accepted incoming client connection from:", client_ip or "<unknown IP>", client_port or "<unknown port>", "to", port or "<unknown port>");
+
+ client, err = server:accept() -- try to accept again
end
- local res, err = client:connect( addr, serverport ) -- connect
- if res or ( err == "timeout" ) then
- local ip, port = client:getsockname( )
- local interface = wrapclient( client, ip, serverport, listener, pattern, sslctx, startssl )
- interface:_start_connection( startssl )
- debug( "new connection id:", interface.id )
- return interface, err
+ return EV_READ
+ end
+
+ server:settimeout( 0 )
+ setmetatable(interface, interface_mt)
+ interfacelist[ interface ] = true
+ interface:_start_session()
+ return interface
+end
+
+local function addserver( addr, port, listener, pattern, sslctx, startssl ) -- TODO: check arguments
+ --vdebug( "creating new tcp server with following parameters:", addr or "nil", port or "nil", sslctx or "nil", startssl or "nil")
+ if sslctx and not has_luasec then
+ debug "fatal error: luasec not found"
+ return nil, "luasec not found"
+ end
+ local server, err = socket.bind( addr, port, cfg.ACCEPT_QUEUE ) -- create server socket
+ if not server then
+ debug( "creating server socket on "..addr.." port "..port.." failed:", err )
+ return nil, err
+ end
+ local interface = handleserver( server, addr, port, pattern, listener, sslctx, startssl ) -- new server handler
+ debug( "new server created with id:", tostring(interface))
+ return interface
+end
+
+local function wrapclient( client, ip, port, listeners, pattern, sslctx )
+ local interface = handleclient( client, ip, port, nil, pattern, listeners, sslctx )
+ interface:_start_connection(sslctx)
+ return interface, client
+ --function handleclient( client, ip, port, server, pattern, listener, _, sslctx ) -- creates an client interface
+end
+
+local function addclient( addr, serverport, listener, pattern, sslctx, typ )
+ if sslctx and not has_luasec then
+ debug "need luasec, but not available"
+ return nil, "luasec not found"
+ end
+ if not typ then
+ local addrinfo, err = getaddrinfo(addr)
+ if not addrinfo then return nil, err end
+ if addrinfo[1] and addrinfo[1].family == "inet6" then
+ typ = "tcp6"
else
- debug( "new connection failed:", err )
- return nil, err
+ typ = "tcp"
end
end
+ local create = socket[typ]
+ if type( create ) ~= "function" then
+ return nil, "invalid socket type"
+ end
+ local client, err = create() -- creating new socket
+ if not client then
+ debug( "cannot create socket:", err )
+ return nil, err
+ end
+ client:settimeout( 0 ) -- set nonblocking
+ local res, err = client:connect( addr, serverport ) -- connect
+ if res or ( err == "timeout" ) then
+ local ip, port = client:getsockname( )
+ local interface = wrapclient( client, ip, serverport, listener, pattern, sslctx )
+ debug( "new connection id:", interface.id )
+ return interface, err
+ else
+ debug( "new connection failed:", err )
+ return nil, err
+ end
end
-
-local loop = function( ) -- starts the event loop
+local function loop( ) -- starts the event loop
base:loop( )
return "quitting";
end
-local newevent = ( function( )
- local add = base.addevent
- return function( ... )
- return add( base, ... )
- end
-end )( )
+local function newevent( ... )
+ return addevent( base, ... )
+end
-local closeallservers = function( arg )
- for _, item in ipairs( interfacelist( ) ) do
+local function closeallservers ( arg )
+ for item in pairs( interfacelist ) do
if item.type == "server" then
item:close( arg )
end
@@ -815,9 +761,9 @@ end
local function setquitting(yes)
if yes then
- -- Quit now
- closeallservers();
- base:loopexit();
+ -- Quit now
+ closeallservers();
+ base:loopexit();
end
end
@@ -829,7 +775,7 @@ end
-- being garbage-collected
local signal_events = {}; -- [signal_num] -> event object
local function hook_signal(signal_num, handler)
- local function _handler(event)
+ local function _handler()
local ret = handler();
if ret ~= false then -- Continue handling this signal?
return EV_SIGNAL; -- Yes
@@ -842,14 +788,14 @@ end
local function link(sender, receiver, buffersize)
local sender_locked;
-
+
function receiver:ondrain()
if sender_locked then
sender:resume();
sender_locked = nil;
end
end
-
+
function sender:onincoming(data)
receiver:write(data);
if receiver.writebufferlen >= buffersize then
@@ -861,12 +807,11 @@ local function link(sender, receiver, buffersize)
end
return {
-
cfg = cfg,
base = base,
loop = loop,
link = link,
- event = event,
+ event = levent,
event_base = base,
addevent = newevent,
addserver = addserver,
diff --git a/net/server_select.lua b/net/server_select.lua
index c50a6ce1..85730e73 100644
--- a/net/server_select.lua
+++ b/net/server_select.lua
@@ -1,7 +1,7 @@
---
+--
-- server.lua by blastbeat of the luadch project
-- Re-used here under the MIT/X Consortium License
---
+--
-- Modifications (C) 2008-2010 Matthew Wild, Waqas Hussain
--
@@ -38,7 +38,6 @@ local coroutine = use "coroutine"
--// lua lib methods //--
-local os_difftime = os.difftime
local math_min = math.min
local math_huge = math.huge
local table_concat = table.concat
@@ -48,13 +47,14 @@ local coroutine_yield = coroutine.yield
--// extern libs //--
-local luasec = use "ssl"
+local has_luasec, luasec = pcall ( require , "ssl" )
local luasocket = use "socket" or require "socket"
local luasocket_gettime = luasocket.gettime
+local getaddrinfo = luasocket.dns.getaddrinfo
--// extern lib methods //--
-local ssl_wrap = ( luasec and luasec.wrap )
+local ssl_wrap = ( has_luasec and luasec.wrap )
local socket_bind = luasocket.bind
local socket_sleep = luasocket.sleep
local socket_select = luasocket.select
@@ -149,7 +149,7 @@ _accepretry = 10 -- seconds to wait until the next attempt of a full server to a
_maxsendlen = 51000 * 1024 -- max len of send buffer
_maxreadlen = 25000 * 1024 -- max len of read buffer
-_checkinterval = 1200000 -- interval in secs to check idle clients
+_checkinterval = 30 -- interval in secs to check idle clients
_sendtimeout = 60000 -- allowed send idle time in secs
_readtimeout = 6 * 60 * 60 -- allowed read idle time in secs
@@ -295,6 +295,7 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport
local status = listeners.onstatus
local disconnect = listeners.ondisconnect
local drain = listeners.ondrain
+ local onreadtimeout = listeners.onreadtimeout;
local detach = listeners.ondetach
local bufferqueue = { } -- buffer array
@@ -324,6 +325,8 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport
handler.disconnect = function( )
return disconnect
end
+ handler.onreadtimeout = onreadtimeout;
+
handler.setlistener = function( self, listeners )
if detach then
detach(self) -- Notify listener that it is no longer responsible for this connection
@@ -332,6 +335,7 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport
disconnect = listeners.ondisconnect
status = listeners.onstatus
drain = listeners.ondrain
+ handler.onreadtimeout = listeners.onreadtimeout
detach = listeners.ondetach
end
handler.getstats = function( )
@@ -404,6 +408,9 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport
out_put "server.lua: closed client handler and removed socket from list"
return true
end
+ handler.server = function ( )
+ return server
+ end
handler.ip = function( )
return ip
end
@@ -575,6 +582,9 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport
_ = status and status( handler, "ssl-handshake-complete" )
if self.autostart_ssl and listeners.onconnect then
listeners.onconnect(self);
+ if bufferqueuelen ~= 0 then
+ _sendlistlen = addsocket(_sendlist, client, _sendlistlen)
+ end
end
_readlistlen = addsocket(_readlist, client, _readlistlen)
return true
@@ -592,13 +602,14 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport
coroutine_yield( ) -- handshake not finished
end
end
- out_put( "server.lua: ssl handshake error: ", tostring(err or "handshake too long") )
- _ = handler and handler:force_close("ssl handshake failed")
+ err = "ssl handshake error: " .. ( err or "handshake too long" );
+ out_put( "server.lua: ", err );
+ _ = handler and handler:force_close(err)
return false, err -- handshake failed
end
)
end
- if luasec then
+ if has_luasec then
handler.starttls = function( self, _sslctx)
if _sslctx then
handler:set_sslctx(_sslctx);
@@ -624,7 +635,7 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport
shutdown = id
_socketlist[ socket ] = handler
_readlistlen = addsocket(_readlist, socket, _readlistlen)
-
+
-- remove traces of the old socket
_readlistlen = removesocket( _readlist, oldsocket, _readlistlen )
_sendlistlen = removesocket( _sendlist, oldsocket, _sendlistlen )
@@ -651,7 +662,7 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport
_socketlist[ socket ] = handler
_readlistlen = addsocket(_readlist, socket, _readlistlen)
- if sslctx and luasec then
+ if sslctx and has_luasec then
out_put "server.lua: auto-starting ssl negotiation..."
handler.autostart_ssl = true;
local ok, err = handler:starttls(sslctx);
@@ -712,7 +723,7 @@ local function link(sender, receiver, buffersize)
sender_locked = nil;
end
end
-
+
local _readbuffer = sender.readbuffer;
function sender.readbuffer()
_readbuffer();
@@ -727,22 +738,23 @@ end
----------------------------------// PUBLIC //--
addserver = function( addr, port, listeners, pattern, sslctx ) -- this function provides a way for other scripts to reg a server
+ addr = addr or "*"
local err
if type( listeners ) ~= "table" then
err = "invalid listener table"
- end
- if type( port ) ~= "number" or not ( port >= 0 and port <= 65535 ) then
+ elseif type ( addr ) ~= "string" then
+ err = "invalid address"
+ elseif type( port ) ~= "number" or not ( port >= 0 and port <= 65535 ) then
err = "invalid port"
elseif _server[ addr..":"..port ] then
err = "listeners on '[" .. addr .. "]:" .. port .. "' already exist"
- elseif sslctx and not luasec then
+ elseif sslctx and not has_luasec then
err = "luasec not found"
end
if err then
out_error( "server.lua, [", addr, "]:", port, ": ", err )
return nil, err
end
- addr = addr or "*"
local server, err = socket_bind( addr, port, _tcpbacklog )
if err then
out_error( "server.lua, [", addr, "]:", port, ": ", err )
@@ -852,7 +864,7 @@ loop = function(once) -- this is the main loop of the program
local next_timer_time = math_huge;
repeat
local read, write, err = socket_select( _readlist, _sendlist, math_min(_selecttimeout, next_timer_time) )
- for i, socket in ipairs( write ) do -- send data waiting in writequeues
+ for _, socket in ipairs( write ) do -- send data waiting in writequeues
local handler = _socketlist[ socket ]
if handler then
handler.sendbuffer( )
@@ -861,7 +873,7 @@ loop = function(once) -- this is the main loop of the program
out_put "server.lua: found no handler and closed socket (writelist)" -- this should not happen
end
end
- for i, socket in ipairs( read ) do -- receive data
+ for _, socket in ipairs( read ) do -- receive data
local handler = _socketlist[ socket ]
if handler then
handler.readbuffer( )
@@ -878,21 +890,22 @@ loop = function(once) -- this is the main loop of the program
_currenttime = luasocket_gettime( )
-- Check for socket timeouts
- local difftime = os_difftime( _currenttime - _starttime )
- if difftime > _checkinterval then
+ if _currenttime - _starttime > _checkinterval then
_starttime = _currenttime
for handler, timestamp in pairs( _writetimes ) do
- if os_difftime( _currenttime - timestamp ) > _sendtimeout then
- --_writetimes[ handler ] = nil
+ if _currenttime - timestamp > _sendtimeout then
handler.disconnect( )( handler, "send timeout" )
handler:force_close() -- forced disconnect
end
end
for handler, timestamp in pairs( _readtimes ) do
- if os_difftime( _currenttime - timestamp ) > _readtimeout then
- --_readtimes[ handler ] = nil
- handler.disconnect( )( handler, "read timeout" )
- handler:close( ) -- forced disconnect?
+ if _currenttime - timestamp > _readtimeout then
+ if not(handler.onreadtimeout) or handler:onreadtimeout() ~= true then
+ handler.disconnect( )( handler, "read timeout" )
+ handler:close( ) -- forced disconnect?
+ else
+ _readtimes[ handler ] = _currenttime -- reset timer
+ end
end
end
end
@@ -920,6 +933,7 @@ loop = function(once) -- this is the main loop of the program
socket_sleep( _sleeptime )
until quitting;
if once and quitting == "once" then quitting = nil; return; end
+ closeall();
return "quitting"
end
@@ -952,17 +966,46 @@ local wrapclient = function( socket, ip, serverport, listeners, pattern, sslctx
return handler, socket
end
-local addclient = function( address, port, listeners, pattern, sslctx )
- local client, err = luasocket.tcp( )
+local addclient = function( address, port, listeners, pattern, sslctx, typ )
+ local err
+ if type( listeners ) ~= "table" then
+ err = "invalid listener table"
+ elseif type ( address ) ~= "string" then
+ err = "invalid address"
+ elseif type( port ) ~= "number" or not ( port >= 0 and port <= 65535 ) then
+ err = "invalid port"
+ elseif sslctx and not has_luasec then
+ err = "luasec not found"
+ end
+ if not typ then
+ local addrinfo, err = getaddrinfo(address)
+ if not addrinfo then return nil, err end
+ if addrinfo[1] and addrinfo[1].family == "inet6" then
+ typ = "tcp6"
+ else
+ typ = "tcp"
+ end
+ end
+ local create = luasocket[typ]
+ if type( create ) ~= "function" then
+ err = "invalid socket type"
+ end
+
+ if err then
+ out_error( "server.lua, addclient: ", err )
+ return nil, err
+ end
+
+ local client, err = create( )
if err then
return nil, err
end
client:settimeout( 0 )
- _, err = client:connect( address, port )
- if err then -- try again
- local handler = wrapclient( client, address, port, listeners )
+ local ok, err = client:connect( address, port )
+ if ok or err == "timeout" then
+ return wrapclient( client, address, port, listeners, pattern, sslctx )
else
- wrapconnection( nil, listeners, client, address, port, "clientport", pattern, sslctx )
+ return nil, err
end
end
@@ -992,7 +1035,7 @@ return {
addclient = addclient,
wrapclient = wrapclient,
-
+
loop = loop,
link = link,
step = step,
diff --git a/net/websocket.lua b/net/websocket.lua
new file mode 100644
index 00000000..a4274eec
--- /dev/null
+++ b/net/websocket.lua
@@ -0,0 +1,272 @@
+-- Prosody IM
+-- Copyright (C) 2012 Florian Zeitz
+-- Copyright (C) 2014 Daurnimator
+--
+-- This project is MIT/X11 licensed. Please see the
+-- COPYING file in the source package for more information.
+--
+
+local t_concat = table.concat;
+
+local http = require "net.http";
+local frames = require "net.websocket.frames";
+local base64 = require "util.encodings".base64;
+local sha1 = require "util.hashes".sha1;
+local random_bytes = require "util.random".bytes;
+local timer = require "util.timer";
+local log = require "util.logger".init "websocket";
+
+local close_timeout = 3; -- Seconds to wait after sending close frame until closing connection.
+
+local websockets = {};
+
+local websocket_listeners = {};
+function websocket_listeners.ondisconnect(handler, err)
+ local s = websockets[handler];
+ websockets[handler] = nil;
+ if s.close_timer then
+ timer.stop(s.close_timer);
+ s.close_timer = nil;
+ end
+ s.readyState = 3;
+ if s.close_code == nil and s.onerror then s:onerror(err); end
+ if s.onclose then s:onclose(s.close_code, s.close_message or err); end
+end
+
+function websocket_listeners.ondetach(handler)
+ websockets[handler] = nil;
+end
+
+local function fail(s, code, reason)
+ module:log("warn", "WebSocket connection failed, closing. %d %s", code, reason);
+ s:close(code, reason);
+ s.handler:close();
+ return false
+end
+
+function websocket_listeners.onincoming(handler, buffer, err)
+ local s = websockets[handler];
+ s.readbuffer = s.readbuffer..buffer;
+ while true do
+ local frame, len = frames.parse(s.readbuffer);
+ if frame == nil then break end
+ s.readbuffer = s.readbuffer:sub(len+1);
+
+ log("debug", "Websocket received frame: opcode=%0x, %i bytes", frame.opcode, #frame.data);
+
+ -- Error cases
+ if frame.RSV1 or frame.RSV2 or frame.RSV3 then -- Reserved bits non zero
+ return fail(s, 1002, "Reserved bits not zero");
+ end
+
+ if frame.opcode < 0x8 then
+ local databuffer = s.databuffer;
+ if frame.opcode == 0x0 then -- Continuation frames
+ if not databuffer then
+ return fail(s, 1002, "Unexpected continuation frame");
+ end
+ databuffer[#databuffer+1] = frame.data;
+ elseif frame.opcode == 0x1 or frame.opcode == 0x2 then -- Text or Binary frame
+ if databuffer then
+ return fail(s, 1002, "Continuation frame expected");
+ end
+ databuffer = {type=frame.opcode, frame.data};
+ s.databuffer = databuffer;
+ else
+ return fail(s, 1002, "Reserved opcode");
+ end
+ if frame.FIN then
+ s.databuffer = nil;
+ if s.onmessage then
+ s:onmessage(t_concat(databuffer), databuffer.type);
+ end
+ end
+ else -- Control frame
+ if frame.length > 125 then -- Control frame with too much payload
+ return fail(s, 1002, "Payload too large");
+ elseif not frame.FIN then -- Fragmented control frame
+ return fail(s, 1002, "Fragmented control frame");
+ end
+ if frame.opcode == 0x8 then -- Close request
+ if frame.length == 1 then
+ return fail(s, 1002, "Close frame with payload, but too short for status code");
+ end
+ local status_code, message = frames.parse_close(frame.data);
+ if status_code == nil then
+ --[[ RFC 6455 7.4.1
+ 1005 is a reserved value and MUST NOT be set as a status code in a
+ Close control frame by an endpoint. It is designated for use in
+ applications expecting a status code to indicate that no status
+ code was actually present.
+ ]]
+ status_code = 1005
+ elseif status_code < 1000 then
+ return fail(s, 1002, "Closed with invalid status code");
+ elseif ((status_code > 1003 and status_code < 1007) or status_code > 1011) and status_code < 3000 then
+ return fail(s, 1002, "Closed with reserved status code");
+ end
+ s.close_code, s.close_message = status_code, message;
+ s:close(1000);
+ return true;
+ elseif frame.opcode == 0x9 then -- Ping frame
+ frame.opcode = 0xA;
+ frame.MASK = true; -- RFC 6455 6.1.5: If the data is being sent by the client, the frame(s) MUST be masked
+ handler:write(frames.build(frame));
+ elseif frame.opcode == 0xA then -- Pong frame
+ log("debug", "Received unexpected pong frame: " .. tostring(frame.data));
+ else
+ return fail(s, 1002, "Reserved opcode");
+ end
+ end
+ end
+ return true;
+end
+
+local websocket_methods = {};
+local function close_timeout_cb(now, timerid, s)
+ s.close_timer = nil;
+ log("warn", "Close timeout waiting for server to close, closing manually.");
+ s.handler:close();
+end
+function websocket_methods:close(code, reason)
+ if self.readyState < 2 then
+ code = code or 1000;
+ log("debug", "closing WebSocket with code %i: %s" , code , tostring(reason));
+ self.readyState = 2;
+ local handler = self.handler;
+ handler:write(frames.build_close(code, reason, true));
+ -- Do not close socket straight away, wait for acknowledgement from server.
+ self.close_timer = timer.add_task(close_timeout, close_timeout_cb, self);
+ elseif self.readyState == 2 then
+ log("debug", "tried to close a closing WebSocket, closing the raw socket.");
+ -- Stop timer
+ if self.close_timer then
+ timer.stop(self.close_timer);
+ self.close_timer = nil;
+ end
+ local handler = self.handler;
+ handler:close();
+ else
+ log("debug", "tried to close a closed WebSocket, ignoring.");
+ end
+end
+function websocket_methods:send(data, opcode)
+ if self.readyState < 1 then
+ return nil, "WebSocket not open yet, unable to send data.";
+ elseif self.readyState >= 2 then
+ return nil, "WebSocket closed, unable to send data.";
+ end
+ if opcode == "text" or opcode == nil then
+ opcode = 0x1;
+ elseif opcode == "binary" then
+ opcode = 0x2;
+ end
+ local frame = {
+ FIN = true;
+ MASK = true; -- RFC 6455 6.1.5: If the data is being sent by the client, the frame(s) MUST be masked
+ opcode = opcode;
+ data = tostring(data);
+ };
+ log("debug", "WebSocket sending frame: opcode=%0x, %i bytes", frame.opcode, #frame.data);
+ return self.handler:write(frames.build(frame));
+end
+
+local websocket_metatable = {
+ __index = websocket_methods;
+};
+
+local function connect(url, ex, listeners)
+ ex = ex or {};
+
+ --[[RFC 6455 4.1.7:
+ The request MUST include a header field with the name
+ |Sec-WebSocket-Key|. The value of this header field MUST be a
+ nonce consisting of a randomly selected 16-byte value that has
+ been base64-encoded (see Section 4 of [RFC4648]). The nonce
+ MUST be selected randomly for each connection.
+ ]]
+ local key = base64.encode(random_bytes(16));
+
+ -- Either a single protocol string or an array of protocol strings.
+ local protocol = ex.protocol;
+ if type(protocol) == "string" then
+ protocol = { protocol, [protocol] = true };
+ elseif type(protocol) == "table" and protocol[1] then
+ for _, v in ipairs(protocol) do
+ protocol[v] = true;
+ end
+ else
+ protocol = nil;
+ end
+
+ local headers = {
+ ["Upgrade"] = "websocket";
+ ["Connection"] = "Upgrade";
+ ["Sec-WebSocket-Key"] = key;
+ ["Sec-WebSocket-Protocol"] = protocol and t_concat(protocol, ", ");
+ ["Sec-WebSocket-Version"] = "13";
+ ["Sec-WebSocket-Extensions"] = ex.extensions;
+ }
+ if ex.headers then
+ for k,v in pairs(ex.headers) do
+ headers[k] = v;
+ end
+ end
+
+ local s = setmetatable({
+ readbuffer = "";
+ databuffer = nil;
+ handler = nil;
+ close_code = nil;
+ close_message = nil;
+ close_timer = nil;
+ readyState = 0;
+ protocol = nil;
+
+ url = url;
+
+ onopen = listeners.onopen;
+ onclose = listeners.onclose;
+ onmessage = listeners.onmessage;
+ onerror = listeners.onerror;
+ }, websocket_metatable);
+
+ local http_url = url:gsub("^(ws)", "http");
+ local http_req = http.request(http_url, {
+ method = "GET";
+ headers = headers;
+ sslctx = ex.sslctx;
+ }, function(b, c, r, http_req)
+ if c ~= 101
+ or r.headers["connection"]:lower() ~= "upgrade"
+ or r.headers["upgrade"] ~= "websocket"
+ or r.headers["sec-websocket-accept"] ~= base64.encode(sha1(key .. "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"))
+ or (protocol and not protocol[r.headers["sec-websocket-protocol"]])
+ then
+ s.readyState = 3;
+ log("warn", "WebSocket connection to %s failed: %s", url, tostring(b));
+ if s.onerror then s:onerror("connecting-failed"); end
+ return;
+ end
+
+ s.protocol = r.headers["sec-websocket-protocol"];
+
+ -- Take possession of socket from http
+ http_req.conn = nil;
+ local handler = http_req.handler;
+ s.handler = handler;
+ websockets[handler] = s;
+ handler:setlistener(websocket_listeners);
+
+ log("debug", "WebSocket connected successfully to %s", url);
+ s.readyState = 1;
+ if s.onopen then s:onopen(); end
+ websocket_listeners.onincoming(handler, b);
+ end);
+
+ return s;
+end
+
+return {
+ connect = connect;
+};
diff --git a/net/websocket/frames.lua b/net/websocket/frames.lua
new file mode 100644
index 00000000..5fe96d45
--- /dev/null
+++ b/net/websocket/frames.lua
@@ -0,0 +1,219 @@
+-- Prosody IM
+-- Copyright (C) 2012 Florian Zeitz
+-- Copyright (C) 2014 Daurnimator
+--
+-- This project is MIT/X11 licensed. Please see the
+-- COPYING file in the source package for more information.
+--
+
+local softreq = require "util.dependencies".softreq;
+local random_bytes = require "util.random".bytes;
+
+local bit = assert(softreq"bit" or softreq"bit32",
+ "No bit module found. See https://prosody.im/doc/depends#bitop");
+local band = bit.band;
+local bor = bit.bor;
+local bxor = bit.bxor;
+local lshift = bit.lshift;
+local rshift = bit.rshift;
+
+local t_concat = table.concat;
+local s_byte = string.byte;
+local s_char= string.char;
+local s_sub = string.sub;
+local s_pack = string.pack;
+local s_unpack = string.unpack;
+
+if not s_pack and softreq"struct" then
+ s_pack = softreq"struct".pack;
+ s_unpack = softreq"struct".unpack;
+end
+
+local function read_uint16be(str, pos)
+ local l1, l2 = s_byte(str, pos, pos+1);
+ return l1*256 + l2;
+end
+-- FIXME: this may lose precision
+local function read_uint64be(str, pos)
+ local l1, l2, l3, l4, l5, l6, l7, l8 = s_byte(str, pos, pos+7);
+ local h = lshift(l1, 24) + lshift(l2, 16) + lshift(l3, 8) + l4;
+ local l = lshift(l5, 24) + lshift(l6, 16) + lshift(l7, 8) + l8;
+ return h * 2^32 + l;
+end
+local function pack_uint16be(x)
+ return s_char(rshift(x, 8), band(x, 0xFF));
+end
+local function get_byte(x, n)
+ return band(rshift(x, n), 0xFF);
+end
+local function pack_uint64be(x)
+ local h = band(x / 2^32, 2^32-1);
+ return s_char(get_byte(h, 24), get_byte(h, 16), get_byte(h, 8), band(h, 0xFF),
+ get_byte(x, 24), get_byte(x, 16), get_byte(x, 8), band(x, 0xFF));
+end
+
+if s_pack then
+ function pack_uint16be(x)
+ return s_pack(">I2", x);
+ end
+ function pack_uint64be(x)
+ return s_pack(">I8", x);
+ end
+end
+
+if s_unpack then
+ function read_uint16be(str, pos)
+ return s_unpack(">I2", str, pos);
+ end
+ function read_uint64be(str, pos)
+ return s_unpack(">I8", str, pos);
+ end
+end
+
+local function parse_frame_header(frame)
+ if #frame < 2 then return; end
+
+ local byte1, byte2 = s_byte(frame, 1, 2);
+ local result = {
+ FIN = band(byte1, 0x80) > 0;
+ RSV1 = band(byte1, 0x40) > 0;
+ RSV2 = band(byte1, 0x20) > 0;
+ RSV3 = band(byte1, 0x10) > 0;
+ opcode = band(byte1, 0x0F);
+
+ MASK = band(byte2, 0x80) > 0;
+ length = band(byte2, 0x7F);
+ };
+
+ local length_bytes = 0;
+ if result.length == 126 then
+ length_bytes = 2;
+ elseif result.length == 127 then
+ length_bytes = 8;
+ end
+
+ local header_length = 2 + length_bytes + (result.MASK and 4 or 0);
+ if #frame < header_length then return; end
+
+ if length_bytes == 2 then
+ result.length = read_uint16be(frame, 3);
+ elseif length_bytes == 8 then
+ result.length = read_uint64be(frame, 3);
+ end
+
+ if result.MASK then
+ result.key = { s_byte(frame, length_bytes+3, length_bytes+6) };
+ end
+
+ return result, header_length;
+end
+
+-- XORs the string `str` with the array of bytes `key`
+-- TODO: optimize
+local function apply_mask(str, key, from, to)
+ from = from or 1
+ if from < 0 then from = #str + from + 1 end -- negative indicies
+ to = to or #str
+ if to < 0 then to = #str + to + 1 end -- negative indicies
+ local key_len = #key
+ local counter = 0;
+ local data = {};
+ for i = from, to do
+ local key_index = counter%key_len + 1;
+ counter = counter + 1;
+ data[counter] = s_char(bxor(key[key_index], s_byte(str, i)));
+ end
+ return t_concat(data);
+end
+
+local function parse_frame_body(frame, header, pos)
+ if header.MASK then
+ return apply_mask(frame, header.key, pos, pos + header.length - 1);
+ else
+ return frame:sub(pos, pos + header.length - 1);
+ end
+end
+
+local function parse_frame(frame)
+ local result, pos = parse_frame_header(frame);
+ if result == nil or #frame < (pos + result.length) then return; end
+ result.data = parse_frame_body(frame, result, pos+1);
+ return result, pos + result.length;
+end
+
+local function build_frame(desc)
+ local data = desc.data or "";
+
+ assert(desc.opcode and desc.opcode >= 0 and desc.opcode <= 0xF, "Invalid WebSocket opcode");
+ if desc.opcode >= 0x8 then
+ -- RFC 6455 5.5
+ assert(#data <= 125, "WebSocket control frames MUST have a payload length of 125 bytes or less.");
+ end
+
+ local b1 = bor(desc.opcode,
+ desc.FIN and 0x80 or 0,
+ desc.RSV1 and 0x40 or 0,
+ desc.RSV2 and 0x20 or 0,
+ desc.RSV3 and 0x10 or 0);
+
+ local b2 = #data;
+ local length_extra;
+ if b2 <= 125 then -- 7-bit length
+ length_extra = "";
+ elseif b2 <= 0xFFFF then -- 2-byte length
+ b2 = 126;
+ length_extra = pack_uint16be(#data);
+ else -- 8-byte length
+ b2 = 127;
+ length_extra = pack_uint64be(#data);
+ end
+
+ local key = ""
+ if desc.MASK then
+ local key_a = desc.key
+ if key_a then
+ key = s_char(unpack(key_a, 1, 4));
+ else
+ key = random_bytes(4);
+ key_a = {key:byte(1,4)};
+ end
+ b2 = bor(b2, 0x80);
+ data = apply_mask(data, key_a);
+ end
+
+ return s_char(b1, b2) .. length_extra .. key .. data
+end
+
+local function parse_close(data)
+ local code, message
+ if #data >= 2 then
+ code = read_uint16be(data, 1);
+ if #data > 2 then
+ message = s_sub(data, 3);
+ end
+ end
+ return code, message
+end
+
+local function build_close(code, message, mask)
+ local data = pack_uint16be(code);
+ if message then
+ assert(#message<=123, "Close reason must be <=123 bytes");
+ data = data .. message;
+ end
+ return build_frame({
+ opcode = 0x8;
+ FIN = true;
+ MASK = mask;
+ data = data;
+ });
+end
+
+return {
+ parse_header = parse_frame_header;
+ parse_body = parse_frame_body;
+ parse = parse_frame;
+ build = build_frame;
+ parse_close = parse_close;
+ build_close = build_close;
+};