diff options
Diffstat (limited to 'net')
-rw-r--r-- | net/adns.lua | 20 | ||||
-rw-r--r-- | net/connlisteners.lua | 15 | ||||
-rw-r--r-- | net/dns.lua | 21 | ||||
-rw-r--r-- | net/http.lua | 79 | ||||
-rw-r--r-- | net/http/codes.lua | 12 | ||||
-rw-r--r-- | net/http/server.lua | 40 | ||||
-rw-r--r-- | net/httpserver.lua | 11 | ||||
-rw-r--r-- | net/server.lua | 2 | ||||
-rw-r--r-- | net/server_event.lua | 285 | ||||
-rw-r--r-- | net/server_select.lua | 105 | ||||
-rw-r--r-- | net/websocket.lua | 272 | ||||
-rw-r--r-- | net/websocket/frames.lua | 219 |
12 files changed, 782 insertions, 299 deletions
diff --git a/net/adns.lua b/net/adns.lua index 3fc958f4..d3da2065 100644 --- a/net/adns.lua +++ b/net/adns.lua @@ -1,7 +1,7 @@ -- Prosody IM -- Copyright (C) 2008-2010 Matthew Wild -- Copyright (C) 2008-2010 Waqas Hussain --- +-- -- This project is MIT/X11 licensed. Please see the -- COPYING file in the source package for more information. -- @@ -16,9 +16,9 @@ local coroutine, tostring, pcall = coroutine, tostring, pcall; local function dummy_send(sock, data, i, j) return (j-i)+1; end -module "adns" +local _ENV = nil; -function lookup(handler, qname, qtype, qclass) +local function lookup(handler, qname, qtype, qclass) return coroutine.wrap(function (peek) if peek then log("debug", "Records for %s already cached, using those...", qname); @@ -43,12 +43,12 @@ function lookup(handler, qname, qtype, qclass) end)(dns.peek(qname, qtype, qclass)); end -function cancel(handle, call_handler, reason) +local function cancel(handle, call_handler, reason) log("warn", "Cancelling DNS lookup for %s", tostring(handle[3])); dns.cancel(handle[1], handle[2], handle[3], handle[4], call_handler); end -function new_async_socket(sock, resolver) +local function new_async_socket(sock, resolver) local peername = "<unknown>"; local listener = {}; local handler = {}; @@ -65,7 +65,7 @@ function new_async_socket(sock, resolver) if resolver.socketset[conn] == resolver.best_server and resolver.best_server == #servers then log("error", "Exhausted all %d configured DNS servers, next lookup will try %s again", #servers, servers[1]); end - + resolver:servfail(conn); -- Let the magic commence end end @@ -73,7 +73,7 @@ function new_async_socket(sock, resolver) if not handler then return nil, err; end - + handler.settimeout = function () end handler.setsockname = function (_, ...) return sock:setsockname(...); end handler.setpeername = function (_, ...) peername = (...); local ret, err = sock:setpeername(...); _:set_send(dummy_send); return ret, err; end @@ -88,4 +88,8 @@ end dns.socket_wrapper_set(new_async_socket); -return _M; +return { + lookup = lookup; + cancel = cancel; + new_async_socket = new_async_socket; +}; diff --git a/net/connlisteners.lua b/net/connlisteners.lua index 99ddc720..000bfa63 100644 --- a/net/connlisteners.lua +++ b/net/connlisteners.lua @@ -2,14 +2,17 @@ local log = require "util.logger".init("net.connlisteners"); local traceback = debug.traceback; -module "httpserver" +local _ENV = nil; -function fail() +local function fail() log("error", "Attempt to use legacy connlisteners API. For more info see http://prosody.im/doc/developers/network"); log("error", "Legacy connlisteners API usage, %s", traceback("", 2)); end -register, deregister = fail, fail; -get, start = fail, fail, epic_fail; - -return _M; +return { + register = fail; + register = fail; + get = fail; + start = fail; + -- epic fail +}; diff --git a/net/dns.lua b/net/dns.lua index d123731c..689020a4 100644 --- a/net/dns.lua +++ b/net/dns.lua @@ -22,8 +22,8 @@ local is_windows = (_ and windows) or os.getenv("WINDIR"); local coroutine, io, math, string, table = coroutine, io, math, string, table; -local ipairs, next, pairs, print, setmetatable, tostring, assert, error, unpack, select, type= - ipairs, next, pairs, print, setmetatable, tostring, assert, error, unpack, select, type; +local ipairs, next, pairs, print, setmetatable, tostring, assert, error, select, type, unpack= + ipairs, next, pairs, print, setmetatable, tostring, assert, error, select, type, table.unpack or unpack; local ztact = { -- public domain 20080404 lua@ztact.com get = function(parent, ...) @@ -71,8 +71,8 @@ local get, set = ztact.get, ztact.set; local default_timeout = 15; -------------------------------------------------- module dns -module('dns') -local dns = _M; +local _ENV = nil; +local dns = {}; -- dns type & class codes ------------------------------ dns type & class codes @@ -213,15 +213,6 @@ function cache_metatable.__tostring(cache) end -function resolver:new() -- - - - - - - - - - - - - - - - - - - - - resolver - local r = { active = {}, cache = {}, unsorted = {} }; - setmetatable(r, resolver); - setmetatable(r.cache, cache_metatable); - setmetatable(r.unsorted, { __mode = 'kv' }); - return r; -end - - -- packet layer -------------------------------------------------- packet layer @@ -629,7 +620,7 @@ function resolver:getsocket(servernum) -- - - - - - - - - - - - - getsocket if peer:find(":") then sock, err = socket.udp6(); else - sock, err = socket.udp(); + sock, err = (socket.udp4 or socket.udp)(); end if sock and self.socket_wrapper then sock, err = self.socket_wrapper(sock, self); end if not sock then @@ -1054,8 +1045,6 @@ end function dns.resolver () -- - - - - - - - - - - - - - - - - - - - - resolver - -- this function seems to be redundant with resolver.new () - local r = { active = {}, cache = {}, unsorted = {}, wanted = {}, best_server = 1 }; setmetatable (r, resolver); setmetatable (r.cache, cache_metatable); diff --git a/net/http.lua b/net/http.lua index 8ce47494..b78f8438 100644 --- a/net/http.lua +++ b/net/http.lua @@ -1,12 +1,11 @@ -- Prosody IM -- Copyright (C) 2008-2010 Matthew Wild -- Copyright (C) 2008-2010 Waqas Hussain --- +-- -- This project is MIT/X11 licensed. Please see the -- COPYING file in the source package for more information. -- -local socket = require "socket" local b64 = require "util.encodings".base64.encode; local url = require "socket.url" local httpstream_new = require "net.http.parser".new; @@ -24,7 +23,7 @@ local assert, error = assert, error local log = require "util.logger".init("http"); -module "http" +local _ENV = nil; local requests = {}; -- Open requests @@ -37,7 +36,7 @@ function listener.onconnect(conn) if req.query then t_insert(request_line, 4, "?"..req.query); end - + conn:write(t_concat(request_line)); local t = { [2] = ": ", [4] = "\r\n" }; for k, v in pairs(req.headers) do @@ -45,7 +44,7 @@ function listener.onconnect(conn) conn:write(t_concat(t)); end conn:write("\r\n"); - + if req.body then conn:write(req.body); end @@ -76,6 +75,13 @@ function listener.ondetach(conn) requests[conn] = nil; end +local function destroy_request(request) + if request.conn then + request.conn = nil; + request.handler:close() + end +end + local function request_reader(request, data, err) if not request.parser then local function error_cb(reason) @@ -85,12 +91,12 @@ local function request_reader(request, data, err) end destroy_request(request); end - + if not data then error_cb(err); return; end - + local function success_cb(r) if request.callback then request.callback(r.body, r.code, r, request); @@ -107,20 +113,20 @@ local function request_reader(request, data, err) end local function handleerr(err) log("error", "Traceback[http]: %s", traceback(tostring(err), 2)); end -function request(u, ex, callback) +local function request(u, ex, callback) local req = url.parse(u); - + if not (req and req.host) then callback(nil, 0, req); return nil, "invalid-url"; end - + if not req.path then req.path = "/"; end - + local method, headers, body; - + local host, port = req.host, req.port; local host_header = host; if (port == "80" and req.scheme == "http") @@ -134,7 +140,7 @@ function request(u, ex, callback) ["Host"] = host_header; ["User-Agent"] = "Prosody XMPP Server"; }; - + if req.userinfo then headers["Authorization"] = "Basic "..b64(req.userinfo); end @@ -154,33 +160,29 @@ function request(u, ex, callback) end end end - + -- Attach to request object req.method, req.headers, req.body = method, headers, body; - + local using_https = req.scheme == "https"; if using_https and not ssl_available then error("SSL not available, unable to contact https URL"); end local port_number = port and tonumber(port) or (using_https and 443 or 80); - - -- Connect the socket, and wrap it with net.server - local conn = socket.tcp(); - conn:settimeout(10); - local ok, err = conn:connect(host, port_number); - if not ok and err ~= "timeout" then - callback(nil, 0, req); - return nil, err; - end - + local sslctx = false; if using_https then sslctx = ex and ex.sslctx or { mode = "client", protocol = "sslv23", options = { "no_sslv2", "no_sslv3" } }; end - req.handler, req.conn = assert(server.wrapclient(conn, host, port_number, listener, "*a", sslctx)); + local handler, conn = server.addclient(host, port_number, listener, "*a", sslctx) + if not handler then + callback(nil, 0, req); + return nil, conn; + end + req.handler, req.conn = handler, conn req.write = function (...) return req.handler:write(...); end - + req.callback = function (content, code, request, response) log("debug", "Calling callback, status %s", code or "---"); return select(2, xpcall(function () return callback(content, code, request, response) end, handleerr)); end req.reader = request_reader; req.state = "status"; @@ -189,17 +191,12 @@ function request(u, ex, callback) return req; end -function destroy_request(request) - if request.conn then - request.conn = nil; - request.handler:close() - end -end - -local urlencode, urldecode = util_http.urlencode, util_http.urldecode; -local formencode, formdecode = util_http.formencode, util_http.formdecode; - -_M.urlencode, _M.urldecode = urlencode, urldecode; -_M.formencode, _M.formdecode = formencode, formdecode; - -return _M; +return { + request = request; + + -- COMPAT + urlencode = util_http.urlencode; + urldecode = util_http.urldecode; + formencode = util_http.formencode; + formdecode = util_http.formdecode; +}; diff --git a/net/http/codes.lua b/net/http/codes.lua index 0cadd079..bc31c7dd 100644 --- a/net/http/codes.lua +++ b/net/http/codes.lua @@ -25,6 +25,7 @@ local response_codes = { [305] = "Use Proxy"; -- The 306 status code was used in a previous version of [RFC2616], is no longer used, and the code is reserved. [307] = "Temporary Redirect"; + [308] = "Permanent Redirect"; [400] = "Bad Request"; [401] = "Unauthorized"; @@ -39,17 +40,21 @@ local response_codes = { [410] = "Gone"; [411] = "Length Required"; [412] = "Precondition Failed"; - [413] = "Request Entity Too Large"; - [414] = "Request-URI Too Long"; + [413] = "Payload Too Large"; + [414] = "URI Too Long"; [415] = "Unsupported Media Type"; - [416] = "Requested Range Not Satisfiable"; + [416] = "Range Not Satisfiable"; [417] = "Expectation Failed"; [418] = "I'm a teapot"; + [421] = "Misdirected Request"; [422] = "Unprocessable Entity"; [423] = "Locked"; [424] = "Failed Dependency"; -- The 425 status code is reserved for the WebDAV advanced collections expired proposal [RFC2817] [426] = "Upgrade Required"; + [428] = "Precondition Required"; + [429] = "Too Many Requests"; + [431] = "Request Header Fields Too Large"; [500] = "Internal Server Error"; [501] = "Not Implemented"; @@ -61,6 +66,7 @@ local response_codes = { [507] = "Insufficient Storage"; [508] = "Loop Detected"; [510] = "Not Extended"; + [511] = "Network Authentication Required"; }; for k,v in pairs(response_codes) do response_codes[k] = k.." "..v; end diff --git a/net/http/server.lua b/net/http/server.lua index f091595c..aeaa7416 100644 --- a/net/http/server.lua +++ b/net/http/server.lua @@ -11,6 +11,7 @@ local setmetatable = setmetatable; local xpcall = xpcall; local traceback = debug.traceback; local tostring = tostring; +local cache = require "util.cache"; local codes = require "net.http.codes"; local _M = {}; @@ -27,7 +28,10 @@ local function is_wildcard_match(wildcard_event, event) return wildcard_event:sub(1, -2) == event:sub(1, #wildcard_event-1); end -local recent_wildcard_events, max_cached_wildcard_events = {}, 10000; +local _handlers = events._handlers; +local recent_wildcard_events = cache.new(10000, function (key, value) + rawset(_handlers, key, nil); +end); local event_map = events._event_map; setmetatable(events._handlers, { @@ -62,10 +66,7 @@ setmetatable(events._handlers, { end rawset(handlers, curr_event, handlers_array); if not event_map[curr_event] then -- Only wildcard handlers match, if any - table.insert(recent_wildcard_events, curr_event); - if #recent_wildcard_events > max_cached_wildcard_events then - rawset(handlers, table.remove(recent_wildcard_events, 1), nil); - end + recent_wildcard_events:set(curr_event, true); end return handlers_array; end; @@ -189,6 +190,7 @@ function handle_request(conn, request, finish_cb) persistent = persistent; conn = conn; send = _M.send_response; + done = _M.finish_response; finish_cb = finish_cb; }; conn._http_open_response = response; @@ -208,7 +210,7 @@ function handle_request(conn, request, finish_cb) err_code, err = 400, "Missing or invalid 'Host' header"; end end - + if err then response.status_code = err_code; response:send(events.fire_event("http-error", { code = err_code, message = err })); @@ -250,24 +252,30 @@ function handle_request(conn, request, finish_cb) response.status_code = 404; response:send(events.fire_event("http-error", { code = 404 })); end -function _M.send_response(response, body) - if response.finished then return; end - response.finished = true; - response.conn._http_open_response = nil; - +local function prepare_header(response) local status_line = "HTTP/"..response.request.httpversion.." "..(response.status or codes[response.status_code]); local headers = response.headers; - body = body or response.body or ""; - headers.content_length = #body; - local output = { status_line }; for k,v in pairs(headers) do t_insert(output, headerfix[k]..v); end t_insert(output, "\r\n\r\n"); + return output; +end +_M.prepare_header = prepare_header; +function _M.send_response(response, body) + if response.finished then return; end + body = body or response.body or ""; + response.headers.content_length = #body; + local output = prepare_header(response); t_insert(output, body); - response.conn:write(t_concat(output)); + response:done(); +end +function _M.finish_response(response) + if response.finished then return; end + response.finished = true; + response.conn._http_open_response = nil; if response.on_destroy then response:on_destroy(); response.on_destroy = nil; @@ -286,7 +294,7 @@ function _M.remove_handler(event, handler) end function _M.listen_on(port, interface, ssl) - addserver(interface or "*", port, listener, "*a", ssl); + return addserver(interface or "*", port, listener, "*a", ssl); end function _M.add_host(host) hosts[host] = true; diff --git a/net/httpserver.lua b/net/httpserver.lua index 7d574788..6e2e31b9 100644 --- a/net/httpserver.lua +++ b/net/httpserver.lua @@ -2,14 +2,15 @@ local log = require "util.logger".init("net.httpserver"); local traceback = debug.traceback; -module "httpserver" +local _ENV = nil; function fail() log("error", "Attempt to use legacy HTTP API. For more info see http://prosody.im/doc/developers/legacy_http"); log("error", "Legacy HTTP API usage, %s", traceback("", 2)); end -new, new_from_config = fail, fail; -set_default_handler = fail; - -return _M; +return { + new = fail; + new_from_config = fail; + set_default_handler = fail; +}; diff --git a/net/server.lua b/net/server.lua index 9b0d27e1..41e180fa 100644 --- a/net/server.lua +++ b/net/server.lua @@ -1,7 +1,7 @@ -- Prosody IM -- Copyright (C) 2008-2010 Matthew Wild -- Copyright (C) 2008-2010 Waqas Hussain --- +-- -- This project is MIT/X11 licensed. Please see the -- COPYING file in the source package for more information. -- diff --git a/net/server_event.lua b/net/server_event.lua index d505825d..70a6dc37 100644 --- a/net/server_event.lua +++ b/net/server_event.lua @@ -11,6 +11,7 @@ -- when using luasec, there are 4 cases of timeout errors: wantread or wantwrite during reading or writing --]] +-- luacheck: ignore 212/self 431/err 211/ret local SCRIPT_NAME = "server_event.lua" local SCRIPT_VERSION = "0.05" @@ -32,27 +33,32 @@ local cfg = { DEBUG = true, -- show debug messages } -local function use(x) return rawget(_G, x); end -local ipairs = use "ipairs" -local string = use "string" -local select = use "select" -local require = use "require" -local tostring = use "tostring" -local coroutine = use "coroutine" -local setmetatable = use "setmetatable" +local pairs = pairs +local select = select +local require = require +local tostring = tostring +local setmetatable = setmetatable local t_insert = table.insert local t_concat = table.concat +local s_sub = string.sub -local ssl = use "ssl" -local socket = use "socket" or require "socket" +local coroutine_wrap = coroutine.wrap +local coroutine_yield = coroutine.yield + +local has_luasec, ssl = pcall ( require , "ssl" ) +local socket = require "socket" +local levent = require "luaevent.core" + +local socket_gettime = socket.gettime +local getaddrinfo = socket.dns.getaddrinfo local log = require ("util.logger").init("socket") local function debug(...) return log("debug", ("%s "):rep(select('#', ...)), ...) end -local vdebug = debug; +-- local vdebug = debug; local bitor = ( function( ) -- thx Rici Lake local hasbit = function( x, p ) @@ -72,62 +78,25 @@ local bitor = ( function( ) -- thx Rici Lake end end )( ) -local event = require "luaevent.core" -local base = event.new( ) -local EV_READ = event.EV_READ -local EV_WRITE = event.EV_WRITE -local EV_TIMEOUT = event.EV_TIMEOUT -local EV_SIGNAL = event.EV_SIGNAL +local base = levent.new( ) +local addevent = base.addevent +local EV_READ = levent.EV_READ +local EV_WRITE = levent.EV_WRITE +local EV_TIMEOUT = levent.EV_TIMEOUT +local EV_SIGNAL = levent.EV_SIGNAL local EV_READWRITE = bitor( EV_READ, EV_WRITE ) -local interfacelist = ( function( ) -- holds the interfaces for sockets - local array = { } - local len = 0 - return function( method, arg ) - if "add" == method then - len = len + 1 - array[ len ] = arg - arg:_position( len ) - return len - elseif "delete" == method then - if len <= 0 then - return nil, "array is already empty" - end - local position = arg:_position() -- get position in array - if position ~= len then - local interface = array[ len ] -- get last interface - array[ position ] = interface -- copy it into free position - array[ len ] = nil -- free last position - interface:_position( position ) -- set new position in array - else -- free last position - array[ len ] = nil - end - len = len - 1 - return len - else - return array - end - end -end )( ) +local interfacelist = { } -- Client interface methods -local interface_mt -do - interface_mt = {}; interface_mt.__index = interface_mt; - - local addevent = base.addevent - local coroutine_wrap, coroutine_yield = coroutine.wrap,coroutine.yield - +local interface_mt = {}; interface_mt.__index = interface_mt; + -- Private methods - function interface_mt:_position(new_position) - self.position = new_position or self.position - return self.position; - end function interface_mt:_close() return self:_destroy(); end - + function interface_mt:_start_connection(plainssl) -- should be called from addclient local callback = function( event ) if EV_TIMEOUT == event then -- timeout during connection @@ -136,7 +105,7 @@ do self:_close() debug( "new connection failed. id:", self.id, "error:", self.fatalerror ) else - if plainssl and ssl then -- start ssl session + if plainssl and has_luasec then -- start ssl session self:starttls(self._sslctx, true) else -- normal connection self:_start_session(true) @@ -188,8 +157,7 @@ do return false end self.conn:settimeout( 0 ) -- set non blocking - local handshakecallback = coroutine_wrap( - function( event ) + local handshakecallback = coroutine_wrap(function( event ) local _, err local attempt = 0 local maxattempt = cfg.MAX_HANDSHAKE_ATTEMPTS @@ -265,15 +233,15 @@ do self.eventread, self.eventclose = nil, nil self.interface, self.readcallback = nil, nil end - interfacelist( "delete", self ) + interfacelist[ self ] = nil return true end - + function interface_mt:_lock(nointerface, noreading, nowriting) -- lock or unlock this interface or events self.nointerface, self.noreading, self.nowriting = nointerface, noreading, nowriting return nointerface, noreading, nowriting end - + --TODO: Deprecate function interface_mt:lock_read(switch) if switch then @@ -301,7 +269,7 @@ do end return self._connections end - + -- Public methods function interface_mt:write(data) if self.nowriting then return nil, "locked" end @@ -344,27 +312,27 @@ do return true end end - + function interface_mt:socket() return self.conn end - + function interface_mt:server() return self._server or self; end - + function interface_mt:port() return self._port end - + function interface_mt:serverport() return self._serverport end - + function interface_mt:ip() return self._ip end - + function interface_mt:ssl() return self._usingssl end @@ -373,15 +341,15 @@ do function interface_mt:type() return self._type or "client" end - + function interface_mt:connections() return self._connections end - + function interface_mt:address() return self.addr end - + function interface_mt:set_sslctx(sslctx) self._sslctx = sslctx; if sslctx then @@ -397,11 +365,11 @@ do end return self._pattern; end - - function interface_mt:set_send(new_send) + +function interface_mt:set_send(new_send) -- luacheck: ignore 212 -- No-op, we always use the underlying connection's send end - + function interface_mt:starttls(sslctx, call_onconnect) debug( "try to start ssl at client id:", self.id ) local err @@ -430,22 +398,22 @@ do self.starttls = false; return true end - + function interface_mt:setoption(option, value) if self.conn.setoption then return self.conn:setoption(option, value); end return false, "setoption not implemented"; end - + function interface_mt:setlistener(listener) self:ondetach(); -- Notify listener that it is no longer responsible for this connection - self.onconnect, self.ondisconnect, self.onincoming, - self.ontimeout, self.onstatus, self.ondetach - = listener.onconnect, listener.ondisconnect, listener.onincoming, - listener.ontimeout, listener.onstatus, listener.ondetach; + self.onconnect, self.ondisconnect, self.onincoming, self.ontimeout, + self.onreadtimeout, self.onstatus, self.ondetach + = listener.onconnect, listener.ondisconnect, listener.onincoming, listener.ontimeout, + listener.onreadtimeout, listener.onstatus, listener.ondetach; end - + -- Stub handlers function interface_mt:onconnect() end @@ -455,22 +423,22 @@ do end function interface_mt:ontimeout() end +function interface_mt:onreadtimeout() + self.fatalerror = "timeout during receiving" + debug( "connection failed:", self.fatalerror ) + self:_close() + self.eventread = nil +end function interface_mt:ondrain() end function interface_mt:ondetach() end function interface_mt:onstatus() end -end -- End of client interface methods -local handleclient; -do - local string_sub = string.sub -- caching table lookups - local addevent = base.addevent - local socket_gettime = socket.gettime - function handleclient( client, ip, port, server, pattern, listener, sslctx ) -- creates an client interface +local function handleclient( client, ip, port, server, pattern, listener, sslctx ) -- creates an client interface --vdebug("creating client interfacce...") local interface = { type = "client"; @@ -484,6 +452,7 @@ do ondisconnect = listener.ondisconnect; -- will be called when client disconnects onincoming = listener.onincoming; -- will be called when client sends data ontimeout = listener.ontimeout; -- called when fatal socket timeout occurs + onreadtimeout = listener.onreadtimeout; -- called when socket inactivity timeout occurs ondrain = listener.ondrain; -- called when writebuffer is empty ondetach = listener.ondetach; -- called when disassociating this listener from this connection onstatus = listener.onstatus; -- called for status changes (e.g. of SSL/TLS) @@ -499,14 +468,14 @@ do noreading = false, nowriting = false; -- locks of the read/writecallback startsslcallback = false; -- starting handshake callback position = false; -- position of client in interfacelist - + -- Properties _ip = ip, _port = port, _server = server, _pattern = pattern, _serverport = (server and server:port() or nil), _sslctx = sslctx; -- parameters _usingssl = false; -- client is using ssl; } - if not ssl then interface.starttls = false; end + if not has_luasec then interface.starttls = false; end interface.id = tostring(interface):match("%x+$"); interface.writecallback = function( event ) -- called on write events --vdebug( "new client write event, id/ip/port:", interface, ip, port ) @@ -552,7 +521,7 @@ do return -1 elseif byte and (err == "timeout" or err == "wantwrite") then -- want write again --vdebug( "writebuffer is not empty:", err ) - interface.writebuffer[1] = string_sub( interface.writebuffer[1], byte + 1, interface.writebufferlen ) -- new buffer + interface.writebuffer[1] = s_sub( interface.writebuffer[1], byte + 1, interface.writebufferlen ) -- new buffer interface.writebufferlen = interface.writebufferlen - byte if "wantread" == err then -- happens only with luasec local callback = function( ) @@ -575,7 +544,7 @@ do end end end - + interface.readcallback = function( event ) -- called on read events --vdebug( "new client read event, id/ip/port:", tostring(interface.id), tostring(ip), tostring(port) ) if interface.noreading or interface.fatalerror then -- leave this event @@ -583,13 +552,9 @@ do interface.eventread = nil return -1 end - if EV_TIMEOUT == event then -- took too long to get some data from client -> disconnect - interface.fatalerror = "timeout during receiving" - debug( "connection failed:", interface.fatalerror ) - interface:_close() - interface.eventread = nil - return -1 - else -- can read + if EV_TIMEOUT == event and interface:onreadtimeout() ~= true then + return -1 -- took too long to get some data from client -> disconnect + end if interface._usingssl then -- handle luasec if interface.eventwritetimeout then -- ok, in the past writecallback was regged local ret = interface.writecallback( ) -- call it @@ -638,22 +603,19 @@ do end return EV_READ, cfg.READ_TIMEOUT end - end client:settimeout( 0 ) -- set non blocking setmetatable(interface, interface_mt) - interfacelist( "add", interface ) -- add to interfacelist + interfacelist[ interface ] = true -- add to interfacelist return interface end -end -local handleserver -do - function handleserver( server, addr, port, pattern, listener, sslctx ) -- creates an server interface +local function handleserver( server, addr, port, pattern, listener, sslctx ) -- creates an server interface debug "creating server interface..." local interface = { _connections = 0; - + + type = "server"; conn = server; onconnect = listener.onconnect; -- will be called when new client connected eventread = false; -- read event handler @@ -661,7 +623,7 @@ do readcallback = false; -- read event callback fatalerror = false; -- error message nointerface = true; -- lock/unlock parameter - + _ip = addr, _port = port, _pattern = pattern, _sslctx = sslctx; } @@ -694,92 +656,77 @@ do interface._connections = interface._connections + 1 -- increase connection count local clientinterface = handleclient( client, client_ip, client_port, interface, pattern, listener, sslctx ) --vdebug( "client id:", clientinterface, "startssl:", startssl ) - if ssl and sslctx then + if has_luasec and sslctx then clientinterface:starttls(sslctx, true) else clientinterface:_start_session( true ) end debug( "accepted incoming client connection from:", client_ip or "<unknown IP>", client_port or "<unknown port>", "to", port or "<unknown port>"); - + client, err = server:accept() -- try to accept again end return EV_READ end - + server:settimeout( 0 ) setmetatable(interface, interface_mt) - interfacelist( "add", interface ) + interfacelist[ interface ] = true interface:_start_session() return interface end -end -local addserver = ( function( ) - return function( addr, port, listener, pattern, sslcfg, startssl ) -- TODO: check arguments - --vdebug( "creating new tcp server with following parameters:", addr or "nil", port or "nil", sslcfg or "nil", startssl or "nil") +local function addserver( addr, port, listener, pattern, sslctx, startssl ) -- TODO: check arguments + --vdebug( "creating new tcp server with following parameters:", addr or "nil", port or "nil", sslctx or "nil", startssl or "nil") + if sslctx and not has_luasec then + debug "fatal error: luasec not found" + return nil, "luasec not found" +end local server, err = socket.bind( addr, port, cfg.ACCEPT_QUEUE ) -- create server socket if not server then debug( "creating server socket on "..addr.." port "..port.." failed:", err ) return nil, err end - local sslctx - if sslcfg then - if not ssl then - debug "fatal error: luasec not found" - return nil, "luasec not found" - end - sslctx, err = sslcfg - if err then - debug( "error while creating new ssl context for server socket:", err ) - return nil, err - end - end local interface = handleserver( server, addr, port, pattern, listener, sslctx, startssl ) -- new server handler debug( "new server created with id:", tostring(interface)) return interface end -end )( ) -local addclient, wrapclient -do - function wrapclient( client, ip, port, listeners, pattern, sslctx ) +local function wrapclient( client, ip, port, listeners, pattern, sslctx ) local interface = handleclient( client, ip, port, nil, pattern, listeners, sslctx ) interface:_start_connection(sslctx) return interface, client --function handleclient( client, ip, port, server, pattern, listener, _, sslctx ) -- creates an client interface end - - function addclient( addr, serverport, listener, pattern, localaddr, localport, sslcfg, startssl ) - local client, err = socket.tcp() -- creating new socket - if not client then - debug( "cannot create socket:", err ) - return nil, err + +local function addclient( addr, serverport, listener, pattern, sslctx, typ ) + if sslctx and not has_luasec then + debug "need luasec, but not available" + return nil, "luasec not found" end - client:settimeout( 0 ) -- set nonblocking - if localaddr then - local res, err = client:bind( localaddr, localport, -1 ) - if not res then - debug( "cannot bind client:", err ) - return nil, err + if not typ then + local addrinfo, err = getaddrinfo(addr) + if not addrinfo then return nil, err end + if addrinfo[1] and addrinfo[1].family == "inet6" then + typ = "tcp6" + else + typ = "tcp" end end - local sslctx - if sslcfg then -- handle ssl/new context - if not ssl then - debug "need luasec, but not available" - return nil, "luasec not found" + local create = socket[typ] + if type( create ) ~= "function" then + return nil, "invalid socket type" end - sslctx, err = sslcfg - if err then - debug( "cannot create new ssl context:", err ) + local client, err = create() -- creating new socket + if not client then + debug( "cannot create socket:", err ) return nil, err end - end + client:settimeout( 0 ) -- set nonblocking local res, err = client:connect( addr, serverport ) -- connect if res or ( err == "timeout" ) then local ip, port = client:getsockname( ) - local interface = wrapclient( client, ip, serverport, listener, pattern, sslctx, startssl ) - interface:_start_connection( startssl ) + local interface = wrapclient( client, ip, serverport, listener, pattern, sslctx ) + interface:_start_connection( sslctx ) debug( "new connection id:", interface.id ) return interface, err else @@ -787,23 +734,18 @@ do return nil, err end end -end - -local loop = function( ) -- starts the event loop +local function loop( ) -- starts the event loop base:loop( ) return "quitting"; end -local newevent = ( function( ) - local add = base.addevent - return function( ... ) - return add( base, ... ) +local function newevent( ... ) + return addevent( base, ... ) end -end )( ) -local closeallservers = function( arg ) - for _, item in ipairs( interfacelist( ) ) do +local function closeallservers ( arg ) + for item in pairs( interfacelist ) do if item.type == "server" then item:close( arg ) end @@ -826,7 +768,7 @@ end -- being garbage-collected local signal_events = {}; -- [signal_num] -> event object local function hook_signal(signal_num, handler) - local function _handler(event) + local function _handler() local ret = handler(); if ret ~= false then -- Continue handling this signal? return EV_SIGNAL; -- Yes @@ -839,14 +781,14 @@ end local function link(sender, receiver, buffersize) local sender_locked; - + function receiver:ondrain() if sender_locked then sender:resume(); sender_locked = nil; end end - + function sender:onincoming(data) receiver:write(data); if receiver.writebufferlen >= buffersize then @@ -858,12 +800,11 @@ local function link(sender, receiver, buffersize) end return { - cfg = cfg, base = base, loop = loop, link = link, - event = event, + event = levent, event_base = base, addevent = newevent, addserver = addserver, diff --git a/net/server_select.lua b/net/server_select.lua index c50a6ce1..3503c30d 100644 --- a/net/server_select.lua +++ b/net/server_select.lua @@ -1,7 +1,7 @@ --- +-- -- server.lua by blastbeat of the luadch project -- Re-used here under the MIT/X Consortium License --- +-- -- Modifications (C) 2008-2010 Matthew Wild, Waqas Hussain -- @@ -38,7 +38,6 @@ local coroutine = use "coroutine" --// lua lib methods //-- -local os_difftime = os.difftime local math_min = math.min local math_huge = math.huge local table_concat = table.concat @@ -48,13 +47,14 @@ local coroutine_yield = coroutine.yield --// extern libs //-- -local luasec = use "ssl" +local has_luasec, luasec = pcall ( require , "ssl" ) local luasocket = use "socket" or require "socket" local luasocket_gettime = luasocket.gettime +local getaddrinfo = luasocket.dns.getaddrinfo --// extern lib methods //-- -local ssl_wrap = ( luasec and luasec.wrap ) +local ssl_wrap = ( has_luasec and luasec.wrap ) local socket_bind = luasocket.bind local socket_sleep = luasocket.sleep local socket_select = luasocket.select @@ -149,7 +149,7 @@ _accepretry = 10 -- seconds to wait until the next attempt of a full server to a _maxsendlen = 51000 * 1024 -- max len of send buffer _maxreadlen = 25000 * 1024 -- max len of read buffer -_checkinterval = 1200000 -- interval in secs to check idle clients +_checkinterval = 30 -- interval in secs to check idle clients _sendtimeout = 60000 -- allowed send idle time in secs _readtimeout = 6 * 60 * 60 -- allowed read idle time in secs @@ -295,6 +295,7 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport local status = listeners.onstatus local disconnect = listeners.ondisconnect local drain = listeners.ondrain + local onreadtimeout = listeners.onreadtimeout; local detach = listeners.ondetach local bufferqueue = { } -- buffer array @@ -324,6 +325,8 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport handler.disconnect = function( ) return disconnect end + handler.onreadtimeout = onreadtimeout; + handler.setlistener = function( self, listeners ) if detach then detach(self) -- Notify listener that it is no longer responsible for this connection @@ -332,6 +335,7 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport disconnect = listeners.ondisconnect status = listeners.onstatus drain = listeners.ondrain + handler.onreadtimeout = listeners.onreadtimeout detach = listeners.ondetach end handler.getstats = function( ) @@ -404,6 +408,9 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport out_put "server.lua: closed client handler and removed socket from list" return true end + handler.server = function ( ) + return server + end handler.ip = function( ) return ip end @@ -575,6 +582,9 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport _ = status and status( handler, "ssl-handshake-complete" ) if self.autostart_ssl and listeners.onconnect then listeners.onconnect(self); + if bufferqueuelen ~= 0 then + _sendlistlen = addsocket(_sendlist, client, _sendlistlen) + end end _readlistlen = addsocket(_readlist, client, _readlistlen) return true @@ -592,13 +602,14 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport coroutine_yield( ) -- handshake not finished end end - out_put( "server.lua: ssl handshake error: ", tostring(err or "handshake too long") ) - _ = handler and handler:force_close("ssl handshake failed") + err = "ssl handshake error: " .. ( err or "handshake too long" ); + out_put( "server.lua: ", err ); + _ = handler and handler:force_close(err) return false, err -- handshake failed end ) end - if luasec then + if has_luasec then handler.starttls = function( self, _sslctx) if _sslctx then handler:set_sslctx(_sslctx); @@ -624,7 +635,7 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport shutdown = id _socketlist[ socket ] = handler _readlistlen = addsocket(_readlist, socket, _readlistlen) - + -- remove traces of the old socket _readlistlen = removesocket( _readlist, oldsocket, _readlistlen ) _sendlistlen = removesocket( _sendlist, oldsocket, _sendlistlen ) @@ -651,7 +662,7 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport _socketlist[ socket ] = handler _readlistlen = addsocket(_readlist, socket, _readlistlen) - if sslctx and luasec then + if sslctx and has_luasec then out_put "server.lua: auto-starting ssl negotiation..." handler.autostart_ssl = true; local ok, err = handler:starttls(sslctx); @@ -712,7 +723,7 @@ local function link(sender, receiver, buffersize) sender_locked = nil; end end - + local _readbuffer = sender.readbuffer; function sender.readbuffer() _readbuffer(); @@ -727,22 +738,23 @@ end ----------------------------------// PUBLIC //-- addserver = function( addr, port, listeners, pattern, sslctx ) -- this function provides a way for other scripts to reg a server + addr = addr or "*" local err if type( listeners ) ~= "table" then err = "invalid listener table" - end - if type( port ) ~= "number" or not ( port >= 0 and port <= 65535 ) then + elseif type ( addr ) ~= "string" then + err = "invalid address" + elseif type( port ) ~= "number" or not ( port >= 0 and port <= 65535 ) then err = "invalid port" elseif _server[ addr..":"..port ] then err = "listeners on '[" .. addr .. "]:" .. port .. "' already exist" - elseif sslctx and not luasec then + elseif sslctx and not has_luasec then err = "luasec not found" end if err then out_error( "server.lua, [", addr, "]:", port, ": ", err ) return nil, err end - addr = addr or "*" local server, err = socket_bind( addr, port, _tcpbacklog ) if err then out_error( "server.lua, [", addr, "]:", port, ": ", err ) @@ -878,21 +890,22 @@ loop = function(once) -- this is the main loop of the program _currenttime = luasocket_gettime( ) -- Check for socket timeouts - local difftime = os_difftime( _currenttime - _starttime ) - if difftime > _checkinterval then + if _currenttime - _starttime > _checkinterval then _starttime = _currenttime for handler, timestamp in pairs( _writetimes ) do - if os_difftime( _currenttime - timestamp ) > _sendtimeout then - --_writetimes[ handler ] = nil + if _currenttime - timestamp > _sendtimeout then handler.disconnect( )( handler, "send timeout" ) handler:force_close() -- forced disconnect end end for handler, timestamp in pairs( _readtimes ) do - if os_difftime( _currenttime - timestamp ) > _readtimeout then - --_readtimes[ handler ] = nil - handler.disconnect( )( handler, "read timeout" ) - handler:close( ) -- forced disconnect? + if _currenttime - timestamp > _readtimeout then + if not(handler.onreadtimeout) or handler:onreadtimeout() ~= true then + handler.disconnect( )( handler, "read timeout" ) + handler:close( ) -- forced disconnect? + else + _readtimes[ handler ] = _currenttime -- reset timer + end end end end @@ -920,6 +933,7 @@ loop = function(once) -- this is the main loop of the program socket_sleep( _sleeptime ) until quitting; if once and quitting == "once" then quitting = nil; return; end + closeall(); return "quitting" end @@ -952,17 +966,46 @@ local wrapclient = function( socket, ip, serverport, listeners, pattern, sslctx return handler, socket end -local addclient = function( address, port, listeners, pattern, sslctx ) - local client, err = luasocket.tcp( ) +local addclient = function( address, port, listeners, pattern, sslctx, typ ) + local err + if type( listeners ) ~= "table" then + err = "invalid listener table" + elseif type ( address ) ~= "string" then + err = "invalid address" + elseif type( port ) ~= "number" or not ( port >= 0 and port <= 65535 ) then + err = "invalid port" + elseif sslctx and not has_luasec then + err = "luasec not found" + end + if not typ then + local addrinfo, err = getaddrinfo(address) + if not addrinfo then return nil, err end + if addrinfo[1] and addrinfo[1].family == "inet6" then + typ = "tcp6" + else + typ = "tcp" + end + end + local create = luasocket[typ] + if type( create ) ~= "function" then + err = "invalid socket type" + end + + if err then + out_error( "server.lua, addclient: ", err ) + return nil, err + end + + local client, err = create( ) if err then return nil, err end client:settimeout( 0 ) - _, err = client:connect( address, port ) - if err then -- try again - local handler = wrapclient( client, address, port, listeners ) + local ok, err = client:connect( address, port ) + if ok or err == "timeout" then + return wrapclient( client, address, port, listeners, pattern, sslctx ) else - wrapconnection( nil, listeners, client, address, port, "clientport", pattern, sslctx ) + return nil, err end end @@ -992,7 +1035,7 @@ return { addclient = addclient, wrapclient = wrapclient, - + loop = loop, link = link, step = step, diff --git a/net/websocket.lua b/net/websocket.lua new file mode 100644 index 00000000..a4274eec --- /dev/null +++ b/net/websocket.lua @@ -0,0 +1,272 @@ +-- Prosody IM +-- Copyright (C) 2012 Florian Zeitz +-- Copyright (C) 2014 Daurnimator +-- +-- This project is MIT/X11 licensed. Please see the +-- COPYING file in the source package for more information. +-- + +local t_concat = table.concat; + +local http = require "net.http"; +local frames = require "net.websocket.frames"; +local base64 = require "util.encodings".base64; +local sha1 = require "util.hashes".sha1; +local random_bytes = require "util.random".bytes; +local timer = require "util.timer"; +local log = require "util.logger".init "websocket"; + +local close_timeout = 3; -- Seconds to wait after sending close frame until closing connection. + +local websockets = {}; + +local websocket_listeners = {}; +function websocket_listeners.ondisconnect(handler, err) + local s = websockets[handler]; + websockets[handler] = nil; + if s.close_timer then + timer.stop(s.close_timer); + s.close_timer = nil; + end + s.readyState = 3; + if s.close_code == nil and s.onerror then s:onerror(err); end + if s.onclose then s:onclose(s.close_code, s.close_message or err); end +end + +function websocket_listeners.ondetach(handler) + websockets[handler] = nil; +end + +local function fail(s, code, reason) + module:log("warn", "WebSocket connection failed, closing. %d %s", code, reason); + s:close(code, reason); + s.handler:close(); + return false +end + +function websocket_listeners.onincoming(handler, buffer, err) + local s = websockets[handler]; + s.readbuffer = s.readbuffer..buffer; + while true do + local frame, len = frames.parse(s.readbuffer); + if frame == nil then break end + s.readbuffer = s.readbuffer:sub(len+1); + + log("debug", "Websocket received frame: opcode=%0x, %i bytes", frame.opcode, #frame.data); + + -- Error cases + if frame.RSV1 or frame.RSV2 or frame.RSV3 then -- Reserved bits non zero + return fail(s, 1002, "Reserved bits not zero"); + end + + if frame.opcode < 0x8 then + local databuffer = s.databuffer; + if frame.opcode == 0x0 then -- Continuation frames + if not databuffer then + return fail(s, 1002, "Unexpected continuation frame"); + end + databuffer[#databuffer+1] = frame.data; + elseif frame.opcode == 0x1 or frame.opcode == 0x2 then -- Text or Binary frame + if databuffer then + return fail(s, 1002, "Continuation frame expected"); + end + databuffer = {type=frame.opcode, frame.data}; + s.databuffer = databuffer; + else + return fail(s, 1002, "Reserved opcode"); + end + if frame.FIN then + s.databuffer = nil; + if s.onmessage then + s:onmessage(t_concat(databuffer), databuffer.type); + end + end + else -- Control frame + if frame.length > 125 then -- Control frame with too much payload + return fail(s, 1002, "Payload too large"); + elseif not frame.FIN then -- Fragmented control frame + return fail(s, 1002, "Fragmented control frame"); + end + if frame.opcode == 0x8 then -- Close request + if frame.length == 1 then + return fail(s, 1002, "Close frame with payload, but too short for status code"); + end + local status_code, message = frames.parse_close(frame.data); + if status_code == nil then + --[[ RFC 6455 7.4.1 + 1005 is a reserved value and MUST NOT be set as a status code in a + Close control frame by an endpoint. It is designated for use in + applications expecting a status code to indicate that no status + code was actually present. + ]] + status_code = 1005 + elseif status_code < 1000 then + return fail(s, 1002, "Closed with invalid status code"); + elseif ((status_code > 1003 and status_code < 1007) or status_code > 1011) and status_code < 3000 then + return fail(s, 1002, "Closed with reserved status code"); + end + s.close_code, s.close_message = status_code, message; + s:close(1000); + return true; + elseif frame.opcode == 0x9 then -- Ping frame + frame.opcode = 0xA; + frame.MASK = true; -- RFC 6455 6.1.5: If the data is being sent by the client, the frame(s) MUST be masked + handler:write(frames.build(frame)); + elseif frame.opcode == 0xA then -- Pong frame + log("debug", "Received unexpected pong frame: " .. tostring(frame.data)); + else + return fail(s, 1002, "Reserved opcode"); + end + end + end + return true; +end + +local websocket_methods = {}; +local function close_timeout_cb(now, timerid, s) + s.close_timer = nil; + log("warn", "Close timeout waiting for server to close, closing manually."); + s.handler:close(); +end +function websocket_methods:close(code, reason) + if self.readyState < 2 then + code = code or 1000; + log("debug", "closing WebSocket with code %i: %s" , code , tostring(reason)); + self.readyState = 2; + local handler = self.handler; + handler:write(frames.build_close(code, reason, true)); + -- Do not close socket straight away, wait for acknowledgement from server. + self.close_timer = timer.add_task(close_timeout, close_timeout_cb, self); + elseif self.readyState == 2 then + log("debug", "tried to close a closing WebSocket, closing the raw socket."); + -- Stop timer + if self.close_timer then + timer.stop(self.close_timer); + self.close_timer = nil; + end + local handler = self.handler; + handler:close(); + else + log("debug", "tried to close a closed WebSocket, ignoring."); + end +end +function websocket_methods:send(data, opcode) + if self.readyState < 1 then + return nil, "WebSocket not open yet, unable to send data."; + elseif self.readyState >= 2 then + return nil, "WebSocket closed, unable to send data."; + end + if opcode == "text" or opcode == nil then + opcode = 0x1; + elseif opcode == "binary" then + opcode = 0x2; + end + local frame = { + FIN = true; + MASK = true; -- RFC 6455 6.1.5: If the data is being sent by the client, the frame(s) MUST be masked + opcode = opcode; + data = tostring(data); + }; + log("debug", "WebSocket sending frame: opcode=%0x, %i bytes", frame.opcode, #frame.data); + return self.handler:write(frames.build(frame)); +end + +local websocket_metatable = { + __index = websocket_methods; +}; + +local function connect(url, ex, listeners) + ex = ex or {}; + + --[[RFC 6455 4.1.7: + The request MUST include a header field with the name + |Sec-WebSocket-Key|. The value of this header field MUST be a + nonce consisting of a randomly selected 16-byte value that has + been base64-encoded (see Section 4 of [RFC4648]). The nonce + MUST be selected randomly for each connection. + ]] + local key = base64.encode(random_bytes(16)); + + -- Either a single protocol string or an array of protocol strings. + local protocol = ex.protocol; + if type(protocol) == "string" then + protocol = { protocol, [protocol] = true }; + elseif type(protocol) == "table" and protocol[1] then + for _, v in ipairs(protocol) do + protocol[v] = true; + end + else + protocol = nil; + end + + local headers = { + ["Upgrade"] = "websocket"; + ["Connection"] = "Upgrade"; + ["Sec-WebSocket-Key"] = key; + ["Sec-WebSocket-Protocol"] = protocol and t_concat(protocol, ", "); + ["Sec-WebSocket-Version"] = "13"; + ["Sec-WebSocket-Extensions"] = ex.extensions; + } + if ex.headers then + for k,v in pairs(ex.headers) do + headers[k] = v; + end + end + + local s = setmetatable({ + readbuffer = ""; + databuffer = nil; + handler = nil; + close_code = nil; + close_message = nil; + close_timer = nil; + readyState = 0; + protocol = nil; + + url = url; + + onopen = listeners.onopen; + onclose = listeners.onclose; + onmessage = listeners.onmessage; + onerror = listeners.onerror; + }, websocket_metatable); + + local http_url = url:gsub("^(ws)", "http"); + local http_req = http.request(http_url, { + method = "GET"; + headers = headers; + sslctx = ex.sslctx; + }, function(b, c, r, http_req) + if c ~= 101 + or r.headers["connection"]:lower() ~= "upgrade" + or r.headers["upgrade"] ~= "websocket" + or r.headers["sec-websocket-accept"] ~= base64.encode(sha1(key .. "258EAFA5-E914-47DA-95CA-C5AB0DC85B11")) + or (protocol and not protocol[r.headers["sec-websocket-protocol"]]) + then + s.readyState = 3; + log("warn", "WebSocket connection to %s failed: %s", url, tostring(b)); + if s.onerror then s:onerror("connecting-failed"); end + return; + end + + s.protocol = r.headers["sec-websocket-protocol"]; + + -- Take possession of socket from http + http_req.conn = nil; + local handler = http_req.handler; + s.handler = handler; + websockets[handler] = s; + handler:setlistener(websocket_listeners); + + log("debug", "WebSocket connected successfully to %s", url); + s.readyState = 1; + if s.onopen then s:onopen(); end + websocket_listeners.onincoming(handler, b); + end); + + return s; +end + +return { + connect = connect; +}; diff --git a/net/websocket/frames.lua b/net/websocket/frames.lua new file mode 100644 index 00000000..5fe96d45 --- /dev/null +++ b/net/websocket/frames.lua @@ -0,0 +1,219 @@ +-- Prosody IM +-- Copyright (C) 2012 Florian Zeitz +-- Copyright (C) 2014 Daurnimator +-- +-- This project is MIT/X11 licensed. Please see the +-- COPYING file in the source package for more information. +-- + +local softreq = require "util.dependencies".softreq; +local random_bytes = require "util.random".bytes; + +local bit = assert(softreq"bit" or softreq"bit32", + "No bit module found. See https://prosody.im/doc/depends#bitop"); +local band = bit.band; +local bor = bit.bor; +local bxor = bit.bxor; +local lshift = bit.lshift; +local rshift = bit.rshift; + +local t_concat = table.concat; +local s_byte = string.byte; +local s_char= string.char; +local s_sub = string.sub; +local s_pack = string.pack; +local s_unpack = string.unpack; + +if not s_pack and softreq"struct" then + s_pack = softreq"struct".pack; + s_unpack = softreq"struct".unpack; +end + +local function read_uint16be(str, pos) + local l1, l2 = s_byte(str, pos, pos+1); + return l1*256 + l2; +end +-- FIXME: this may lose precision +local function read_uint64be(str, pos) + local l1, l2, l3, l4, l5, l6, l7, l8 = s_byte(str, pos, pos+7); + local h = lshift(l1, 24) + lshift(l2, 16) + lshift(l3, 8) + l4; + local l = lshift(l5, 24) + lshift(l6, 16) + lshift(l7, 8) + l8; + return h * 2^32 + l; +end +local function pack_uint16be(x) + return s_char(rshift(x, 8), band(x, 0xFF)); +end +local function get_byte(x, n) + return band(rshift(x, n), 0xFF); +end +local function pack_uint64be(x) + local h = band(x / 2^32, 2^32-1); + return s_char(get_byte(h, 24), get_byte(h, 16), get_byte(h, 8), band(h, 0xFF), + get_byte(x, 24), get_byte(x, 16), get_byte(x, 8), band(x, 0xFF)); +end + +if s_pack then + function pack_uint16be(x) + return s_pack(">I2", x); + end + function pack_uint64be(x) + return s_pack(">I8", x); + end +end + +if s_unpack then + function read_uint16be(str, pos) + return s_unpack(">I2", str, pos); + end + function read_uint64be(str, pos) + return s_unpack(">I8", str, pos); + end +end + +local function parse_frame_header(frame) + if #frame < 2 then return; end + + local byte1, byte2 = s_byte(frame, 1, 2); + local result = { + FIN = band(byte1, 0x80) > 0; + RSV1 = band(byte1, 0x40) > 0; + RSV2 = band(byte1, 0x20) > 0; + RSV3 = band(byte1, 0x10) > 0; + opcode = band(byte1, 0x0F); + + MASK = band(byte2, 0x80) > 0; + length = band(byte2, 0x7F); + }; + + local length_bytes = 0; + if result.length == 126 then + length_bytes = 2; + elseif result.length == 127 then + length_bytes = 8; + end + + local header_length = 2 + length_bytes + (result.MASK and 4 or 0); + if #frame < header_length then return; end + + if length_bytes == 2 then + result.length = read_uint16be(frame, 3); + elseif length_bytes == 8 then + result.length = read_uint64be(frame, 3); + end + + if result.MASK then + result.key = { s_byte(frame, length_bytes+3, length_bytes+6) }; + end + + return result, header_length; +end + +-- XORs the string `str` with the array of bytes `key` +-- TODO: optimize +local function apply_mask(str, key, from, to) + from = from or 1 + if from < 0 then from = #str + from + 1 end -- negative indicies + to = to or #str + if to < 0 then to = #str + to + 1 end -- negative indicies + local key_len = #key + local counter = 0; + local data = {}; + for i = from, to do + local key_index = counter%key_len + 1; + counter = counter + 1; + data[counter] = s_char(bxor(key[key_index], s_byte(str, i))); + end + return t_concat(data); +end + +local function parse_frame_body(frame, header, pos) + if header.MASK then + return apply_mask(frame, header.key, pos, pos + header.length - 1); + else + return frame:sub(pos, pos + header.length - 1); + end +end + +local function parse_frame(frame) + local result, pos = parse_frame_header(frame); + if result == nil or #frame < (pos + result.length) then return; end + result.data = parse_frame_body(frame, result, pos+1); + return result, pos + result.length; +end + +local function build_frame(desc) + local data = desc.data or ""; + + assert(desc.opcode and desc.opcode >= 0 and desc.opcode <= 0xF, "Invalid WebSocket opcode"); + if desc.opcode >= 0x8 then + -- RFC 6455 5.5 + assert(#data <= 125, "WebSocket control frames MUST have a payload length of 125 bytes or less."); + end + + local b1 = bor(desc.opcode, + desc.FIN and 0x80 or 0, + desc.RSV1 and 0x40 or 0, + desc.RSV2 and 0x20 or 0, + desc.RSV3 and 0x10 or 0); + + local b2 = #data; + local length_extra; + if b2 <= 125 then -- 7-bit length + length_extra = ""; + elseif b2 <= 0xFFFF then -- 2-byte length + b2 = 126; + length_extra = pack_uint16be(#data); + else -- 8-byte length + b2 = 127; + length_extra = pack_uint64be(#data); + end + + local key = "" + if desc.MASK then + local key_a = desc.key + if key_a then + key = s_char(unpack(key_a, 1, 4)); + else + key = random_bytes(4); + key_a = {key:byte(1,4)}; + end + b2 = bor(b2, 0x80); + data = apply_mask(data, key_a); + end + + return s_char(b1, b2) .. length_extra .. key .. data +end + +local function parse_close(data) + local code, message + if #data >= 2 then + code = read_uint16be(data, 1); + if #data > 2 then + message = s_sub(data, 3); + end + end + return code, message +end + +local function build_close(code, message, mask) + local data = pack_uint16be(code); + if message then + assert(#message<=123, "Close reason must be <=123 bytes"); + data = data .. message; + end + return build_frame({ + opcode = 0x8; + FIN = true; + MASK = mask; + data = data; + }); +end + +return { + parse_header = parse_frame_header; + parse_body = parse_frame_body; + parse = parse_frame; + build = build_frame; + parse_close = parse_close; + build_close = build_close; +}; |