diff options
Diffstat (limited to 'net')
-rw-r--r-- | net/adns.lua | 21 | ||||
-rw-r--r-- | net/connlisteners.lua | 15 | ||||
-rw-r--r-- | net/dns.lua | 41 | ||||
-rw-r--r-- | net/http.lua | 97 | ||||
-rw-r--r-- | net/http/codes.lua | 15 | ||||
-rw-r--r-- | net/http/parser.lua | 6 | ||||
-rw-r--r-- | net/http/server.lua | 84 | ||||
-rw-r--r-- | net/httpserver.lua | 11 | ||||
-rw-r--r-- | net/server.lua | 2 | ||||
-rw-r--r-- | net/server_event.lua | 1313 | ||||
-rw-r--r-- | net/server_select.lua | 112 | ||||
-rw-r--r-- | net/websocket.lua | 272 | ||||
-rw-r--r-- | net/websocket/frames.lua | 219 |
13 files changed, 1380 insertions, 828 deletions
diff --git a/net/adns.lua b/net/adns.lua index 3fc958f4..0b7247ed 100644 --- a/net/adns.lua +++ b/net/adns.lua @@ -1,7 +1,7 @@ -- Prosody IM -- Copyright (C) 2008-2010 Matthew Wild -- Copyright (C) 2008-2010 Waqas Hussain --- +-- -- This project is MIT/X11 licensed. Please see the -- COPYING file in the source package for more information. -- @@ -11,14 +11,13 @@ local dns = require "net.dns"; local log = require "util.logger".init("adns"); -local t_insert, t_remove = table.insert, table.remove; local coroutine, tostring, pcall = coroutine, tostring, pcall; local function dummy_send(sock, data, i, j) return (j-i)+1; end -module "adns" +local _ENV = nil; -function lookup(handler, qname, qtype, qclass) +local function lookup(handler, qname, qtype, qclass) return coroutine.wrap(function (peek) if peek then log("debug", "Records for %s already cached, using those...", qname); @@ -43,12 +42,12 @@ function lookup(handler, qname, qtype, qclass) end)(dns.peek(qname, qtype, qclass)); end -function cancel(handle, call_handler, reason) +local function cancel(handle, call_handler, reason) log("warn", "Cancelling DNS lookup for %s", tostring(handle[3])); dns.cancel(handle[1], handle[2], handle[3], handle[4], call_handler); end -function new_async_socket(sock, resolver) +local function new_async_socket(sock, resolver) local peername = "<unknown>"; local listener = {}; local handler = {}; @@ -65,7 +64,7 @@ function new_async_socket(sock, resolver) if resolver.socketset[conn] == resolver.best_server and resolver.best_server == #servers then log("error", "Exhausted all %d configured DNS servers, next lookup will try %s again", #servers, servers[1]); end - + resolver:servfail(conn); -- Let the magic commence end end @@ -73,7 +72,7 @@ function new_async_socket(sock, resolver) if not handler then return nil, err; end - + handler.settimeout = function () end handler.setsockname = function (_, ...) return sock:setsockname(...); end handler.setpeername = function (_, ...) peername = (...); local ret, err = sock:setpeername(...); _:set_send(dummy_send); return ret, err; end @@ -88,4 +87,8 @@ end dns.socket_wrapper_set(new_async_socket); -return _M; +return { + lookup = lookup; + cancel = cancel; + new_async_socket = new_async_socket; +}; diff --git a/net/connlisteners.lua b/net/connlisteners.lua index 99ddc720..000bfa63 100644 --- a/net/connlisteners.lua +++ b/net/connlisteners.lua @@ -2,14 +2,17 @@ local log = require "util.logger".init("net.connlisteners"); local traceback = debug.traceback; -module "httpserver" +local _ENV = nil; -function fail() +local function fail() log("error", "Attempt to use legacy connlisteners API. For more info see http://prosody.im/doc/developers/network"); log("error", "Legacy connlisteners API usage, %s", traceback("", 2)); end -register, deregister = fail, fail; -get, start = fail, fail, epic_fail; - -return _M; +return { + register = fail; + register = fail; + get = fail; + start = fail; + -- epic fail +}; diff --git a/net/dns.lua b/net/dns.lua index d123731c..232e9e7c 100644 --- a/net/dns.lua +++ b/net/dns.lua @@ -22,8 +22,8 @@ local is_windows = (_ and windows) or os.getenv("WINDIR"); local coroutine, io, math, string, table = coroutine, io, math, string, table; -local ipairs, next, pairs, print, setmetatable, tostring, assert, error, unpack, select, type= - ipairs, next, pairs, print, setmetatable, tostring, assert, error, unpack, select, type; +local ipairs, next, pairs, print, setmetatable, tostring, assert, error, select, type = + ipairs, next, pairs, print, setmetatable, tostring, assert, error, select, type; local ztact = { -- public domain 20080404 lua@ztact.com get = function(parent, ...) @@ -71,8 +71,8 @@ local get, set = ztact.get, ztact.set; local default_timeout = 15; -------------------------------------------------- module dns -module('dns') -local dns = _M; +local _ENV = nil; +local dns = {}; -- dns type & class codes ------------------------------ dns type & class codes @@ -190,7 +190,7 @@ end local rrs_metatable = {}; -- - - - - - - - - - - - - - - - - - rrs_metatable function rrs_metatable.__tostring(rrs) local t = {}; - for i,rr in ipairs(rrs) do + for _, rr in ipairs(rrs) do append(t, tostring(rr)..'\n'); end return table.concat(t); @@ -213,15 +213,6 @@ function cache_metatable.__tostring(cache) end -function resolver:new() -- - - - - - - - - - - - - - - - - - - - - resolver - local r = { active = {}, cache = {}, unsorted = {} }; - setmetatable(r, resolver); - setmetatable(r.cache, cache_metatable); - setmetatable(r.unsorted, { __mode = 'kv' }); - return r; -end - - -- packet layer -------------------------------------------------- packet layer @@ -395,7 +386,7 @@ end function resolver:AAAA(rr) local addr = {}; - for i = 1, rr.rdlength, 2 do + for _ = 1, rr.rdlength, 2 do local b1, b2 = self:byte(2); table.insert(addr, ("%02x%02x"):format(b1, b2)); end @@ -532,7 +523,7 @@ end function resolver:rrs (count) -- - - - - - - - - - - - - - - - - - - - - rrs local rrs = {}; - for i = 1,count do append(rrs, self:rr()); end + for _ = 1, count do append(rrs, self:rr()); end return rrs; end @@ -545,7 +536,7 @@ function resolver:decode(packet, force) -- - - - - - - - - - - - - - decode response.question = {}; local offset = self.offset; - for i = 1,response.header.qdcount do + for _ = 1, response.header.qdcount do append(response.question, self:question()); end response.question.raw = string.sub(self.packet, offset, self.offset - 1); @@ -629,7 +620,7 @@ function resolver:getsocket(servernum) -- - - - - - - - - - - - - getsocket if peer:find(":") then sock, err = socket.udp6(); else - sock, err = socket.udp(); + sock, err = (socket.udp4 or socket.udp)(); end if sock and self.socket_wrapper then sock, err = self.socket_wrapper(sock, self); end if not sock then @@ -850,7 +841,7 @@ function resolver:receive(rset) -- - - - - - - - - - - - - - - - - receive rset = rset or self.socket; local response; - for i,sock in pairs(rset) do + for _, sock in pairs(rset) do if self.socketset[sock] then local packet = sock:receive(); @@ -861,7 +852,7 @@ function resolver:receive(rset) -- - - - - - - - - - - - - - - - - receive --print('received response'); --self.print(response); - for j,rr in pairs(response.answer) do + for _, rr in pairs(response.answer) do if rr.name:sub(-#response.question[1].name, -1) == response.question[1].name then self:remember(rr, response.question[1].type) end @@ -903,7 +894,7 @@ function resolver:feed(sock, packet, force) --print('received response'); --self.print(response); - for j,rr in pairs(response.answer) do + for _, rr in pairs(response.answer) do self:remember(rr, response.question[1].type); end @@ -1020,7 +1011,7 @@ end function resolver.print(response) -- - - - - - - - - - - - - resolver.print - for s,s in pairs { 'id', 'qr', 'opcode', 'aa', 'tc', 'rd', 'ra', 'z', + for _, s in pairs { 'id', 'qr', 'opcode', 'aa', 'tc', 'rd', 'ra', 'z', 'rcode', 'qdcount', 'ancount', 'nscount', 'arcount' } do print( string.format('%-30s', 'header.'..s), response.header[s], hint(response.header, s) ); end @@ -1033,9 +1024,9 @@ function resolver.print(response) -- - - - - - - - - - - - - resolver.print local common = { name=1, type=1, class=1, ttl=1, rdlength=1, rdata=1 }; local tmp; - for s,s in pairs({'answer', 'authority', 'additional'}) do + for _, s in pairs({'answer', 'authority', 'additional'}) do for i,rr in pairs(response[s]) do - for j,t in pairs({ 'name', 'type', 'class', 'ttl', 'rdlength' }) do + for _, t in pairs({ 'name', 'type', 'class', 'ttl', 'rdlength' }) do tmp = string.format('%s[%i].%s', s, i, t); print(string.format('%-30s', tmp), rr[t], hint(rr, t)); end @@ -1054,8 +1045,6 @@ end function dns.resolver () -- - - - - - - - - - - - - - - - - - - - - resolver - -- this function seems to be redundant with resolver.new () - local r = { active = {}, cache = {}, unsorted = {}, wanted = {}, best_server = 1 }; setmetatable (r, resolver); setmetatable (r.cache, cache_metatable); diff --git a/net/http.lua b/net/http.lua index 8ce47494..5c1f2058 100644 --- a/net/http.lua +++ b/net/http.lua @@ -1,12 +1,11 @@ -- Prosody IM -- Copyright (C) 2008-2010 Matthew Wild -- Copyright (C) 2008-2010 Waqas Hussain --- +-- -- This project is MIT/X11 licensed. Please see the -- COPYING file in the source package for more information. -- -local socket = require "socket" local b64 = require "util.encodings".base64.encode; local url = require "socket.url" local httpstream_new = require "net.http.parser".new; @@ -24,10 +23,12 @@ local assert, error = assert, error local log = require "util.logger".init("http"); -module "http" +local _ENV = nil; local requests = {}; -- Open requests +local function make_id(req) return (tostring(req):match("%x+$")); end + local listener = { default_port = 80, default_mode = "*a" }; function listener.onconnect(conn) @@ -37,7 +38,7 @@ function listener.onconnect(conn) if req.query then t_insert(request_line, 4, "?"..req.query); end - + conn:write(t_concat(request_line)); local t = { [2] = ": ", [4] = "\r\n" }; for k, v in pairs(req.headers) do @@ -45,7 +46,7 @@ function listener.onconnect(conn) conn:write(t_concat(t)); end conn:write("\r\n"); - + if req.body then conn:write(req.body); end @@ -76,6 +77,13 @@ function listener.ondetach(conn) requests[conn] = nil; end +local function destroy_request(request) + if request.conn then + request.conn = nil; + request.handler:close() + end +end + local function request_reader(request, data, err) if not request.parser then local function error_cb(reason) @@ -85,12 +93,12 @@ local function request_reader(request, data, err) end destroy_request(request); end - + if not data then error_cb(err); return; end - + local function success_cb(r) if request.callback then request.callback(r.body, r.code, r, request); @@ -107,20 +115,29 @@ local function request_reader(request, data, err) end local function handleerr(err) log("error", "Traceback[http]: %s", traceback(tostring(err), 2)); end -function request(u, ex, callback) +local function log_if_failed(id, ret, ...) + if not ret then + log("error", "Request '%s': error in callback: %s", id, tostring((...))); + end + return ...; +end + +local function request(u, ex, callback) local req = url.parse(u); - + if not (req and req.host) then callback(nil, 0, req); return nil, "invalid-url"; end - + if not req.path then req.path = "/"; end - + + req.id = ex and ex.id or make_id(req); + local method, headers, body; - + local host, port = req.host, req.port; local host_header = host; if (port == "80" and req.scheme == "http") @@ -134,7 +151,7 @@ function request(u, ex, callback) ["Host"] = host_header; ["User-Agent"] = "Prosody XMPP Server"; }; - + if req.userinfo then headers["Authorization"] = "Basic "..b64(req.userinfo); end @@ -154,34 +171,35 @@ function request(u, ex, callback) end end end - + + log("debug", "Making %s %s request '%s' to %s", req.scheme:upper(), method or "GET", req.id, (ex and ex.suppress_url and host_header) or u); + -- Attach to request object req.method, req.headers, req.body = method, headers, body; - + local using_https = req.scheme == "https"; if using_https and not ssl_available then error("SSL not available, unable to contact https URL"); end local port_number = port and tonumber(port) or (using_https and 443 or 80); - - -- Connect the socket, and wrap it with net.server - local conn = socket.tcp(); - conn:settimeout(10); - local ok, err = conn:connect(host, port_number); - if not ok and err ~= "timeout" then - callback(nil, 0, req); - return nil, err; - end - + local sslctx = false; if using_https then sslctx = ex and ex.sslctx or { mode = "client", protocol = "sslv23", options = { "no_sslv2", "no_sslv3" } }; end - req.handler, req.conn = assert(server.wrapclient(conn, host, port_number, listener, "*a", sslctx)); + local handler, conn = server.addclient(host, port_number, listener, "*a", sslctx) + if not handler then + callback(nil, 0, req); + return nil, conn; + end + req.handler, req.conn = handler, conn req.write = function (...) return req.handler:write(...); end - - req.callback = function (content, code, request, response) log("debug", "Calling callback, status %s", code or "---"); return select(2, xpcall(function () return callback(content, code, request, response) end, handleerr)); end + + req.callback = function (content, code, request, response) + log("debug", "Request '%s': Calling callback, status %s", req.id, code or "---"); + return log_if_failed(req.id, xpcall(function () return callback(content, code, request, response) end, handleerr)); + end req.reader = request_reader; req.state = "status"; @@ -189,17 +207,12 @@ function request(u, ex, callback) return req; end -function destroy_request(request) - if request.conn then - request.conn = nil; - request.handler:close() - end -end - -local urlencode, urldecode = util_http.urlencode, util_http.urldecode; -local formencode, formdecode = util_http.formencode, util_http.formdecode; - -_M.urlencode, _M.urldecode = urlencode, urldecode; -_M.formencode, _M.formdecode = formencode, formdecode; - -return _M; +return { + request = request; + + -- COMPAT + urlencode = util_http.urlencode; + urldecode = util_http.urldecode; + formencode = util_http.formencode; + formdecode = util_http.formdecode; +}; diff --git a/net/http/codes.lua b/net/http/codes.lua index 0cadd079..1090e545 100644 --- a/net/http/codes.lua +++ b/net/http/codes.lua @@ -25,6 +25,7 @@ local response_codes = { [305] = "Use Proxy"; -- The 306 status code was used in a previous version of [RFC2616], is no longer used, and the code is reserved. [307] = "Temporary Redirect"; + [308] = "Permanent Redirect"; [400] = "Bad Request"; [401] = "Unauthorized"; @@ -39,17 +40,22 @@ local response_codes = { [410] = "Gone"; [411] = "Length Required"; [412] = "Precondition Failed"; - [413] = "Request Entity Too Large"; - [414] = "Request-URI Too Long"; + [413] = "Payload Too Large"; + [414] = "URI Too Long"; [415] = "Unsupported Media Type"; - [416] = "Requested Range Not Satisfiable"; + [416] = "Range Not Satisfiable"; [417] = "Expectation Failed"; [418] = "I'm a teapot"; + [421] = "Misdirected Request"; [422] = "Unprocessable Entity"; [423] = "Locked"; [424] = "Failed Dependency"; -- The 425 status code is reserved for the WebDAV advanced collections expired proposal [RFC2817] [426] = "Upgrade Required"; + [428] = "Precondition Required"; + [429] = "Too Many Requests"; + [431] = "Request Header Fields Too Large"; + [451] = "Unavailable For Legal Reasons"; [500] = "Internal Server Error"; [501] = "Not Implemented"; @@ -61,7 +67,8 @@ local response_codes = { [507] = "Insufficient Storage"; [508] = "Loop Detected"; [510] = "Not Extended"; + [511] = "Network Authentication Required"; }; for k,v in pairs(response_codes) do response_codes[k] = k.." "..v; end -return setmetatable(response_codes, { __index = function(t, k) return k.." Unassigned"; end }) +return setmetatable(response_codes, { __index = function(_, k) return k.." Unassigned"; end }) diff --git a/net/http/parser.lua b/net/http/parser.lua index 1e698728..96d32ec8 100644 --- a/net/http/parser.lua +++ b/net/http/parser.lua @@ -38,7 +38,7 @@ function httpstream.new(success_cb, error_cb, parser_type, options_cb) local have_body; local error; return { - feed = function(self, data) + feed = function(_, data) if error then return nil, "parse has failed"; end if not data then -- EOF if buftable then buf, buftable = t_concat(buf), false; end @@ -134,6 +134,9 @@ function httpstream.new(success_cb, error_cb, parser_type, options_cb) if state then -- read body if client then if chunked then + if chunk_start and buflen - chunk_start - 2 < chunk_size then + return; + end -- not enough data if buftable then buf, buftable = t_concat(buf), false; end if not buf:find("\r\n", nil, true) then return; @@ -150,6 +153,7 @@ function httpstream.new(success_cb, error_cb, parser_type, options_cb) elseif buflen - chunk_start - 2 >= chunk_size then -- we have a chunk packet.body = packet.body..buf:sub(chunk_start, chunk_start + (chunk_size-1)); buf = buf:sub(chunk_start + chunk_size + 2); + buflen = buflen - (chunk_start + chunk_size + 2 - 1); chunk_size, chunk_start = nil, nil; else -- Partial chunk remaining break; diff --git a/net/http/server.lua b/net/http/server.lua index 32cda8aa..8aa28f5c 100644 --- a/net/http/server.lua +++ b/net/http/server.lua @@ -11,11 +11,14 @@ local setmetatable = setmetatable; local xpcall = xpcall; local traceback = debug.traceback; local tostring = tostring; +local cache = require "util.cache"; local codes = require "net.http.codes"; +local blocksize = 2^16; local _M = {}; local sessions = {}; +local incomplete = {}; local listener = {}; local hosts = {}; local default_host; @@ -28,7 +31,10 @@ local function is_wildcard_match(wildcard_event, event) return wildcard_event:sub(1, -2) == event:sub(1, #wildcard_event-1); end -local recent_wildcard_events, max_cached_wildcard_events = {}, 10000; +local _handlers = events._handlers; +local recent_wildcard_events = cache.new(10000, function (key, value) -- luacheck: ignore 212/value + rawset(_handlers, key, nil); +end); local event_map = events._event_map; setmetatable(events._handlers, { @@ -63,10 +69,7 @@ setmetatable(events._handlers, { end rawset(handlers, curr_event, handlers_array); if not event_map[curr_event] then -- Only wildcard handlers match, if any - table.insert(recent_wildcard_events, curr_event); - if #recent_wildcard_events > max_cached_wildcard_events then - rawset(handlers, table.remove(recent_wildcard_events, 1), nil); - end + recent_wildcard_events:set(curr_event, true); end return handlers_array; end; @@ -143,17 +146,26 @@ function listener.ondisconnect(conn) open_response.finished = true; open_response:on_destroy(); end + incomplete[conn] = nil; sessions[conn] = nil; end function listener.ondetach(conn) sessions[conn] = nil; + incomplete[conn] = nil; end function listener.onincoming(conn, data) sessions[conn]:feed(data); end +function listener.ondrain(conn) + local response = incomplete[conn]; + if response and response._send_more then + response._send_more(); + end +end + local headerfix = setmetatable({}, { __index = function(t, k) local v = "\r\n"..k:gsub("_", "-"):gsub("%f[%w].", s_upper)..": "; @@ -162,7 +174,7 @@ local headerfix = setmetatable({}, { end }); -function _M.hijack_response(response, listener) +function _M.hijack_response(response, listener) -- luacheck: ignore error("TODO"); end function handle_request(conn, request, finish_cb) @@ -193,6 +205,8 @@ function handle_request(conn, request, finish_cb) persistent = persistent; conn = conn; send = _M.send_response; + send_file = _M.send_file; + done = _M.finish_response; finish_cb = finish_cb; }; conn._http_open_response = response; @@ -212,7 +226,7 @@ function handle_request(conn, request, finish_cb) err_code, err = 400, "Missing or invalid 'Host' header"; end end - + if err then response.status_code = err_code; response:send(events.fire_event("http-error", { code = err_code, message = err })); @@ -254,24 +268,60 @@ function handle_request(conn, request, finish_cb) response.status_code = 404; response:send(events.fire_event("http-error", { code = 404 })); end -function _M.send_response(response, body) - if response.finished then return; end - response.finished = true; - response.conn._http_open_response = nil; - +local function prepare_header(response) local status_line = "HTTP/"..response.request.httpversion.." "..(response.status or codes[response.status_code]); local headers = response.headers; - body = body or response.body or ""; - headers.content_length = #body; - local output = { status_line }; for k,v in pairs(headers) do t_insert(output, headerfix[k]..v); end t_insert(output, "\r\n\r\n"); + return output; +end +_M.prepare_header = prepare_header; +function _M.send_response(response, body) + if response.finished then return; end + body = body or response.body or ""; + response.headers.content_length = #body; + local output = prepare_header(response); t_insert(output, body); - response.conn:write(t_concat(output)); + response:done(); +end +function _M.send_file(response, f) + if response.finished then return; end + local chunked = not response.headers.content_length; + if chunked then response.headers.transfer_encoding = "chunked"; end + incomplete[response.conn] = response; + response._send_more = function () + if response.finished then + incomplete[response.conn] = nil; + return; + end + local chunk = f:read(blocksize); + if chunk then + if chunked then + chunk = ("%x\r\n%s\r\n"):format(#chunk, chunk); + end + -- io.write("."); io.flush(); + response.conn:write(chunk); + else + if chunked then + response.conn:write("0\r\n\r\n"); + end + -- io.write("\n"); + if f.close then f:close(); end + incomplete[response.conn] = nil; + return response:done(); + end + end + response.conn:write(t_concat(prepare_header(response))); + return true; +end +function _M.finish_response(response) + if response.finished then return; end + response.finished = true; + response.conn._http_open_response = nil; if response.on_destroy then response:on_destroy(); response.on_destroy = nil; @@ -290,7 +340,7 @@ function _M.remove_handler(event, handler) end function _M.listen_on(port, interface, ssl) - addserver(interface or "*", port, listener, "*a", ssl); + return addserver(interface or "*", port, listener, "*a", ssl); end function _M.add_host(host) hosts[host] = true; diff --git a/net/httpserver.lua b/net/httpserver.lua index 7d574788..6e2e31b9 100644 --- a/net/httpserver.lua +++ b/net/httpserver.lua @@ -2,14 +2,15 @@ local log = require "util.logger".init("net.httpserver"); local traceback = debug.traceback; -module "httpserver" +local _ENV = nil; function fail() log("error", "Attempt to use legacy HTTP API. For more info see http://prosody.im/doc/developers/legacy_http"); log("error", "Legacy HTTP API usage, %s", traceback("", 2)); end -new, new_from_config = fail, fail; -set_default_handler = fail; - -return _M; +return { + new = fail; + new_from_config = fail; + set_default_handler = fail; +}; diff --git a/net/server.lua b/net/server.lua index 9b0d27e1..41e180fa 100644 --- a/net/server.lua +++ b/net/server.lua @@ -1,7 +1,7 @@ -- Prosody IM -- Copyright (C) 2008-2010 Matthew Wild -- Copyright (C) 2008-2010 Waqas Hussain --- +-- -- This project is MIT/X11 licensed. Please see the -- COPYING file in the source package for more information. -- diff --git a/net/server_event.lua b/net/server_event.lua index 882d10ed..43a19fe9 100644 --- a/net/server_event.lua +++ b/net/server_event.lua @@ -11,6 +11,7 @@ -- when using luasec, there are 4 cases of timeout errors: wantread or wantwrite during reading or writing --]] +-- luacheck: ignore 212/self 431/err 211/ret local SCRIPT_NAME = "server_event.lua" local SCRIPT_VERSION = "0.05" @@ -29,30 +30,36 @@ local cfg = { WRITE_TIMEOUT = 180, -- timeout in seconds for write data on socket CONNECT_TIMEOUT = 20, -- timeout in seconds for connection attempts CLEAR_DELAY = 5, -- seconds to wait for clearing interface list (and calling ondisconnect listeners) + READ_RETRY_DELAY = 1e-06, -- if, after reading, there is still data in buffer, wait this long and continue reading DEBUG = true, -- show debug messages } -local function use(x) return rawget(_G, x); end -local ipairs = use "ipairs" -local string = use "string" -local select = use "select" -local require = use "require" -local tostring = use "tostring" -local coroutine = use "coroutine" -local setmetatable = use "setmetatable" +local pairs = pairs +local select = select +local require = require +local tostring = tostring +local setmetatable = setmetatable local t_insert = table.insert local t_concat = table.concat +local s_sub = string.sub -local ssl = use "ssl" -local socket = use "socket" or require "socket" +local coroutine_wrap = coroutine.wrap +local coroutine_yield = coroutine.yield + +local has_luasec, ssl = pcall ( require , "ssl" ) +local socket = require "socket" +local levent = require "luaevent.core" + +local socket_gettime = socket.gettime +local getaddrinfo = socket.dns.getaddrinfo local log = require ("util.logger").init("socket") local function debug(...) return log("debug", ("%s "):rep(select('#', ...)), ...) end -local vdebug = debug; +-- local vdebug = debug; local bitor = ( function( ) -- thx Rici Lake local hasbit = function( x, p ) @@ -72,741 +79,684 @@ local bitor = ( function( ) -- thx Rici Lake end end )( ) -local event = require "luaevent.core" -local base = event.new( ) -local EV_READ = event.EV_READ -local EV_WRITE = event.EV_WRITE -local EV_TIMEOUT = event.EV_TIMEOUT -local EV_SIGNAL = event.EV_SIGNAL +local base = levent.new( ) +local addevent = base.addevent +local EV_READ = levent.EV_READ +local EV_WRITE = levent.EV_WRITE +local EV_TIMEOUT = levent.EV_TIMEOUT +local EV_SIGNAL = levent.EV_SIGNAL local EV_READWRITE = bitor( EV_READ, EV_WRITE ) -local interfacelist = ( function( ) -- holds the interfaces for sockets - local array = { } - local len = 0 - return function( method, arg ) - if "add" == method then - len = len + 1 - array[ len ] = arg - arg:_position( len ) - return len - elseif "delete" == method then - if len <= 0 then - return nil, "array is already empty" +local interfacelist = { } + +-- Client interface methods +local interface_mt = {}; interface_mt.__index = interface_mt; + +-- Private methods +function interface_mt:_close() + return self:_destroy(); +end + +function interface_mt:_start_connection(plainssl) -- called from wrapclient + local callback = function( event ) + if EV_TIMEOUT == event then -- timeout during connection + self.fatalerror = "connection timeout" + self:ontimeout() -- call timeout listener + self:_close() + debug( "new connection failed. id:", self.id, "error:", self.fatalerror ) + else + if plainssl and has_luasec then -- start ssl session + self:starttls(self._sslctx, true) + else -- normal connection + self:_start_session(true) end - local position = arg:_position() -- get position in array - if position ~= len then - local interface = array[ len ] -- get last interface - array[ position ] = interface -- copy it into free position - array[ len ] = nil -- free last position - interface:_position( position ) -- set new position in array - else -- free last position - array[ len ] = nil + debug( "new connection established. id:", self.id ) + end + self.eventconnect = nil + return -1 + end + self.eventconnect = addevent( base, self.conn, EV_WRITE, callback, cfg.CONNECT_TIMEOUT ) + return true +end +function interface_mt:_start_session(call_onconnect) -- new session, for example after startssl + if self.type == "client" then + local callback = function( ) + self:_lock( false, false, false ) + --vdebug( "start listening on client socket with id:", self.id ) + self.eventread = addevent( base, self.conn, EV_READ, self.readcallback, cfg.READ_TIMEOUT ); -- register callback + if call_onconnect then + self:onconnect() end - len = len - 1 - return len - else - return array + self.eventsession = nil + return -1 end + self.eventsession = addevent( base, nil, EV_TIMEOUT, callback, 0 ) + else + self:_lock( false ) + --vdebug( "start listening on server socket with id:", self.id ) + self.eventread = addevent( base, self.conn, EV_READ, self.readcallback ) -- register callback end -end )( ) - --- Client interface methods -local interface_mt -do - interface_mt = {}; interface_mt.__index = interface_mt; - - local addevent = base.addevent - local coroutine_wrap, coroutine_yield = coroutine.wrap,coroutine.yield - - -- Private methods - function interface_mt:_position(new_position) - self.position = new_position or self.position - return self.position; - end - function interface_mt:_close() - return self:_destroy(); - end - - function interface_mt:_start_connection(plainssl) -- should be called from addclient - local callback = function( event ) - if EV_TIMEOUT == event then -- timeout during connection - self.fatalerror = "connection timeout" - self:ontimeout() -- call timeout listener - self:_close() - debug( "new connection failed. id:", self.id, "error:", self.fatalerror ) - else - if plainssl and ssl then -- start ssl session - self:starttls(self._sslctx, true) - else -- normal connection - self:_start_session(true) + return true +end +function interface_mt:_start_ssl(call_onconnect) -- old socket will be destroyed, therefore we have to close read/write events first + --vdebug( "starting ssl session with client id:", self.id ) + local _ + _ = self.eventread and self.eventread:close( ) -- close events; this must be called outside of the event callbacks! + _ = self.eventwrite and self.eventwrite:close( ) + self.eventread, self.eventwrite = nil, nil + local err + self.conn, err = ssl.wrap( self.conn, self._sslctx ) + if err then + self.fatalerror = err + self.conn = nil -- cannot be used anymore + if call_onconnect then + self.ondisconnect = nil -- dont call this when client isnt really connected + end + self:_close() + debug( "fatal error while ssl wrapping:", err ) + return false + end + self.conn:settimeout( 0 ) -- set non blocking + local handshakecallback = coroutine_wrap(function( event ) + local _, err + local attempt = 0 + local maxattempt = cfg.MAX_HANDSHAKE_ATTEMPTS + while attempt < maxattempt do -- no endless loop + attempt = attempt + 1 + debug( "ssl handshake of client with id:"..tostring(self)..", attempt:"..attempt ) + if attempt > maxattempt then + self.fatalerror = "max handshake attempts exceeded" + elseif EV_TIMEOUT == event then + self.fatalerror = "timeout during handshake" + else + _, err = self.conn:dohandshake( ) + if not err then + self:_lock( false, false, false ) -- unlock the interface; sending, closing etc allowed + self.send = self.conn.send -- caching table lookups with new client object + self.receive = self.conn.receive + if not call_onconnect then -- trigger listener + self:onstatus("ssl-handshake-complete"); end - debug( "new connection established. id:", self.id ) + self:_start_session( call_onconnect ) + debug( "ssl handshake done" ) + self.eventhandshake = nil + return -1 end - self.eventconnect = nil - return -1 - end - self.eventconnect = addevent( base, self.conn, EV_WRITE, callback, cfg.CONNECT_TIMEOUT ) - return true - end - function interface_mt:_start_session(call_onconnect) -- new session, for example after startssl - if self.type == "client" then - local callback = function( ) - self:_lock( false, false, false ) - --vdebug( "start listening on client socket with id:", self.id ) - self.eventread = addevent( base, self.conn, EV_READ, self.readcallback, cfg.READ_TIMEOUT ); -- register callback - if call_onconnect then - self:onconnect() + if err == "wantwrite" then + event = EV_WRITE + elseif err == "wantread" then + event = EV_READ + else + debug( "ssl handshake error:", err ) + self.fatalerror = err end - self.eventsession = nil - return -1 end - self.eventsession = addevent( base, nil, EV_TIMEOUT, callback, 0 ) - else - self:_lock( false ) - --vdebug( "start listening on server socket with id:", self.id ) - self.eventread = addevent( base, self.conn, EV_READ, self.readcallback ) -- register callback - end - return true - end - function interface_mt:_start_ssl(call_onconnect) -- old socket will be destroyed, therefore we have to close read/write events first - --vdebug( "starting ssl session with client id:", self.id ) - local _ - _ = self.eventread and self.eventread:close( ) -- close events; this must be called outside of the event callbacks! - _ = self.eventwrite and self.eventwrite:close( ) - self.eventread, self.eventwrite = nil, nil - local err - self.conn, err = ssl.wrap( self.conn, self._sslctx ) - if err then - self.fatalerror = err - self.conn = nil -- cannot be used anymore + if self.fatalerror then if call_onconnect then self.ondisconnect = nil -- dont call this when client isnt really connected end self:_close() - debug( "fatal error while ssl wrapping:", err ) - return false - end - self.conn:settimeout( 0 ) -- set non blocking - local handshakecallback = coroutine_wrap( - function( event ) - local _, err - local attempt = 0 - local maxattempt = cfg.MAX_HANDSHAKE_ATTEMPTS - while attempt < maxattempt do -- no endless loop - attempt = attempt + 1 - debug( "ssl handshake of client with id:"..tostring(self)..", attempt:"..attempt ) - if attempt > maxattempt then - self.fatalerror = "max handshake attempts exceeded" - elseif EV_TIMEOUT == event then - self.fatalerror = "timeout during handshake" - else - _, err = self.conn:dohandshake( ) - if not err then - self:_lock( false, false, false ) -- unlock the interface; sending, closing etc allowed - self.send = self.conn.send -- caching table lookups with new client object - self.receive = self.conn.receive - if not call_onconnect then -- trigger listener - self:onstatus("ssl-handshake-complete"); - end - self:_start_session( call_onconnect ) - debug( "ssl handshake done" ) - self.eventhandshake = nil - return -1 - end - if err == "wantwrite" then - event = EV_WRITE - elseif err == "wantread" then - event = EV_READ - else - debug( "ssl handshake error:", err ) - self.fatalerror = err - end - end - if self.fatalerror then - if call_onconnect then - self.ondisconnect = nil -- dont call this when client isnt really connected - end - self:_close() - debug( "handshake failed because:", self.fatalerror ) - self.eventhandshake = nil - return -1 - end - event = coroutine_yield( event, cfg.HANDSHAKE_TIMEOUT ) -- yield this monster... - end - end - ) - debug "starting handshake..." - self:_lock( false, true, true ) -- unlock read/write events, but keep interface locked - self.eventhandshake = addevent( base, self.conn, EV_READWRITE, handshakecallback, cfg.HANDSHAKE_TIMEOUT ) - return true - end - function interface_mt:_destroy() -- close this interface + events and call last listener - debug( "closing client with id:", self.id, self.fatalerror ) - self:_lock( true, true, true ) -- first of all, lock the interface to avoid further actions - local _ - _ = self.eventread and self.eventread:close( ) - if self.type == "client" then - _ = self.eventwrite and self.eventwrite:close( ) - _ = self.eventhandshake and self.eventhandshake:close( ) - _ = self.eventstarthandshake and self.eventstarthandshake:close( ) - _ = self.eventconnect and self.eventconnect:close( ) - _ = self.eventsession and self.eventsession:close( ) - _ = self.eventwritetimeout and self.eventwritetimeout:close( ) - _ = self.eventreadtimeout and self.eventreadtimeout:close( ) - _ = self.ondisconnect and self:ondisconnect( self.fatalerror ~= "client to close" and self.fatalerror) -- call ondisconnect listener (wont be the case if handshake failed on connect) - _ = self.conn and self.conn:close( ) -- close connection - _ = self._server and self._server:counter(-1); - self.eventread, self.eventwrite = nil, nil - self.eventstarthandshake, self.eventhandshake, self.eventclose = nil, nil, nil - self.readcallback, self.writecallback = nil, nil - else - self.conn:close( ) - self.eventread, self.eventclose = nil, nil - self.interface, self.readcallback = nil, nil + debug( "handshake failed because:", self.fatalerror ) + self.eventhandshake = nil + return -1 end - interfacelist( "delete", self ) - return true - end - - function interface_mt:_lock(nointerface, noreading, nowriting) -- lock or unlock this interface or events - self.nointerface, self.noreading, self.nowriting = nointerface, noreading, nowriting - return nointerface, noreading, nowriting - end - - --TODO: Deprecate - function interface_mt:lock_read(switch) - if switch then - return self:pause(); - else - return self:resume(); + event = coroutine_yield( event, cfg.HANDSHAKE_TIMEOUT ) -- yield this monster... end end + ) + debug "starting handshake..." + self:_lock( false, true, true ) -- unlock read/write events, but keep interface locked + self.eventhandshake = addevent( base, self.conn, EV_READWRITE, handshakecallback, cfg.HANDSHAKE_TIMEOUT ) + return true +end +function interface_mt:_destroy() -- close this interface + events and call last listener + debug( "closing client with id:", self.id, self.fatalerror ) + self:_lock( true, true, true ) -- first of all, lock the interface to avoid further actions + local _ + _ = self.eventread and self.eventread:close( ) + if self.type == "client" then + _ = self.eventwrite and self.eventwrite:close( ) + _ = self.eventhandshake and self.eventhandshake:close( ) + _ = self.eventstarthandshake and self.eventstarthandshake:close( ) + _ = self.eventconnect and self.eventconnect:close( ) + _ = self.eventsession and self.eventsession:close( ) + _ = self.eventwritetimeout and self.eventwritetimeout:close( ) + _ = self.eventreadtimeout and self.eventreadtimeout:close( ) + _ = self.ondisconnect and self:ondisconnect( self.fatalerror ~= "client to close" and self.fatalerror) -- call ondisconnect listener (wont be the case if handshake failed on connect) + _ = self.conn and self.conn:close( ) -- close connection + _ = self._server and self._server:counter(-1); + self.eventread, self.eventwrite = nil, nil + self.eventstarthandshake, self.eventhandshake, self.eventclose = nil, nil, nil + self.readcallback, self.writecallback = nil, nil + else + self.conn:close( ) + self.eventread, self.eventclose = nil, nil + self.interface, self.readcallback = nil, nil + end + interfacelist[ self ] = nil + return true +end - function interface_mt:pause() - return self:_lock(self.nointerface, true, self.nowriting); - end +function interface_mt:_lock(nointerface, noreading, nowriting) -- lock or unlock this interface or events + self.nointerface, self.noreading, self.nowriting = nointerface, noreading, nowriting + return nointerface, noreading, nowriting +end - function interface_mt:resume() - self:_lock(self.nointerface, false, self.nowriting); - if self.readcallback and not self.eventread then - self.eventread = addevent( base, self.conn, EV_READ, self.readcallback, cfg.READ_TIMEOUT ); -- register callback - return true; - end +--TODO: Deprecate +function interface_mt:lock_read(switch) + if switch then + return self:pause(); + else + return self:resume(); end +end - function interface_mt:counter(c) - if c then - self._connections = self._connections + c - end - return self._connections - end - - -- Public methods - function interface_mt:write(data) - if self.nowriting then return nil, "locked" end - --vdebug( "try to send data to client, id/data:", self.id, data ) - data = tostring( data ) - local len = #data - local total = len + self.writebufferlen - if total > cfg.MAX_SEND_LENGTH then -- check buffer length - local err = "send buffer exceeded" - debug( "error:", err ) -- to much, check your app - return nil, err - end - t_insert(self.writebuffer, data) -- new buffer - self.writebufferlen = total - if not self.eventwrite then -- register new write event - --vdebug( "register new write event" ) - self.eventwrite = addevent( base, self.conn, EV_WRITE, self.writecallback, cfg.WRITE_TIMEOUT ) - end - return true - end - function interface_mt:close() - if self.nointerface then return nil, "locked"; end - debug( "try to close client connection with id:", self.id ) - if self.type == "client" then - self.fatalerror = "client to close" - if self.eventwrite then -- wait for incomplete write request - self:_lock( true, true, false ) - debug "closing delayed until writebuffer is empty" - return nil, "writebuffer not empty, waiting" - else -- close now - self:_lock( true, true, true ) - self:_close() - return true - end - else - debug( "try to close server with id:", tostring(self.id)) - self.fatalerror = "server to close" - self:_lock( true ) - self:_close( 0 ) - return true - end - end - - function interface_mt:socket() - return self.conn - end - - function interface_mt:server() - return self._server or self; - end - - function interface_mt:port() - return self._port - end - - function interface_mt:serverport() - return self._serverport - end - - function interface_mt:ip() - return self._ip - end - - function interface_mt:ssl() - return self._usingssl - end - interface_mt.clientport = interface_mt.port -- COMPAT server_select +function interface_mt:pause() + return self:_lock(self.nointerface, true, self.nowriting); +end - function interface_mt:type() - return self._type or "client" +function interface_mt:resume() + self:_lock(self.nointerface, false, self.nowriting); + if self.readcallback and not self.eventread then + self.eventread = addevent( base, self.conn, EV_READ, self.readcallback, cfg.READ_TIMEOUT ); -- register callback + return true; end - - function interface_mt:connections() - return self._connections - end - - function interface_mt:address() - return self.addr - end - - function interface_mt:set_sslctx(sslctx) - self._sslctx = sslctx; - if sslctx then - self.starttls = nil; -- use starttls() of interface_mt - else - self.starttls = false; -- prevent starttls() - end +end + +function interface_mt:counter(c) + if c then + self._connections = self._connections + c end + return self._connections +end - function interface_mt:set_mode(pattern) - if pattern then - self._pattern = pattern; - end - return self._pattern; - end - - function interface_mt:set_send(new_send) - -- No-op, we always use the underlying connection's send - end - - function interface_mt:starttls(sslctx, call_onconnect) - debug( "try to start ssl at client id:", self.id ) - local err - self._sslctx = sslctx; - if self._usingssl then -- startssl was already called - err = "ssl already active" - end - if err then - debug( "error:", err ) - return nil, err - end - self._usingssl = true - self.startsslcallback = function( ) -- we have to start the handshake outside of a read/write event - self.startsslcallback = nil - self:_start_ssl(call_onconnect); - self.eventstarthandshake = nil - return -1 - end - if not self.eventwrite then - self:_lock( true, true, true ) -- lock the interface, to not disturb the handshake - self.eventstarthandshake = addevent( base, nil, EV_TIMEOUT, self.startsslcallback, 0 ) -- add event to start handshake - else -- wait until writebuffer is empty +-- Public methods +function interface_mt:write(data) + if self.nowriting then return nil, "locked" end + --vdebug( "try to send data to client, id/data:", self.id, data ) + data = tostring( data ) + local len = #data + local total = len + self.writebufferlen + if total > cfg.MAX_SEND_LENGTH then -- check buffer length + local err = "send buffer exceeded" + debug( "error:", err ) -- to much, check your app + return nil, err + end + t_insert(self.writebuffer, data) -- new buffer + self.writebufferlen = total + if not self.eventwrite then -- register new write event + --vdebug( "register new write event" ) + self.eventwrite = addevent( base, self.conn, EV_WRITE, self.writecallback, cfg.WRITE_TIMEOUT ) + end + return true +end +function interface_mt:close() + if self.nointerface then return nil, "locked"; end + debug( "try to close client connection with id:", self.id ) + if self.type == "client" then + self.fatalerror = "client to close" + if self.eventwrite then -- wait for incomplete write request self:_lock( true, true, false ) - debug "ssl session delayed until writebuffer is empty..." + debug "closing delayed until writebuffer is empty" + return nil, "writebuffer not empty, waiting" + else -- close now + self:_lock( true, true, true ) + self:_close() + return true end - self.starttls = false; + else + debug( "try to close server with id:", tostring(self.id)) + self.fatalerror = "server to close" + self:_lock( true ) + self:_close( 0 ) return true end - - function interface_mt:setoption(option, value) - if self.conn.setoption then - return self.conn:setoption(option, value); - end - return false, "setoption not implemented"; - end - - function interface_mt:setlistener(listener) - self:ondetach(); -- Notify listener that it is no longer responsible for this connection - self.onconnect, self.ondisconnect, self.onincoming, - self.ontimeout, self.onstatus, self.ondetach - = listener.onconnect, listener.ondisconnect, listener.onincoming, - listener.ontimeout, listener.onstatus, listener.ondetach; - end - - -- Stub handlers - function interface_mt:onconnect() - end - function interface_mt:onincoming() - end - function interface_mt:ondisconnect() - end - function interface_mt:ontimeout() - end - function interface_mt:ondrain() +end + +function interface_mt:socket() + return self.conn +end + +function interface_mt:server() + return self._server or self; +end + +function interface_mt:port() + return self._port +end + +function interface_mt:serverport() + return self._serverport +end + +function interface_mt:ip() + return self._ip +end + +function interface_mt:ssl() + return self._usingssl +end +interface_mt.clientport = interface_mt.port -- COMPAT server_select + +function interface_mt:type() + return self._type or "client" +end + +function interface_mt:connections() + return self._connections +end + +function interface_mt:address() + return self.addr +end + +function interface_mt:set_sslctx(sslctx) + self._sslctx = sslctx; + if sslctx then + self.starttls = nil; -- use starttls() of interface_mt + else + self.starttls = false; -- prevent starttls() end - function interface_mt:ondetach() +end + +function interface_mt:set_mode(pattern) + if pattern then + self._pattern = pattern; end - function interface_mt:onstatus() + return self._pattern; +end + +function interface_mt:set_send(new_send) -- luacheck: ignore 212 + -- No-op, we always use the underlying connection's send +end + +function interface_mt:starttls(sslctx, call_onconnect) + debug( "try to start ssl at client id:", self.id ) + local err + self._sslctx = sslctx; + if self._usingssl then -- startssl was already called + err = "ssl already active" + end + if err then + debug( "error:", err ) + return nil, err + end + self._usingssl = true + self.startsslcallback = function( ) -- we have to start the handshake outside of a read/write event + self.startsslcallback = nil + self:_start_ssl(call_onconnect); + self.eventstarthandshake = nil + return -1 + end + if not self.eventwrite then + self:_lock( true, true, true ) -- lock the interface, to not disturb the handshake + self.eventstarthandshake = addevent( base, nil, EV_TIMEOUT, self.startsslcallback, 0 ) -- add event to start handshake + else + -- wait until writebuffer is empty + self:_lock( true, true, false ) + debug "ssl session delayed until writebuffer is empty..." + end + self.starttls = false; + return true +end + +function interface_mt:setoption(option, value) + if self.conn.setoption then + return self.conn:setoption(option, value); end + return false, "setoption not implemented"; +end + +function interface_mt:setlistener(listener) + self:ondetach(); -- Notify listener that it is no longer responsible for this connection + self.onconnect = listener.onconnect; + self.ondisconnect = listener.ondisconnect; + self.onincoming = listener.onincoming; + self.ontimeout = listener.ontimeout; + self.onreadtimeout = listener.onreadtimeout; + self.onstatus = listener.onstatus; + self.ondetach = listener.ondetach; +end + +-- Stub handlers +function interface_mt:onconnect() +end +function interface_mt:onincoming() +end +function interface_mt:ondisconnect() +end +function interface_mt:ontimeout() +end +function interface_mt:onreadtimeout() + self.fatalerror = "timeout during receiving" + debug( "connection failed:", self.fatalerror ) + self:_close() + self.eventread = nil +end +function interface_mt:ondrain() +end +function interface_mt:ondetach() +end +function interface_mt:onstatus() end -- End of client interface methods -local handleclient; -do - local string_sub = string.sub -- caching table lookups - local addevent = base.addevent - local socket_gettime = socket.gettime - function handleclient( client, ip, port, server, pattern, listener, sslctx ) -- creates an client interface - --vdebug("creating client interfacce...") - local interface = { - type = "client"; - conn = client; - currenttime = socket_gettime( ); -- safe the origin - writebuffer = {}; -- writebuffer - writebufferlen = 0; -- length of writebuffer - send = client.send; -- caching table lookups - receive = client.receive; - onconnect = listener.onconnect; -- will be called when client disconnects - ondisconnect = listener.ondisconnect; -- will be called when client disconnects - onincoming = listener.onincoming; -- will be called when client sends data - ontimeout = listener.ontimeout; -- called when fatal socket timeout occurs - ondrain = listener.ondrain; -- called when writebuffer is empty - ondetach = listener.ondetach; -- called when disassociating this listener from this connection - onstatus = listener.onstatus; -- called for status changes (e.g. of SSL/TLS) - eventread = false, eventwrite = false, eventclose = false, - eventhandshake = false, eventstarthandshake = false; -- event handler - eventconnect = false, eventsession = false; -- more event handler... - eventwritetimeout = false; -- even more event handler... - eventreadtimeout = false; - fatalerror = false; -- error message - writecallback = false; -- will be called on write events - readcallback = false; -- will be called on read events - nointerface = true; -- lock/unlock parameter of this interface - noreading = false, nowriting = false; -- locks of the read/writecallback - startsslcallback = false; -- starting handshake callback - position = false; -- position of client in interfacelist - - -- Properties - _ip = ip, _port = port, _server = server, _pattern = pattern, - _serverport = (server and server:port() or nil), - _sslctx = sslctx; -- parameters - _usingssl = false; -- client is using ssl; - } - if not ssl then interface.starttls = false; end - interface.id = tostring(interface):match("%x+$"); - interface.writecallback = function( event ) -- called on write events - --vdebug( "new client write event, id/ip/port:", interface, ip, port ) - if interface.nowriting or ( interface.fatalerror and ( "client to close" ~= interface.fatalerror ) ) then -- leave this event - --vdebug( "leaving this event because:", interface.nowriting or interface.fatalerror ) - interface.eventwrite = false - return -1 - end - if EV_TIMEOUT == event then -- took too long to write some data to socket -> disconnect - interface.fatalerror = "timeout during writing" - debug( "writing failed:", interface.fatalerror ) - interface:_close() - interface.eventwrite = false - return -1 - else -- can write :) - if interface._usingssl then -- handle luasec - if interface.eventreadtimeout then -- we have to read first - local ret = interface.readcallback( ) -- call readcallback - --vdebug( "tried to read in writecallback, result:", ret ) - end - if interface.eventwritetimeout then -- luasec only - interface.eventwritetimeout:close( ) -- first we have to close timeout event which where regged after a wantread error - interface.eventwritetimeout = false - end +local function handleclient( client, ip, port, server, pattern, listener, sslctx ) -- creates an client interface + --vdebug("creating client interfacce...") + local interface = { + type = "client"; + conn = client; + currenttime = socket_gettime( ); -- safe the origin + writebuffer = {}; -- writebuffer + writebufferlen = 0; -- length of writebuffer + send = client.send; -- caching table lookups + receive = client.receive; + onconnect = listener.onconnect; -- will be called when client disconnects + ondisconnect = listener.ondisconnect; -- will be called when client disconnects + onincoming = listener.onincoming; -- will be called when client sends data + ontimeout = listener.ontimeout; -- called when fatal socket timeout occurs + onreadtimeout = listener.onreadtimeout; -- called when socket inactivity timeout occurs + ondrain = listener.ondrain; -- called when writebuffer is empty + ondetach = listener.ondetach; -- called when disassociating this listener from this connection + onstatus = listener.onstatus; -- called for status changes (e.g. of SSL/TLS) + eventread = false, eventwrite = false, eventclose = false, + eventhandshake = false, eventstarthandshake = false; -- event handler + eventconnect = false, eventsession = false; -- more event handler... + eventwritetimeout = false; -- even more event handler... + eventreadtimeout = false; + fatalerror = false; -- error message + writecallback = false; -- will be called on write events + readcallback = false; -- will be called on read events + nointerface = true; -- lock/unlock parameter of this interface + noreading = false, nowriting = false; -- locks of the read/writecallback + startsslcallback = false; -- starting handshake callback + position = false; -- position of client in interfacelist + + -- Properties + _ip = ip, _port = port, _server = server, _pattern = pattern, + _serverport = (server and server:port() or nil), + _sslctx = sslctx; -- parameters + _usingssl = false; -- client is using ssl; + } + if not has_luasec then interface.starttls = false; end + interface.id = tostring(interface):match("%x+$"); + interface.writecallback = function( event ) -- called on write events + --vdebug( "new client write event, id/ip/port:", interface, ip, port ) + if interface.nowriting or ( interface.fatalerror and ( "client to close" ~= interface.fatalerror ) ) then -- leave this event + --vdebug( "leaving this event because:", interface.nowriting or interface.fatalerror ) + interface.eventwrite = false + return -1 + end + if EV_TIMEOUT == event then -- took too long to write some data to socket -> disconnect + interface.fatalerror = "timeout during writing" + debug( "writing failed:", interface.fatalerror ) + interface:_close() + interface.eventwrite = false + return -1 + else -- can write :) + if interface._usingssl then -- handle luasec + if interface.eventreadtimeout then -- we have to read first + local ret = interface.readcallback( ) -- call readcallback + --vdebug( "tried to read in writecallback, result:", ret ) end - interface.writebuffer = { t_concat(interface.writebuffer) } - local succ, err, byte = interface.conn:send( interface.writebuffer[1], 1, interface.writebufferlen ) - --vdebug( "write data:", interface.writebuffer, "error:", err, "part:", byte ) - if succ then -- writing succesful - interface.writebuffer[1] = nil - interface.writebufferlen = 0 - interface:ondrain(); - if interface.fatalerror then - debug "closing client after writing" - interface:_close() -- close interface if needed - elseif interface.startsslcallback then -- start ssl connection if needed - debug "starting ssl handshake after writing" - interface.eventstarthandshake = addevent( base, nil, EV_TIMEOUT, interface.startsslcallback, 0 ) - elseif interface.writebufferlen ~= 0 then - -- data possibly written from ondrain - return EV_WRITE, cfg.WRITE_TIMEOUT - elseif interface.eventreadtimeout then - return EV_WRITE, cfg.WRITE_TIMEOUT - end - interface.eventwrite = nil - return -1 - elseif byte and (err == "timeout" or err == "wantwrite") then -- want write again - --vdebug( "writebuffer is not empty:", err ) - interface.writebuffer[1] = string_sub( interface.writebuffer[1], byte + 1, interface.writebufferlen ) -- new buffer - interface.writebufferlen = interface.writebufferlen - byte - if "wantread" == err then -- happens only with luasec - local callback = function( ) - interface:_close() - interface.eventwritetimeout = nil - return -1; - end - interface.eventwritetimeout = addevent( base, nil, EV_TIMEOUT, callback, cfg.WRITE_TIMEOUT ) -- reg a new timeout event - debug( "wantread during write attempt, reg it in readcallback but dont know what really happens next..." ) - -- hopefully this works with luasec; its simply not possible to use 2 different write events on a socket in luaevent - return -1 - end - return EV_WRITE, cfg.WRITE_TIMEOUT - else -- connection was closed during writing or fatal error - interface.fatalerror = err or "fatal error" - debug( "connection failed in write event:", interface.fatalerror ) - interface:_close() - interface.eventwrite = nil - return -1 + if interface.eventwritetimeout then -- luasec only + interface.eventwritetimeout:close( ) -- first we have to close timeout event which where regged after a wantread error + interface.eventwritetimeout = false end end - end - - interface.readcallback = function( event ) -- called on read events - --vdebug( "new client read event, id/ip/port:", tostring(interface.id), tostring(ip), tostring(port) ) - if interface.noreading or interface.fatalerror then -- leave this event - --vdebug( "leaving this event because:", tostring(interface.noreading or interface.fatalerror) ) - interface.eventread = nil - return -1 - end - if EV_TIMEOUT == event then -- took too long to get some data from client -> disconnect - interface.fatalerror = "timeout during receiving" - debug( "connection failed:", interface.fatalerror ) - interface:_close() - interface.eventread = nil - return -1 - else -- can read - if interface._usingssl then -- handle luasec - if interface.eventwritetimeout then -- ok, in the past writecallback was regged - local ret = interface.writecallback( ) -- call it - --vdebug( "tried to write in readcallback, result:", tostring(ret) ) - end - if interface.eventreadtimeout then - interface.eventreadtimeout:close( ) - interface.eventreadtimeout = nil - end - end - local buffer, err, part = interface.conn:receive( interface._pattern ) -- receive buffer with "pattern" - --vdebug( "read data:", tostring(buffer), "error:", tostring(err), "part:", tostring(part) ) - buffer = buffer or part - if buffer and #buffer > cfg.MAX_READ_LENGTH then -- check buffer length - interface.fatalerror = "receive buffer exceeded" - debug( "fatal error:", interface.fatalerror ) - interface:_close() - interface.eventread = nil - return -1 + interface.writebuffer = { t_concat(interface.writebuffer) } + local succ, err, byte = interface.conn:send( interface.writebuffer[1], 1, interface.writebufferlen ) + --vdebug( "write data:", interface.writebuffer, "error:", err, "part:", byte ) + if succ then -- writing succesful + interface.writebuffer[1] = nil + interface.writebufferlen = 0 + interface:ondrain(); + if interface.fatalerror then + debug "closing client after writing" + interface:_close() -- close interface if needed + elseif interface.startsslcallback then -- start ssl connection if needed + debug "starting ssl handshake after writing" + interface.eventstarthandshake = addevent( base, nil, EV_TIMEOUT, interface.startsslcallback, 0 ) + elseif interface.writebufferlen ~= 0 then + -- data possibly written from ondrain + return EV_WRITE, cfg.WRITE_TIMEOUT + elseif interface.eventreadtimeout then + return EV_WRITE, cfg.WRITE_TIMEOUT end - if err and ( err ~= "timeout" and err ~= "wantread" ) then - if "wantwrite" == err then -- need to read on write event - if not interface.eventwrite then -- register new write event if needed - interface.eventwrite = addevent( base, interface.conn, EV_WRITE, interface.writecallback, cfg.WRITE_TIMEOUT ) - end - interface.eventreadtimeout = addevent( base, nil, EV_TIMEOUT, - function( ) - interface:_close() - end, cfg.READ_TIMEOUT - ) - debug( "wantwrite during read attempt, reg it in writecallback but dont know what really happens next..." ) - -- to be honest i dont know what happens next, if it is allowed to first read, the write etc... - else -- connection was closed or fatal error - interface.fatalerror = err - debug( "connection failed in read event:", interface.fatalerror ) + interface.eventwrite = nil + return -1 + elseif byte and (err == "timeout" or err == "wantwrite") then -- want write again + --vdebug( "writebuffer is not empty:", err ) + interface.writebuffer[1] = s_sub( interface.writebuffer[1], byte + 1, interface.writebufferlen ) -- new buffer + interface.writebufferlen = interface.writebufferlen - byte + if "wantread" == err then -- happens only with luasec + local callback = function( ) interface:_close() - interface.eventread = nil - return -1 + interface.eventwritetimeout = nil + return -1; end - else - interface.onincoming( interface, buffer, err ) -- send new data to listener - end - if interface.noreading then - interface.eventread = nil; - return -1; + interface.eventwritetimeout = addevent( base, nil, EV_TIMEOUT, callback, cfg.WRITE_TIMEOUT ) -- reg a new timeout event + debug( "wantread during write attempt, reg it in readcallback but dont know what really happens next..." ) + -- hopefully this works with luasec; its simply not possible to use 2 different write events on a socket in luaevent + return -1 end - return EV_READ, cfg.READ_TIMEOUT + return EV_WRITE, cfg.WRITE_TIMEOUT + else -- connection was closed during writing or fatal error + interface.fatalerror = err or "fatal error" + debug( "connection failed in write event:", interface.fatalerror ) + interface:_close() + interface.eventwrite = nil + return -1 end end + end - client:settimeout( 0 ) -- set non blocking - setmetatable(interface, interface_mt) - interfacelist( "add", interface ) -- add to interfacelist - return interface - end -end - -local handleserver -do - function handleserver( server, addr, port, pattern, listener, sslctx ) -- creates an server interface - debug "creating server interface..." - local interface = { - _connections = 0; - - conn = server; - onconnect = listener.onconnect; -- will be called when new client connected - eventread = false; -- read event handler - eventclose = false; -- close event handler - readcallback = false; -- read event callback - fatalerror = false; -- error message - nointerface = true; -- lock/unlock parameter - - _ip = addr, _port = port, _pattern = pattern, - _sslctx = sslctx; - } - interface.id = tostring(interface):match("%x+$"); - interface.readcallback = function( event ) -- server handler, called on incoming connections - --vdebug( "server can accept, id/addr/port:", interface, addr, port ) - if interface.fatalerror then - --vdebug( "leaving this event because:", self.fatalerror ) - interface.eventread = nil - return -1 + interface.readcallback = function( event ) -- called on read events + --vdebug( "new client read event, id/ip/port:", tostring(interface.id), tostring(ip), tostring(port) ) + if interface.noreading or interface.fatalerror then -- leave this event + --vdebug( "leaving this event because:", tostring(interface.noreading or interface.fatalerror) ) + interface.eventread = nil + return -1 + end + if EV_TIMEOUT == event and not interface.conn:dirty() and interface:onreadtimeout() ~= true then + interface.fatalerror = "timeout during receiving" + debug( "connection failed:", interface.fatalerror ) + interface:_close() + interface.eventread = nil + return -1 -- took too long to get some data from client -> disconnect + end + if interface._usingssl then -- handle luasec + if interface.eventwritetimeout then -- ok, in the past writecallback was regged + local ret = interface.writecallback( ) -- call it + --vdebug( "tried to write in readcallback, result:", tostring(ret) ) end - local delay = cfg.ACCEPT_DELAY - if EV_TIMEOUT == event then - if interface._connections >= cfg.MAX_CONNECTIONS then -- check connection count - debug( "to many connections, seconds to wait for next accept:", delay ) - return EV_TIMEOUT, delay -- timeout... - else - return EV_READ -- accept again - end + if interface.eventreadtimeout then + interface.eventreadtimeout:close( ) + interface.eventreadtimeout = nil end - --vdebug("max connection check ok, accepting...") - local client, err = server:accept() -- try to accept; TODO: check err - while client do - if interface._connections >= cfg.MAX_CONNECTIONS then - client:close( ) -- refuse connection - debug( "maximal connections reached, refuse client connection; accept delay:", delay ) - return EV_TIMEOUT, delay -- delay for next accept attempt - end - local client_ip, client_port = client:getpeername( ) - interface._connections = interface._connections + 1 -- increase connection count - local clientinterface = handleclient( client, client_ip, client_port, interface, pattern, listener, sslctx ) - --vdebug( "client id:", clientinterface, "startssl:", startssl ) - if ssl and sslctx then - clientinterface:starttls(sslctx, true) - else - clientinterface:_start_session( true ) + end + local buffer, err, part = interface.conn:receive( interface._pattern ) -- receive buffer with "pattern" + --vdebug( "read data:", tostring(buffer), "error:", tostring(err), "part:", tostring(part) ) + buffer = buffer or part + if buffer and #buffer > cfg.MAX_READ_LENGTH then -- check buffer length + interface.fatalerror = "receive buffer exceeded" + debug( "fatal error:", interface.fatalerror ) + interface:_close() + interface.eventread = nil + return -1 + end + if err and ( err ~= "timeout" and err ~= "wantread" ) then + if "wantwrite" == err then -- need to read on write event + if not interface.eventwrite then -- register new write event if needed + interface.eventwrite = addevent( base, interface.conn, EV_WRITE, interface.writecallback, cfg.WRITE_TIMEOUT ) end - debug( "accepted incoming client connection from:", client_ip or "<unknown IP>", client_port or "<unknown port>", "to", port or "<unknown port>"); - - client, err = server:accept() -- try to accept again + interface.eventreadtimeout = addevent( base, nil, EV_TIMEOUT, + function( ) interface:_close() end, cfg.READ_TIMEOUT) + debug( "wantwrite during read attempt, reg it in writecallback but dont know what really happens next..." ) + -- to be honest i dont know what happens next, if it is allowed to first read, the write etc... + else -- connection was closed or fatal error + interface.fatalerror = err + debug( "connection failed in read event:", interface.fatalerror ) + interface:_close() + interface.eventread = nil + return -1 end - return EV_READ + else + interface.onincoming( interface, buffer, err ) -- send new data to listener end - - server:settimeout( 0 ) - setmetatable(interface, interface_mt) - interfacelist( "add", interface ) - interface:_start_session() - return interface - end -end - -local addserver = ( function( ) - return function( addr, port, listener, pattern, sslcfg, startssl ) -- TODO: check arguments - --vdebug( "creating new tcp server with following parameters:", addr or "nil", port or "nil", sslcfg or "nil", startssl or "nil") - local server, err = socket.bind( addr, port, cfg.ACCEPT_QUEUE ) -- create server socket - if not server then - debug( "creating server socket on "..addr.." port "..port.." failed:", err ) - return nil, err + if interface.noreading then + interface.eventread = nil; + return -1; end - local sslctx - if sslcfg then - if not ssl then - debug "fatal error: luasec not found" - return nil, "luasec not found" - end - sslctx, err = sslcfg - if err then - debug( "error while creating new ssl context for server socket:", err ) - return nil, err - end + if interface.conn:dirty() then -- still data left in buffer + return EV_TIMEOUT, cfg.READ_RETRY_DELAY; end - local interface = handleserver( server, addr, port, pattern, listener, sslctx, startssl ) -- new server handler - debug( "new server created with id:", tostring(interface)) - return interface + return EV_READ, cfg.READ_TIMEOUT end -end )( ) -local addclient, wrapclient -do - function wrapclient( client, ip, port, listeners, pattern, sslctx ) - local interface = handleclient( client, ip, port, nil, pattern, listeners, sslctx ) - interface:_start_connection(sslctx) - return interface, client - --function handleclient( client, ip, port, server, pattern, listener, _, sslctx ) -- creates an client interface - end - - function addclient( addr, serverport, listener, pattern, localaddr, localport, sslcfg, startssl ) - local client, err = socket.tcp() -- creating new socket - if not client then - debug( "cannot create socket:", err ) - return nil, err + client:settimeout( 0 ) -- set non blocking + setmetatable(interface, interface_mt) + interfacelist[ interface ] = true -- add to interfacelist + return interface +end + +local function handleserver( server, addr, port, pattern, listener, sslctx ) -- creates an server interface + debug "creating server interface..." + local interface = { + _connections = 0; + + type = "server"; + conn = server; + onconnect = listener.onconnect; -- will be called when new client connected + eventread = false; -- read event handler + eventclose = false; -- close event handler + readcallback = false; -- read event callback + fatalerror = false; -- error message + nointerface = true; -- lock/unlock parameter + + _ip = addr, _port = port, _pattern = pattern, + _sslctx = sslctx; + } + interface.id = tostring(interface):match("%x+$"); + interface.readcallback = function( event ) -- server handler, called on incoming connections + --vdebug( "server can accept, id/addr/port:", interface, addr, port ) + if interface.fatalerror then + --vdebug( "leaving this event because:", self.fatalerror ) + interface.eventread = nil + return -1 end - client:settimeout( 0 ) -- set nonblocking - if localaddr then - local res, err = client:bind( localaddr, localport, -1 ) - if not res then - debug( "cannot bind client:", err ) - return nil, err + local delay = cfg.ACCEPT_DELAY + if EV_TIMEOUT == event then + if interface._connections >= cfg.MAX_CONNECTIONS then -- check connection count + debug( "to many connections, seconds to wait for next accept:", delay ) + return EV_TIMEOUT, delay -- timeout... + else + return EV_READ -- accept again end end - local sslctx - if sslcfg then -- handle ssl/new context - if not ssl then - debug "need luasec, but not available" - return nil, "luasec not found" + --vdebug("max connection check ok, accepting...") + local client, err = server:accept() -- try to accept; TODO: check err + while client do + if interface._connections >= cfg.MAX_CONNECTIONS then + client:close( ) -- refuse connection + debug( "maximal connections reached, refuse client connection; accept delay:", delay ) + return EV_TIMEOUT, delay -- delay for next accept attempt end - sslctx, err = sslcfg - if err then - debug( "cannot create new ssl context:", err ) - return nil, err + local client_ip, client_port = client:getpeername( ) + interface._connections = interface._connections + 1 -- increase connection count + local clientinterface = handleclient( client, client_ip, client_port, interface, pattern, listener, sslctx ) + --vdebug( "client id:", clientinterface, "startssl:", startssl ) + if has_luasec and sslctx then + clientinterface:starttls(sslctx, true) + else + clientinterface:_start_session( true ) end + debug( "accepted incoming client connection from:", client_ip or "<unknown IP>", client_port or "<unknown port>", "to", port or "<unknown port>"); + + client, err = server:accept() -- try to accept again end - local res, err = client:connect( addr, serverport ) -- connect - if res or ( err == "timeout" ) then - local ip, port = client:getsockname( ) - local interface = wrapclient( client, ip, serverport, listener, pattern, sslctx, startssl ) - interface:_start_connection( startssl ) - debug( "new connection id:", interface.id ) - return interface, err + return EV_READ + end + + server:settimeout( 0 ) + setmetatable(interface, interface_mt) + interfacelist[ interface ] = true + interface:_start_session() + return interface +end + +local function addserver( addr, port, listener, pattern, sslctx, startssl ) -- TODO: check arguments + --vdebug( "creating new tcp server with following parameters:", addr or "nil", port or "nil", sslctx or "nil", startssl or "nil") + if sslctx and not has_luasec then + debug "fatal error: luasec not found" + return nil, "luasec not found" + end + local server, err = socket.bind( addr, port, cfg.ACCEPT_QUEUE ) -- create server socket + if not server then + debug( "creating server socket on "..addr.." port "..port.." failed:", err ) + return nil, err + end + local interface = handleserver( server, addr, port, pattern, listener, sslctx, startssl ) -- new server handler + debug( "new server created with id:", tostring(interface)) + return interface +end + +local function wrapclient( client, ip, port, listeners, pattern, sslctx ) + local interface = handleclient( client, ip, port, nil, pattern, listeners, sslctx ) + interface:_start_connection(sslctx) + return interface, client + --function handleclient( client, ip, port, server, pattern, listener, _, sslctx ) -- creates an client interface +end + +local function addclient( addr, serverport, listener, pattern, sslctx, typ ) + if sslctx and not has_luasec then + debug "need luasec, but not available" + return nil, "luasec not found" + end + if not typ then + local addrinfo, err = getaddrinfo(addr) + if not addrinfo then return nil, err end + if addrinfo[1] and addrinfo[1].family == "inet6" then + typ = "tcp6" else - debug( "new connection failed:", err ) - return nil, err + typ = "tcp" end end + local create = socket[typ] + if type( create ) ~= "function" then + return nil, "invalid socket type" + end + local client, err = create() -- creating new socket + if not client then + debug( "cannot create socket:", err ) + return nil, err + end + client:settimeout( 0 ) -- set nonblocking + local res, err = client:connect( addr, serverport ) -- connect + if res or ( err == "timeout" ) then + local ip, port = client:getsockname( ) + local interface = wrapclient( client, ip, serverport, listener, pattern, sslctx ) + debug( "new connection id:", interface.id ) + return interface, err + else + debug( "new connection failed:", err ) + return nil, err + end end - -local loop = function( ) -- starts the event loop +local function loop( ) -- starts the event loop base:loop( ) return "quitting"; end -local newevent = ( function( ) - local add = base.addevent - return function( ... ) - return add( base, ... ) - end -end )( ) +local function newevent( ... ) + return addevent( base, ... ) +end -local closeallservers = function( arg ) - for _, item in ipairs( interfacelist( ) ) do +local function closeallservers ( arg ) + for item in pairs( interfacelist ) do if item.type == "server" then item:close( arg ) end @@ -815,9 +765,9 @@ end local function setquitting(yes) if yes then - -- Quit now - closeallservers(); - base:loopexit(); + -- Quit now + closeallservers(); + base:loopexit(); end end @@ -829,7 +779,7 @@ end -- being garbage-collected local signal_events = {}; -- [signal_num] -> event object local function hook_signal(signal_num, handler) - local function _handler(event) + local function _handler() local ret = handler(); if ret ~= false then -- Continue handling this signal? return EV_SIGNAL; -- Yes @@ -842,14 +792,14 @@ end local function link(sender, receiver, buffersize) local sender_locked; - + function receiver:ondrain() if sender_locked then sender:resume(); sender_locked = nil; end end - + function sender:onincoming(data) receiver:write(data); if receiver.writebufferlen >= buffersize then @@ -861,12 +811,11 @@ local function link(sender, receiver, buffersize) end return { - cfg = cfg, base = base, loop = loop, link = link, - event = event, + event = levent, event_base = base, addevent = newevent, addserver = addserver, diff --git a/net/server_select.lua b/net/server_select.lua index 39640a83..12aef9d8 100644 --- a/net/server_select.lua +++ b/net/server_select.lua @@ -1,7 +1,7 @@ --- +-- -- server.lua by blastbeat of the luadch project -- Re-used here under the MIT/X Consortium License --- +-- -- Modifications (C) 2008-2010 Matthew Wild, Waqas Hussain -- @@ -31,14 +31,12 @@ local tostring = use "tostring" --// lua libs //-- -local os = use "os" local table = use "table" local string = use "string" local coroutine = use "coroutine" --// lua lib methods //-- -local os_difftime = os.difftime local math_min = math.min local math_huge = math.huge local table_concat = table.concat @@ -48,13 +46,14 @@ local coroutine_yield = coroutine.yield --// extern libs //-- -local luasec = use "ssl" +local has_luasec, luasec = pcall ( require , "ssl" ) local luasocket = use "socket" or require "socket" local luasocket_gettime = luasocket.gettime +local getaddrinfo = luasocket.dns.getaddrinfo --// extern lib methods //-- -local ssl_wrap = ( luasec and luasec.wrap ) +local ssl_wrap = ( has_luasec and luasec.wrap ) local socket_bind = luasocket.bind local socket_sleep = luasocket.sleep local socket_select = luasocket.select @@ -149,7 +148,7 @@ _accepretry = 10 -- seconds to wait until the next attempt of a full server to a _maxsendlen = 51000 * 1024 -- max len of send buffer _maxreadlen = 25000 * 1024 -- max len of read buffer -_checkinterval = 1200000 -- interval in secs to check idle clients +_checkinterval = 30 -- interval in secs to check idle clients _sendtimeout = 60000 -- allowed send idle time in secs _readtimeout = 6 * 60 * 60 -- allowed read idle time in secs @@ -295,6 +294,7 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport local status = listeners.onstatus local disconnect = listeners.ondisconnect local drain = listeners.ondrain + local onreadtimeout = listeners.onreadtimeout; local detach = listeners.ondetach local bufferqueue = { } -- buffer array @@ -324,6 +324,8 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport handler.disconnect = function( ) return disconnect end + handler.onreadtimeout = onreadtimeout; + handler.setlistener = function( self, listeners ) if detach then detach(self) -- Notify listener that it is no longer responsible for this connection @@ -332,6 +334,7 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport disconnect = listeners.ondisconnect status = listeners.onstatus drain = listeners.ondrain + handler.onreadtimeout = listeners.onreadtimeout detach = listeners.ondetach end handler.getstats = function( ) @@ -404,6 +407,9 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport out_put "server.lua: closed client handler and removed socket from list" return true end + handler.server = function ( ) + return server + end handler.ip = function( ) return ip end @@ -564,7 +570,7 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport local read, wrote handshake = coroutine_wrap( function( client ) -- create handshake coroutine local err - for i = 1, _maxsslhandshake do + for _ = 1, _maxsslhandshake do _sendlistlen = ( wrote and removesocket( _sendlist, client, _sendlistlen ) ) or _sendlistlen _readlistlen = ( read and removesocket( _readlist, client, _readlistlen ) ) or _readlistlen read, wrote = nil, nil @@ -576,6 +582,9 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport _ = status and status( handler, "ssl-handshake-complete" ) if self.autostart_ssl and listeners.onconnect then listeners.onconnect(self); + if bufferqueuelen ~= 0 then + _sendlistlen = addsocket(_sendlist, client, _sendlistlen) + end end _readlistlen = addsocket(_readlist, client, _readlistlen) return true @@ -593,13 +602,14 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport coroutine_yield( ) -- handshake not finished end end - out_put( "server.lua: ssl handshake error: ", tostring(err or "handshake too long") ) - _ = handler and handler:force_close("ssl handshake failed") + err = "ssl handshake error: " .. ( err or "handshake too long" ); + out_put( "server.lua: ", err ); + _ = handler and handler:force_close(err) return false, err -- handshake failed end ) end - if luasec then + if has_luasec then handler.starttls = function( self, _sslctx) if _sslctx then handler:set_sslctx(_sslctx); @@ -625,7 +635,7 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport shutdown = id _socketlist[ socket ] = handler _readlistlen = addsocket(_readlist, socket, _readlistlen) - + -- remove traces of the old socket _readlistlen = removesocket( _readlist, oldsocket, _readlistlen ) _sendlistlen = removesocket( _sendlist, oldsocket, _sendlistlen ) @@ -652,7 +662,7 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport _socketlist[ socket ] = handler _readlistlen = addsocket(_readlist, socket, _readlistlen) - if sslctx and luasec then + if sslctx and has_luasec then out_put "server.lua: auto-starting ssl negotiation..." handler.autostart_ssl = true; local ok, err = handler:starttls(sslctx); @@ -713,7 +723,7 @@ local function link(sender, receiver, buffersize) sender_locked = nil; end end - + local _readbuffer = sender.readbuffer; function sender.readbuffer() _readbuffer(); @@ -728,22 +738,23 @@ end ----------------------------------// PUBLIC //-- addserver = function( addr, port, listeners, pattern, sslctx ) -- this function provides a way for other scripts to reg a server + addr = addr or "*" local err if type( listeners ) ~= "table" then err = "invalid listener table" - end - if type( port ) ~= "number" or not ( port >= 0 and port <= 65535 ) then + elseif type ( addr ) ~= "string" then + err = "invalid address" + elseif type( port ) ~= "number" or not ( port >= 0 and port <= 65535 ) then err = "invalid port" elseif _server[ addr..":"..port ] then err = "listeners on '[" .. addr .. "]:" .. port .. "' already exist" - elseif sslctx and not luasec then + elseif sslctx and not has_luasec then err = "luasec not found" end if err then out_error( "server.lua, [", addr, "]:", port, ": ", err ) return nil, err end - addr = addr or "*" local server, err = socket_bind( addr, port, _tcpbacklog ) if err then out_error( "server.lua, [", addr, "]:", port, ": ", err ) @@ -853,7 +864,7 @@ loop = function(once) -- this is the main loop of the program local next_timer_time = math_huge; repeat local read, write, err = socket_select( _readlist, _sendlist, math_min(_selecttimeout, next_timer_time) ) - for i, socket in ipairs( write ) do -- send data waiting in writequeues + for _, socket in ipairs( write ) do -- send data waiting in writequeues local handler = _socketlist[ socket ] if handler then handler.sendbuffer( ) @@ -862,7 +873,7 @@ loop = function(once) -- this is the main loop of the program out_put "server.lua: found no handler and closed socket (writelist)" -- this should not happen end end - for i, socket in ipairs( read ) do -- receive data + for _, socket in ipairs( read ) do -- receive data local handler = _socketlist[ socket ] if handler then handler.readbuffer( ) @@ -879,21 +890,22 @@ loop = function(once) -- this is the main loop of the program _currenttime = luasocket_gettime( ) -- Check for socket timeouts - local difftime = os_difftime( _currenttime - _starttime ) - if difftime > _checkinterval then + if _currenttime - _starttime > _checkinterval then _starttime = _currenttime for handler, timestamp in pairs( _writetimes ) do - if os_difftime( _currenttime - timestamp ) > _sendtimeout then - --_writetimes[ handler ] = nil + if _currenttime - timestamp > _sendtimeout then handler.disconnect( )( handler, "send timeout" ) handler:force_close() -- forced disconnect end end for handler, timestamp in pairs( _readtimes ) do - if os_difftime( _currenttime - timestamp ) > _readtimeout then - --_readtimes[ handler ] = nil - handler.disconnect( )( handler, "read timeout" ) - handler:close( ) -- forced disconnect? + if _currenttime - timestamp > _readtimeout then + if not(handler.onreadtimeout) or handler:onreadtimeout() ~= true then + handler.disconnect( )( handler, "read timeout" ) + handler:close( ) -- forced disconnect? + else + _readtimes[ handler ] = _currenttime -- reset timer + end end end end @@ -921,6 +933,7 @@ loop = function(once) -- this is the main loop of the program socket_sleep( _sleeptime ) until quitting; if once and quitting == "once" then quitting = nil; return; end + closeall(); return "quitting" end @@ -953,17 +966,46 @@ local wrapclient = function( socket, ip, serverport, listeners, pattern, sslctx return handler, socket end -local addclient = function( address, port, listeners, pattern, sslctx ) - local client, err = luasocket.tcp( ) +local addclient = function( address, port, listeners, pattern, sslctx, typ ) + local err + if type( listeners ) ~= "table" then + err = "invalid listener table" + elseif type ( address ) ~= "string" then + err = "invalid address" + elseif type( port ) ~= "number" or not ( port >= 0 and port <= 65535 ) then + err = "invalid port" + elseif sslctx and not has_luasec then + err = "luasec not found" + end + if not typ then + local addrinfo, err = getaddrinfo(address) + if not addrinfo then return nil, err end + if addrinfo[1] and addrinfo[1].family == "inet6" then + typ = "tcp6" + else + typ = "tcp" + end + end + local create = luasocket[typ] + if type( create ) ~= "function" then + err = "invalid socket type" + end + + if err then + out_error( "server.lua, addclient: ", err ) + return nil, err + end + + local client, err = create( ) if err then return nil, err end client:settimeout( 0 ) - _, err = client:connect( address, port ) - if err then -- try again - local handler = wrapclient( client, address, port, listeners ) + local ok, err = client:connect( address, port ) + if ok or err == "timeout" then + return wrapclient( client, address, port, listeners, pattern, sslctx ) else - wrapconnection( nil, listeners, client, address, port, "clientport", pattern, sslctx ) + return nil, err end end @@ -993,7 +1035,7 @@ return { addclient = addclient, wrapclient = wrapclient, - + loop = loop, link = link, step = step, diff --git a/net/websocket.lua b/net/websocket.lua new file mode 100644 index 00000000..a4274eec --- /dev/null +++ b/net/websocket.lua @@ -0,0 +1,272 @@ +-- Prosody IM +-- Copyright (C) 2012 Florian Zeitz +-- Copyright (C) 2014 Daurnimator +-- +-- This project is MIT/X11 licensed. Please see the +-- COPYING file in the source package for more information. +-- + +local t_concat = table.concat; + +local http = require "net.http"; +local frames = require "net.websocket.frames"; +local base64 = require "util.encodings".base64; +local sha1 = require "util.hashes".sha1; +local random_bytes = require "util.random".bytes; +local timer = require "util.timer"; +local log = require "util.logger".init "websocket"; + +local close_timeout = 3; -- Seconds to wait after sending close frame until closing connection. + +local websockets = {}; + +local websocket_listeners = {}; +function websocket_listeners.ondisconnect(handler, err) + local s = websockets[handler]; + websockets[handler] = nil; + if s.close_timer then + timer.stop(s.close_timer); + s.close_timer = nil; + end + s.readyState = 3; + if s.close_code == nil and s.onerror then s:onerror(err); end + if s.onclose then s:onclose(s.close_code, s.close_message or err); end +end + +function websocket_listeners.ondetach(handler) + websockets[handler] = nil; +end + +local function fail(s, code, reason) + module:log("warn", "WebSocket connection failed, closing. %d %s", code, reason); + s:close(code, reason); + s.handler:close(); + return false +end + +function websocket_listeners.onincoming(handler, buffer, err) + local s = websockets[handler]; + s.readbuffer = s.readbuffer..buffer; + while true do + local frame, len = frames.parse(s.readbuffer); + if frame == nil then break end + s.readbuffer = s.readbuffer:sub(len+1); + + log("debug", "Websocket received frame: opcode=%0x, %i bytes", frame.opcode, #frame.data); + + -- Error cases + if frame.RSV1 or frame.RSV2 or frame.RSV3 then -- Reserved bits non zero + return fail(s, 1002, "Reserved bits not zero"); + end + + if frame.opcode < 0x8 then + local databuffer = s.databuffer; + if frame.opcode == 0x0 then -- Continuation frames + if not databuffer then + return fail(s, 1002, "Unexpected continuation frame"); + end + databuffer[#databuffer+1] = frame.data; + elseif frame.opcode == 0x1 or frame.opcode == 0x2 then -- Text or Binary frame + if databuffer then + return fail(s, 1002, "Continuation frame expected"); + end + databuffer = {type=frame.opcode, frame.data}; + s.databuffer = databuffer; + else + return fail(s, 1002, "Reserved opcode"); + end + if frame.FIN then + s.databuffer = nil; + if s.onmessage then + s:onmessage(t_concat(databuffer), databuffer.type); + end + end + else -- Control frame + if frame.length > 125 then -- Control frame with too much payload + return fail(s, 1002, "Payload too large"); + elseif not frame.FIN then -- Fragmented control frame + return fail(s, 1002, "Fragmented control frame"); + end + if frame.opcode == 0x8 then -- Close request + if frame.length == 1 then + return fail(s, 1002, "Close frame with payload, but too short for status code"); + end + local status_code, message = frames.parse_close(frame.data); + if status_code == nil then + --[[ RFC 6455 7.4.1 + 1005 is a reserved value and MUST NOT be set as a status code in a + Close control frame by an endpoint. It is designated for use in + applications expecting a status code to indicate that no status + code was actually present. + ]] + status_code = 1005 + elseif status_code < 1000 then + return fail(s, 1002, "Closed with invalid status code"); + elseif ((status_code > 1003 and status_code < 1007) or status_code > 1011) and status_code < 3000 then + return fail(s, 1002, "Closed with reserved status code"); + end + s.close_code, s.close_message = status_code, message; + s:close(1000); + return true; + elseif frame.opcode == 0x9 then -- Ping frame + frame.opcode = 0xA; + frame.MASK = true; -- RFC 6455 6.1.5: If the data is being sent by the client, the frame(s) MUST be masked + handler:write(frames.build(frame)); + elseif frame.opcode == 0xA then -- Pong frame + log("debug", "Received unexpected pong frame: " .. tostring(frame.data)); + else + return fail(s, 1002, "Reserved opcode"); + end + end + end + return true; +end + +local websocket_methods = {}; +local function close_timeout_cb(now, timerid, s) + s.close_timer = nil; + log("warn", "Close timeout waiting for server to close, closing manually."); + s.handler:close(); +end +function websocket_methods:close(code, reason) + if self.readyState < 2 then + code = code or 1000; + log("debug", "closing WebSocket with code %i: %s" , code , tostring(reason)); + self.readyState = 2; + local handler = self.handler; + handler:write(frames.build_close(code, reason, true)); + -- Do not close socket straight away, wait for acknowledgement from server. + self.close_timer = timer.add_task(close_timeout, close_timeout_cb, self); + elseif self.readyState == 2 then + log("debug", "tried to close a closing WebSocket, closing the raw socket."); + -- Stop timer + if self.close_timer then + timer.stop(self.close_timer); + self.close_timer = nil; + end + local handler = self.handler; + handler:close(); + else + log("debug", "tried to close a closed WebSocket, ignoring."); + end +end +function websocket_methods:send(data, opcode) + if self.readyState < 1 then + return nil, "WebSocket not open yet, unable to send data."; + elseif self.readyState >= 2 then + return nil, "WebSocket closed, unable to send data."; + end + if opcode == "text" or opcode == nil then + opcode = 0x1; + elseif opcode == "binary" then + opcode = 0x2; + end + local frame = { + FIN = true; + MASK = true; -- RFC 6455 6.1.5: If the data is being sent by the client, the frame(s) MUST be masked + opcode = opcode; + data = tostring(data); + }; + log("debug", "WebSocket sending frame: opcode=%0x, %i bytes", frame.opcode, #frame.data); + return self.handler:write(frames.build(frame)); +end + +local websocket_metatable = { + __index = websocket_methods; +}; + +local function connect(url, ex, listeners) + ex = ex or {}; + + --[[RFC 6455 4.1.7: + The request MUST include a header field with the name + |Sec-WebSocket-Key|. The value of this header field MUST be a + nonce consisting of a randomly selected 16-byte value that has + been base64-encoded (see Section 4 of [RFC4648]). The nonce + MUST be selected randomly for each connection. + ]] + local key = base64.encode(random_bytes(16)); + + -- Either a single protocol string or an array of protocol strings. + local protocol = ex.protocol; + if type(protocol) == "string" then + protocol = { protocol, [protocol] = true }; + elseif type(protocol) == "table" and protocol[1] then + for _, v in ipairs(protocol) do + protocol[v] = true; + end + else + protocol = nil; + end + + local headers = { + ["Upgrade"] = "websocket"; + ["Connection"] = "Upgrade"; + ["Sec-WebSocket-Key"] = key; + ["Sec-WebSocket-Protocol"] = protocol and t_concat(protocol, ", "); + ["Sec-WebSocket-Version"] = "13"; + ["Sec-WebSocket-Extensions"] = ex.extensions; + } + if ex.headers then + for k,v in pairs(ex.headers) do + headers[k] = v; + end + end + + local s = setmetatable({ + readbuffer = ""; + databuffer = nil; + handler = nil; + close_code = nil; + close_message = nil; + close_timer = nil; + readyState = 0; + protocol = nil; + + url = url; + + onopen = listeners.onopen; + onclose = listeners.onclose; + onmessage = listeners.onmessage; + onerror = listeners.onerror; + }, websocket_metatable); + + local http_url = url:gsub("^(ws)", "http"); + local http_req = http.request(http_url, { + method = "GET"; + headers = headers; + sslctx = ex.sslctx; + }, function(b, c, r, http_req) + if c ~= 101 + or r.headers["connection"]:lower() ~= "upgrade" + or r.headers["upgrade"] ~= "websocket" + or r.headers["sec-websocket-accept"] ~= base64.encode(sha1(key .. "258EAFA5-E914-47DA-95CA-C5AB0DC85B11")) + or (protocol and not protocol[r.headers["sec-websocket-protocol"]]) + then + s.readyState = 3; + log("warn", "WebSocket connection to %s failed: %s", url, tostring(b)); + if s.onerror then s:onerror("connecting-failed"); end + return; + end + + s.protocol = r.headers["sec-websocket-protocol"]; + + -- Take possession of socket from http + http_req.conn = nil; + local handler = http_req.handler; + s.handler = handler; + websockets[handler] = s; + handler:setlistener(websocket_listeners); + + log("debug", "WebSocket connected successfully to %s", url); + s.readyState = 1; + if s.onopen then s:onopen(); end + websocket_listeners.onincoming(handler, b); + end); + + return s; +end + +return { + connect = connect; +}; diff --git a/net/websocket/frames.lua b/net/websocket/frames.lua new file mode 100644 index 00000000..5fe96d45 --- /dev/null +++ b/net/websocket/frames.lua @@ -0,0 +1,219 @@ +-- Prosody IM +-- Copyright (C) 2012 Florian Zeitz +-- Copyright (C) 2014 Daurnimator +-- +-- This project is MIT/X11 licensed. Please see the +-- COPYING file in the source package for more information. +-- + +local softreq = require "util.dependencies".softreq; +local random_bytes = require "util.random".bytes; + +local bit = assert(softreq"bit" or softreq"bit32", + "No bit module found. See https://prosody.im/doc/depends#bitop"); +local band = bit.band; +local bor = bit.bor; +local bxor = bit.bxor; +local lshift = bit.lshift; +local rshift = bit.rshift; + +local t_concat = table.concat; +local s_byte = string.byte; +local s_char= string.char; +local s_sub = string.sub; +local s_pack = string.pack; +local s_unpack = string.unpack; + +if not s_pack and softreq"struct" then + s_pack = softreq"struct".pack; + s_unpack = softreq"struct".unpack; +end + +local function read_uint16be(str, pos) + local l1, l2 = s_byte(str, pos, pos+1); + return l1*256 + l2; +end +-- FIXME: this may lose precision +local function read_uint64be(str, pos) + local l1, l2, l3, l4, l5, l6, l7, l8 = s_byte(str, pos, pos+7); + local h = lshift(l1, 24) + lshift(l2, 16) + lshift(l3, 8) + l4; + local l = lshift(l5, 24) + lshift(l6, 16) + lshift(l7, 8) + l8; + return h * 2^32 + l; +end +local function pack_uint16be(x) + return s_char(rshift(x, 8), band(x, 0xFF)); +end +local function get_byte(x, n) + return band(rshift(x, n), 0xFF); +end +local function pack_uint64be(x) + local h = band(x / 2^32, 2^32-1); + return s_char(get_byte(h, 24), get_byte(h, 16), get_byte(h, 8), band(h, 0xFF), + get_byte(x, 24), get_byte(x, 16), get_byte(x, 8), band(x, 0xFF)); +end + +if s_pack then + function pack_uint16be(x) + return s_pack(">I2", x); + end + function pack_uint64be(x) + return s_pack(">I8", x); + end +end + +if s_unpack then + function read_uint16be(str, pos) + return s_unpack(">I2", str, pos); + end + function read_uint64be(str, pos) + return s_unpack(">I8", str, pos); + end +end + +local function parse_frame_header(frame) + if #frame < 2 then return; end + + local byte1, byte2 = s_byte(frame, 1, 2); + local result = { + FIN = band(byte1, 0x80) > 0; + RSV1 = band(byte1, 0x40) > 0; + RSV2 = band(byte1, 0x20) > 0; + RSV3 = band(byte1, 0x10) > 0; + opcode = band(byte1, 0x0F); + + MASK = band(byte2, 0x80) > 0; + length = band(byte2, 0x7F); + }; + + local length_bytes = 0; + if result.length == 126 then + length_bytes = 2; + elseif result.length == 127 then + length_bytes = 8; + end + + local header_length = 2 + length_bytes + (result.MASK and 4 or 0); + if #frame < header_length then return; end + + if length_bytes == 2 then + result.length = read_uint16be(frame, 3); + elseif length_bytes == 8 then + result.length = read_uint64be(frame, 3); + end + + if result.MASK then + result.key = { s_byte(frame, length_bytes+3, length_bytes+6) }; + end + + return result, header_length; +end + +-- XORs the string `str` with the array of bytes `key` +-- TODO: optimize +local function apply_mask(str, key, from, to) + from = from or 1 + if from < 0 then from = #str + from + 1 end -- negative indicies + to = to or #str + if to < 0 then to = #str + to + 1 end -- negative indicies + local key_len = #key + local counter = 0; + local data = {}; + for i = from, to do + local key_index = counter%key_len + 1; + counter = counter + 1; + data[counter] = s_char(bxor(key[key_index], s_byte(str, i))); + end + return t_concat(data); +end + +local function parse_frame_body(frame, header, pos) + if header.MASK then + return apply_mask(frame, header.key, pos, pos + header.length - 1); + else + return frame:sub(pos, pos + header.length - 1); + end +end + +local function parse_frame(frame) + local result, pos = parse_frame_header(frame); + if result == nil or #frame < (pos + result.length) then return; end + result.data = parse_frame_body(frame, result, pos+1); + return result, pos + result.length; +end + +local function build_frame(desc) + local data = desc.data or ""; + + assert(desc.opcode and desc.opcode >= 0 and desc.opcode <= 0xF, "Invalid WebSocket opcode"); + if desc.opcode >= 0x8 then + -- RFC 6455 5.5 + assert(#data <= 125, "WebSocket control frames MUST have a payload length of 125 bytes or less."); + end + + local b1 = bor(desc.opcode, + desc.FIN and 0x80 or 0, + desc.RSV1 and 0x40 or 0, + desc.RSV2 and 0x20 or 0, + desc.RSV3 and 0x10 or 0); + + local b2 = #data; + local length_extra; + if b2 <= 125 then -- 7-bit length + length_extra = ""; + elseif b2 <= 0xFFFF then -- 2-byte length + b2 = 126; + length_extra = pack_uint16be(#data); + else -- 8-byte length + b2 = 127; + length_extra = pack_uint64be(#data); + end + + local key = "" + if desc.MASK then + local key_a = desc.key + if key_a then + key = s_char(unpack(key_a, 1, 4)); + else + key = random_bytes(4); + key_a = {key:byte(1,4)}; + end + b2 = bor(b2, 0x80); + data = apply_mask(data, key_a); + end + + return s_char(b1, b2) .. length_extra .. key .. data +end + +local function parse_close(data) + local code, message + if #data >= 2 then + code = read_uint16be(data, 1); + if #data > 2 then + message = s_sub(data, 3); + end + end + return code, message +end + +local function build_close(code, message, mask) + local data = pack_uint16be(code); + if message then + assert(#message<=123, "Close reason must be <=123 bytes"); + data = data .. message; + end + return build_frame({ + opcode = 0x8; + FIN = true; + MASK = mask; + data = data; + }); +end + +return { + parse_header = parse_frame_header; + parse_body = parse_frame_body; + parse = parse_frame; + build = build_frame; + parse_close = parse_close; + build_close = build_close; +}; |