diff options
Diffstat (limited to 'net')
-rw-r--r-- | net/adns.lua | 6 | ||||
-rw-r--r-- | net/dns.lua | 36 | ||||
-rw-r--r-- | net/http.lua | 30 | ||||
-rw-r--r-- | net/http/codes.lua | 12 | ||||
-rw-r--r-- | net/http/server.lua | 27 | ||||
-rw-r--r-- | net/server.lua | 2 | ||||
-rw-r--r-- | net/server_event.lua | 160 | ||||
-rw-r--r-- | net/server_select.lua | 33 | ||||
-rw-r--r-- | net/websocket.lua | 272 | ||||
-rw-r--r-- | net/websocket/frames.lua | 195 |
10 files changed, 630 insertions, 143 deletions
diff --git a/net/adns.lua b/net/adns.lua index 3fc958f4..87f2876b 100644 --- a/net/adns.lua +++ b/net/adns.lua @@ -1,7 +1,7 @@ -- Prosody IM -- Copyright (C) 2008-2010 Matthew Wild -- Copyright (C) 2008-2010 Waqas Hussain --- +-- -- This project is MIT/X11 licensed. Please see the -- COPYING file in the source package for more information. -- @@ -65,7 +65,7 @@ function new_async_socket(sock, resolver) if resolver.socketset[conn] == resolver.best_server and resolver.best_server == #servers then log("error", "Exhausted all %d configured DNS servers, next lookup will try %s again", #servers, servers[1]); end - + resolver:servfail(conn); -- Let the magic commence end end @@ -73,7 +73,7 @@ function new_async_socket(sock, resolver) if not handler then return nil, err; end - + handler.settimeout = function () end handler.setsockname = function (_, ...) return sock:setsockname(...); end handler.setpeername = function (_, ...) peername = (...); local ret, err = sock:setpeername(...); _:set_send(dummy_send); return ret, err; end diff --git a/net/dns.lua b/net/dns.lua index 763ee9ec..992f3986 100644 --- a/net/dns.lua +++ b/net/dns.lua @@ -14,6 +14,7 @@ local socket = require "socket"; local timer = require "util.timer"; +local new_ip = require "util.ip".new_ip; local _, windows = pcall(require, "util.windows"); local is_windows = (_ and windows) or os.getenv("WINDIR"); @@ -212,15 +213,6 @@ function cache_metatable.__tostring(cache) end -function resolver:new() -- - - - - - - - - - - - - - - - - - - - - resolver - local r = { active = {}, cache = {}, unsorted = {} }; - setmetatable(r, resolver); - setmetatable(r.cache, cache_metatable); - setmetatable(r.unsorted, { __mode = 'kv' }); - return r; -end - - -- packet layer -------------------------------------------------- packet layer @@ -599,11 +591,12 @@ function resolver:adddefaultnameservers() -- - - - - adddefaultnameservers if resolv_conf then for line in resolv_conf:lines() do line = line:gsub("#.*$", "") - :match('^%s*nameserver%s+(.*)%s*$'); + :match('^%s*nameserver%s+([%x:%.]*)%s*$'); if line then - line:gsub("%f[%d.](%d+%.%d+%.%d+%.%d+)%f[^%d.]", function (address) - self:addnameserver(address) - end); + local ip = new_ip(line); + if ip then + self:addnameserver(ip.addr); + end end end end @@ -623,7 +616,12 @@ function resolver:getsocket(servernum) -- - - - - - - - - - - - - getsocket if sock then return sock; end local ok, err; - sock, err = socket.udp(); + local peer = self.server[servernum]; + if peer:find(":") then + sock, err = socket.udp6(); + else + sock, err = socket.udp(); + end if sock and self.socket_wrapper then sock, err = self.socket_wrapper(sock, self); end if not sock then return nil, err; @@ -636,7 +634,7 @@ function resolver:getsocket(servernum) -- - - - - - - - - - - - - getsocket -- if so, try the next server ok, err = sock:setsockname('*', 0); if not ok then return self:servfail(sock, err); end - ok, err = sock:setpeername(self.server[servernum], 53); + ok, err = sock:setpeername(peer, 53); if not ok then return self:servfail(sock, err); end return sock; end @@ -766,7 +764,7 @@ function resolver:query(qname, qtype, qclass) -- - - - - - - - - - -- query return nil, err; end conn:send (o.packet) - + if timer and self.timeout then local num_servers = #self.server; local i = 1; @@ -863,7 +861,7 @@ function resolver:receive(rset) -- - - - - - - - - - - - - - - - - receive -- retire the query local queries = self.active[response.header.id]; queries[response.question.raw] = nil; - + if not next(queries) then self.active[response.header.id] = nil; end if not next(self.active) then self:closeall(); end @@ -877,7 +875,7 @@ function resolver:receive(rset) -- - - - - - - - - - - - - - - - - receive set(self.wanted, q.class, q.type, q.name, nil); end end - + end end end @@ -1047,8 +1045,6 @@ end function dns.resolver () -- - - - - - - - - - - - - - - - - - - - - resolver - -- this function seems to be redundant with resolver.new () - local r = { active = {}, cache = {}, unsorted = {}, wanted = {}, best_server = 1 }; setmetatable (r, resolver); setmetatable (r.cache, cache_metatable); diff --git a/net/http.lua b/net/http.lua index 8ce47494..e6bf0018 100644 --- a/net/http.lua +++ b/net/http.lua @@ -1,7 +1,7 @@ -- Prosody IM -- Copyright (C) 2008-2010 Matthew Wild -- Copyright (C) 2008-2010 Waqas Hussain --- +-- -- This project is MIT/X11 licensed. Please see the -- COPYING file in the source package for more information. -- @@ -37,7 +37,7 @@ function listener.onconnect(conn) if req.query then t_insert(request_line, 4, "?"..req.query); end - + conn:write(t_concat(request_line)); local t = { [2] = ": ", [4] = "\r\n" }; for k, v in pairs(req.headers) do @@ -45,7 +45,7 @@ function listener.onconnect(conn) conn:write(t_concat(t)); end conn:write("\r\n"); - + if req.body then conn:write(req.body); end @@ -85,12 +85,12 @@ local function request_reader(request, data, err) end destroy_request(request); end - + if not data then error_cb(err); return; end - + local function success_cb(r) if request.callback then request.callback(r.body, r.code, r, request); @@ -109,18 +109,18 @@ end local function handleerr(err) log("error", "Traceback[http]: %s", traceback(tostring(err), 2)); end function request(u, ex, callback) local req = url.parse(u); - + if not (req and req.host) then callback(nil, 0, req); return nil, "invalid-url"; end - + if not req.path then req.path = "/"; end - + local method, headers, body; - + local host, port = req.host, req.port; local host_header = host; if (port == "80" and req.scheme == "http") @@ -134,7 +134,7 @@ function request(u, ex, callback) ["Host"] = host_header; ["User-Agent"] = "Prosody XMPP Server"; }; - + if req.userinfo then headers["Authorization"] = "Basic "..b64(req.userinfo); end @@ -154,16 +154,16 @@ function request(u, ex, callback) end end end - + -- Attach to request object req.method, req.headers, req.body = method, headers, body; - + local using_https = req.scheme == "https"; if using_https and not ssl_available then error("SSL not available, unable to contact https URL"); end local port_number = port and tonumber(port) or (using_https and 443 or 80); - + -- Connect the socket, and wrap it with net.server local conn = socket.tcp(); conn:settimeout(10); @@ -172,7 +172,7 @@ function request(u, ex, callback) callback(nil, 0, req); return nil, err; end - + local sslctx = false; if using_https then sslctx = ex and ex.sslctx or { mode = "client", protocol = "sslv23", options = { "no_sslv2", "no_sslv3" } }; @@ -180,7 +180,7 @@ function request(u, ex, callback) req.handler, req.conn = assert(server.wrapclient(conn, host, port_number, listener, "*a", sslctx)); req.write = function (...) return req.handler:write(...); end - + req.callback = function (content, code, request, response) log("debug", "Calling callback, status %s", code or "---"); return select(2, xpcall(function () return callback(content, code, request, response) end, handleerr)); end req.reader = request_reader; req.state = "status"; diff --git a/net/http/codes.lua b/net/http/codes.lua index 0cadd079..bc31c7dd 100644 --- a/net/http/codes.lua +++ b/net/http/codes.lua @@ -25,6 +25,7 @@ local response_codes = { [305] = "Use Proxy"; -- The 306 status code was used in a previous version of [RFC2616], is no longer used, and the code is reserved. [307] = "Temporary Redirect"; + [308] = "Permanent Redirect"; [400] = "Bad Request"; [401] = "Unauthorized"; @@ -39,17 +40,21 @@ local response_codes = { [410] = "Gone"; [411] = "Length Required"; [412] = "Precondition Failed"; - [413] = "Request Entity Too Large"; - [414] = "Request-URI Too Long"; + [413] = "Payload Too Large"; + [414] = "URI Too Long"; [415] = "Unsupported Media Type"; - [416] = "Requested Range Not Satisfiable"; + [416] = "Range Not Satisfiable"; [417] = "Expectation Failed"; [418] = "I'm a teapot"; + [421] = "Misdirected Request"; [422] = "Unprocessable Entity"; [423] = "Locked"; [424] = "Failed Dependency"; -- The 425 status code is reserved for the WebDAV advanced collections expired proposal [RFC2817] [426] = "Upgrade Required"; + [428] = "Precondition Required"; + [429] = "Too Many Requests"; + [431] = "Request Header Fields Too Large"; [500] = "Internal Server Error"; [501] = "Not Implemented"; @@ -61,6 +66,7 @@ local response_codes = { [507] = "Insufficient Storage"; [508] = "Loop Detected"; [510] = "Not Extended"; + [511] = "Network Authentication Required"; }; for k,v in pairs(response_codes) do response_codes[k] = k.." "..v; end diff --git a/net/http/server.lua b/net/http/server.lua index f091595c..ffcceabe 100644 --- a/net/http/server.lua +++ b/net/http/server.lua @@ -189,6 +189,7 @@ function handle_request(conn, request, finish_cb) persistent = persistent; conn = conn; send = _M.send_response; + done = _M.finish_response; finish_cb = finish_cb; }; conn._http_open_response = response; @@ -208,7 +209,7 @@ function handle_request(conn, request, finish_cb) err_code, err = 400, "Missing or invalid 'Host' header"; end end - + if err then response.status_code = err_code; response:send(events.fire_event("http-error", { code = err_code, message = err })); @@ -250,24 +251,30 @@ function handle_request(conn, request, finish_cb) response.status_code = 404; response:send(events.fire_event("http-error", { code = 404 })); end -function _M.send_response(response, body) - if response.finished then return; end - response.finished = true; - response.conn._http_open_response = nil; - +local function prepare_header(response) local status_line = "HTTP/"..response.request.httpversion.." "..(response.status or codes[response.status_code]); local headers = response.headers; - body = body or response.body or ""; - headers.content_length = #body; - local output = { status_line }; for k,v in pairs(headers) do t_insert(output, headerfix[k]..v); end t_insert(output, "\r\n\r\n"); + return output; +end +_M.prepare_header = prepare_header; +function _M.send_response(response, body) + if response.finished then return; end + body = body or response.body or ""; + response.headers.content_length = #body; + local output = prepare_header(response); t_insert(output, body); - response.conn:write(t_concat(output)); + response:done(); +end +function _M.finish_response(response) + if response.finished then return; end + response.finished = true; + response.conn._http_open_response = nil; if response.on_destroy then response:on_destroy(); response.on_destroy = nil; diff --git a/net/server.lua b/net/server.lua index 375e7081..2a0b89ae 100644 --- a/net/server.lua +++ b/net/server.lua @@ -1,7 +1,7 @@ -- Prosody IM -- Copyright (C) 2008-2010 Matthew Wild -- Copyright (C) 2008-2010 Waqas Hussain --- +-- -- This project is MIT/X11 licensed. Please see the -- COPYING file in the source package for more information. -- diff --git a/net/server_event.lua b/net/server_event.lua index 45938a13..756e9837 100644 --- a/net/server_event.lua +++ b/net/server_event.lua @@ -115,10 +115,10 @@ end )( ) local interface_mt do interface_mt = {}; interface_mt.__index = interface_mt; - + local addevent = base.addevent local coroutine_wrap, coroutine_yield = coroutine.wrap,coroutine.yield - + -- Private methods function interface_mt:_position(new_position) self.position = new_position or self.position @@ -127,7 +127,7 @@ do function interface_mt:_close() return self:_destroy(); end - + function interface_mt:_start_connection(plainssl) -- should be called from addclient local callback = function( event ) if EV_TIMEOUT == event then -- timeout during connection @@ -268,12 +268,12 @@ do interfacelist( "delete", self ) return true end - + function interface_mt:_lock(nointerface, noreading, nowriting) -- lock or unlock this interface or events self.nointerface, self.noreading, self.nowriting = nointerface, noreading, nowriting return nointerface, noreading, nowriting end - + --TODO: Deprecate function interface_mt:lock_read(switch) if switch then @@ -300,7 +300,7 @@ do end return self._connections end - + -- Public methods function interface_mt:write(data) if self.nowriting then return nil, "locked" end @@ -343,27 +343,27 @@ do return true end end - + function interface_mt:socket() return self.conn end - + function interface_mt:server() return self._server or self; end - + function interface_mt:port() return self._port end - + function interface_mt:serverport() return self._serverport end - + function interface_mt:ip() return self._ip end - + function interface_mt:ssl() return self._usingssl end @@ -372,15 +372,15 @@ do function interface_mt:type() return self._type or "client" end - + function interface_mt:connections() return self._connections end - + function interface_mt:address() return self.addr end - + function interface_mt:set_sslctx(sslctx) self._sslctx = sslctx; if sslctx then @@ -396,11 +396,11 @@ do end return self._pattern; end - + function interface_mt:set_send(new_send) -- No-op, we always use the underlying connection's send end - + function interface_mt:starttls(sslctx, call_onconnect) debug( "try to start ssl at client id:", self.id ) local err @@ -429,22 +429,22 @@ do self.starttls = false; return true end - + function interface_mt:setoption(option, value) if self.conn.setoption then return self.conn:setoption(option, value); end return false, "setoption not implemented"; end - + function interface_mt:setlistener(listener) self:ondetach(); -- Notify listener that it is no longer responsible for this connection - self.onconnect, self.ondisconnect, self.onincoming, - self.ontimeout, self.onstatus, self.ondetach - = listener.onconnect, listener.ondisconnect, listener.onincoming, - listener.ontimeout, listener.onstatus, listener.ondetach; + self.onconnect, self.ondisconnect, self.onincoming, self.ontimeout, + self.onreadtimeout, self.onstatus, self.ondetach + = listener.onconnect, listener.ondisconnect, listener.onincoming, listener.ontimeout, + listener.onreadtimeout, listener.onstatus, listener.ondetach; end - + -- Stub handlers function interface_mt:onconnect() end @@ -454,6 +454,12 @@ do end function interface_mt:ontimeout() end + function interface_mt:onreadtimeout() + self.fatalerror = "timeout during receiving" + debug( "connection failed:", self.fatalerror ) + self:_close() + self.eventread = nil + end function interface_mt:ondrain() end function interface_mt:ondetach() @@ -483,6 +489,7 @@ do ondisconnect = listener.ondisconnect; -- will be called when client disconnects onincoming = listener.onincoming; -- will be called when client sends data ontimeout = listener.ontimeout; -- called when fatal socket timeout occurs + onreadtimeout = listener.onreadtimeout; -- called when socket inactivity timeout occurs ondrain = listener.ondrain; -- called when writebuffer is empty ondetach = listener.ondetach; -- called when disassociating this listener from this connection onstatus = listener.onstatus; -- called for status changes (e.g. of SSL/TLS) @@ -498,7 +505,7 @@ do noreading = false, nowriting = false; -- locks of the read/writecallback startsslcallback = false; -- starting handshake callback position = false; -- position of client in interfacelist - + -- Properties _ip = ip, _port = port, _server = server, _pattern = pattern, _serverport = (server and server:port() or nil), @@ -574,7 +581,7 @@ do end end end - + interface.readcallback = function( event ) -- called on read events --vdebug( "new client read event, id/ip/port:", tostring(interface.id), tostring(ip), tostring(port) ) if interface.noreading or interface.fatalerror then -- leave this event @@ -582,61 +589,56 @@ do interface.eventread = nil return -1 end - if EV_TIMEOUT == event then -- took too long to get some data from client -> disconnect - interface.fatalerror = "timeout during receiving" - debug( "connection failed:", interface.fatalerror ) + if EV_TIMEOUT == event and interface:onreadtimeout() ~= true then + return -1 -- took too long to get some data from client -> disconnect + end + if interface._usingssl then -- handle luasec + if interface.eventwritetimeout then -- ok, in the past writecallback was regged + local ret = interface.writecallback( ) -- call it + --vdebug( "tried to write in readcallback, result:", tostring(ret) ) + end + if interface.eventreadtimeout then + interface.eventreadtimeout:close( ) + interface.eventreadtimeout = nil + end + end + local buffer, err, part = interface.conn:receive( interface._pattern ) -- receive buffer with "pattern" + --vdebug( "read data:", tostring(buffer), "error:", tostring(err), "part:", tostring(part) ) + buffer = buffer or part + if buffer and #buffer > cfg.MAX_READ_LENGTH then -- check buffer length + interface.fatalerror = "receive buffer exceeded" + debug( "fatal error:", interface.fatalerror ) interface:_close() interface.eventread = nil return -1 - else -- can read - if interface._usingssl then -- handle luasec - if interface.eventwritetimeout then -- ok, in the past writecallback was regged - local ret = interface.writecallback( ) -- call it - --vdebug( "tried to write in readcallback, result:", tostring(ret) ) - end - if interface.eventreadtimeout then - interface.eventreadtimeout:close( ) - interface.eventreadtimeout = nil + end + if err and ( err ~= "timeout" and err ~= "wantread" ) then + if "wantwrite" == err then -- need to read on write event + if not interface.eventwrite then -- register new write event if needed + interface.eventwrite = addevent( base, interface.conn, EV_WRITE, interface.writecallback, cfg.WRITE_TIMEOUT ) end - end - local buffer, err, part = interface.conn:receive( interface._pattern ) -- receive buffer with "pattern" - --vdebug( "read data:", tostring(buffer), "error:", tostring(err), "part:", tostring(part) ) - buffer = buffer or part - if buffer and #buffer > cfg.MAX_READ_LENGTH then -- check buffer length - interface.fatalerror = "receive buffer exceeded" - debug( "fatal error:", interface.fatalerror ) + interface.eventreadtimeout = addevent( base, nil, EV_TIMEOUT, + function( ) + interface:_close() + end, cfg.READ_TIMEOUT + ) + debug( "wantwrite during read attempt, reg it in writecallback but dont know what really happens next..." ) + -- to be honest i dont know what happens next, if it is allowed to first read, the write etc... + else -- connection was closed or fatal error + interface.fatalerror = err + debug( "connection failed in read event:", interface.fatalerror ) interface:_close() interface.eventread = nil return -1 end - if err and ( err ~= "timeout" and err ~= "wantread" ) then - if "wantwrite" == err then -- need to read on write event - if not interface.eventwrite then -- register new write event if needed - interface.eventwrite = addevent( base, interface.conn, EV_WRITE, interface.writecallback, cfg.WRITE_TIMEOUT ) - end - interface.eventreadtimeout = addevent( base, nil, EV_TIMEOUT, - function( ) - interface:_close() - end, cfg.READ_TIMEOUT - ) - debug( "wantwrite during read attempt, reg it in writecallback but dont know what really happens next..." ) - -- to be honest i dont know what happens next, if it is allowed to first read, the write etc... - else -- connection was closed or fatal error - interface.fatalerror = err - debug( "connection failed in read event:", interface.fatalerror ) - interface:_close() - interface.eventread = nil - return -1 - end - else - interface.onincoming( interface, buffer, err ) -- send new data to listener - end - if interface.noreading then - interface.eventread = nil; - return -1; - end - return EV_READ, cfg.READ_TIMEOUT + else + interface.onincoming( interface, buffer, err ) -- send new data to listener end + if interface.noreading then + interface.eventread = nil; + return -1; + end + return EV_READ, cfg.READ_TIMEOUT end client:settimeout( 0 ) -- set non blocking @@ -652,7 +654,7 @@ do debug "creating server interface..." local interface = { _connections = 0; - + conn = server; onconnect = listener.onconnect; -- will be called when new client connected eventread = false; -- read event handler @@ -660,7 +662,7 @@ do readcallback = false; -- read event callback fatalerror = false; -- error message nointerface = true; -- lock/unlock parameter - + _ip = addr, _port = port, _pattern = pattern, _sslctx = sslctx; } @@ -699,12 +701,12 @@ do clientinterface:_start_session( true ) end debug( "accepted incoming client connection from:", client_ip or "<unknown IP>", client_port or "<unknown port>", "to", port or "<unknown port>"); - + client, err = server:accept() -- try to accept again end return EV_READ end - + server:settimeout( 0 ) setmetatable(interface, interface_mt) interfacelist( "add", interface ) @@ -747,7 +749,7 @@ do return interface, client --function handleclient( client, ip, port, server, pattern, listener, _, sslctx ) -- creates an client interface end - + function addclient( addr, serverport, listener, pattern, localaddr, localport, sslcfg, startssl ) local client, err = socket.tcp() -- creating new socket if not client then @@ -838,14 +840,14 @@ end local function link(sender, receiver, buffersize) local sender_locked; - + function receiver:ondrain() if sender_locked then sender:resume(); sender_locked = nil; end end - + function sender:onincoming(data) receiver:write(data); if receiver.writebufferlen >= buffersize then diff --git a/net/server_select.lua b/net/server_select.lua index 7ac41523..486e953b 100644 --- a/net/server_select.lua +++ b/net/server_select.lua @@ -1,7 +1,7 @@ --- +-- -- server.lua by blastbeat of the luadch project -- Re-used here under the MIT/X Consortium License --- +-- -- Modifications (C) 2008-2010 Matthew Wild, Waqas Hussain -- @@ -145,7 +145,7 @@ _tcpbacklog = 128 -- some kind of hint to the OS _maxsendlen = 51000 * 1024 -- max len of send buffer _maxreadlen = 25000 * 1024 -- max len of read buffer -_checkinterval = 1200000 -- interval in secs to check idle clients +_checkinterval = 30 -- interval in secs to check idle clients _sendtimeout = 60000 -- allowed send idle time in secs _readtimeout = 6 * 60 * 60 -- allowed read idle time in secs @@ -284,6 +284,7 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport local status = listeners.onstatus local disconnect = listeners.ondisconnect local drain = listeners.ondrain + local onreadtimeout = listeners.onreadtimeout; local detach = listeners.ondetach local bufferqueue = { } -- buffer array @@ -313,6 +314,8 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport handler.disconnect = function( ) return disconnect end + handler.onreadtimeout = onreadtimeout; + handler.setlistener = function( self, listeners ) if detach then detach(self) -- Notify listener that it is no longer responsible for this connection @@ -321,6 +324,7 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport disconnect = listeners.ondisconnect status = listeners.onstatus drain = listeners.ondrain + handler.onreadtimeout = listeners.onreadtimeout detach = listeners.ondetach end handler.getstats = function( ) @@ -564,6 +568,9 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport _ = status and status( handler, "ssl-handshake-complete" ) if self.autostart_ssl and listeners.onconnect then listeners.onconnect(self); + if bufferqueuelen ~= 0 then + _sendlistlen = addsocket(_sendlist, client, _sendlistlen) + end end _readlistlen = addsocket(_readlist, client, _readlistlen) return true @@ -613,7 +620,7 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport shutdown = id _socketlist[ socket ] = handler _readlistlen = addsocket(_readlist, socket, _readlistlen) - + -- remove traces of the old socket _readlistlen = removesocket( _readlist, oldsocket, _readlistlen ) _sendlistlen = removesocket( _sendlist, oldsocket, _sendlistlen ) @@ -701,7 +708,7 @@ local function link(sender, receiver, buffersize) sender_locked = nil; end end - + local _readbuffer = sender.readbuffer; function sender.readbuffer() _readbuffer(); @@ -870,16 +877,18 @@ loop = function(once) -- this is the main loop of the program _starttime = _currenttime for handler, timestamp in pairs( _writetimes ) do if os_difftime( _currenttime - timestamp ) > _sendtimeout then - --_writetimes[ handler ] = nil handler.disconnect( )( handler, "send timeout" ) handler:force_close() -- forced disconnect end end for handler, timestamp in pairs( _readtimes ) do if os_difftime( _currenttime - timestamp ) > _readtimeout then - --_readtimes[ handler ] = nil - handler.disconnect( )( handler, "read timeout" ) - handler:close( ) -- forced disconnect? + if not(handler.onreadtimeout) or handler:onreadtimeout() ~= true then + handler.disconnect( )( handler, "read timeout" ) + handler:close( ) -- forced disconnect? + else + _readtimes[ handler ] = _currenttime -- reset timer + end end end end @@ -940,9 +949,9 @@ local addclient = function( address, port, listeners, pattern, sslctx ) client:settimeout( 0 ) _, err = client:connect( address, port ) if err then -- try again - local handler = wrapclient( client, address, port, listeners ) + return wrapclient( client, address, port, listeners, pattern, sslctx ) else - wrapconnection( nil, listeners, client, address, port, "clientport", pattern, sslctx ) + return wrapconnection( nil, listeners, client, address, port, "clientport", pattern, sslctx ) end end @@ -972,7 +981,7 @@ return { addclient = addclient, wrapclient = wrapclient, - + loop = loop, link = link, step = step, diff --git a/net/websocket.lua b/net/websocket.lua new file mode 100644 index 00000000..a4274eec --- /dev/null +++ b/net/websocket.lua @@ -0,0 +1,272 @@ +-- Prosody IM +-- Copyright (C) 2012 Florian Zeitz +-- Copyright (C) 2014 Daurnimator +-- +-- This project is MIT/X11 licensed. Please see the +-- COPYING file in the source package for more information. +-- + +local t_concat = table.concat; + +local http = require "net.http"; +local frames = require "net.websocket.frames"; +local base64 = require "util.encodings".base64; +local sha1 = require "util.hashes".sha1; +local random_bytes = require "util.random".bytes; +local timer = require "util.timer"; +local log = require "util.logger".init "websocket"; + +local close_timeout = 3; -- Seconds to wait after sending close frame until closing connection. + +local websockets = {}; + +local websocket_listeners = {}; +function websocket_listeners.ondisconnect(handler, err) + local s = websockets[handler]; + websockets[handler] = nil; + if s.close_timer then + timer.stop(s.close_timer); + s.close_timer = nil; + end + s.readyState = 3; + if s.close_code == nil and s.onerror then s:onerror(err); end + if s.onclose then s:onclose(s.close_code, s.close_message or err); end +end + +function websocket_listeners.ondetach(handler) + websockets[handler] = nil; +end + +local function fail(s, code, reason) + module:log("warn", "WebSocket connection failed, closing. %d %s", code, reason); + s:close(code, reason); + s.handler:close(); + return false +end + +function websocket_listeners.onincoming(handler, buffer, err) + local s = websockets[handler]; + s.readbuffer = s.readbuffer..buffer; + while true do + local frame, len = frames.parse(s.readbuffer); + if frame == nil then break end + s.readbuffer = s.readbuffer:sub(len+1); + + log("debug", "Websocket received frame: opcode=%0x, %i bytes", frame.opcode, #frame.data); + + -- Error cases + if frame.RSV1 or frame.RSV2 or frame.RSV3 then -- Reserved bits non zero + return fail(s, 1002, "Reserved bits not zero"); + end + + if frame.opcode < 0x8 then + local databuffer = s.databuffer; + if frame.opcode == 0x0 then -- Continuation frames + if not databuffer then + return fail(s, 1002, "Unexpected continuation frame"); + end + databuffer[#databuffer+1] = frame.data; + elseif frame.opcode == 0x1 or frame.opcode == 0x2 then -- Text or Binary frame + if databuffer then + return fail(s, 1002, "Continuation frame expected"); + end + databuffer = {type=frame.opcode, frame.data}; + s.databuffer = databuffer; + else + return fail(s, 1002, "Reserved opcode"); + end + if frame.FIN then + s.databuffer = nil; + if s.onmessage then + s:onmessage(t_concat(databuffer), databuffer.type); + end + end + else -- Control frame + if frame.length > 125 then -- Control frame with too much payload + return fail(s, 1002, "Payload too large"); + elseif not frame.FIN then -- Fragmented control frame + return fail(s, 1002, "Fragmented control frame"); + end + if frame.opcode == 0x8 then -- Close request + if frame.length == 1 then + return fail(s, 1002, "Close frame with payload, but too short for status code"); + end + local status_code, message = frames.parse_close(frame.data); + if status_code == nil then + --[[ RFC 6455 7.4.1 + 1005 is a reserved value and MUST NOT be set as a status code in a + Close control frame by an endpoint. It is designated for use in + applications expecting a status code to indicate that no status + code was actually present. + ]] + status_code = 1005 + elseif status_code < 1000 then + return fail(s, 1002, "Closed with invalid status code"); + elseif ((status_code > 1003 and status_code < 1007) or status_code > 1011) and status_code < 3000 then + return fail(s, 1002, "Closed with reserved status code"); + end + s.close_code, s.close_message = status_code, message; + s:close(1000); + return true; + elseif frame.opcode == 0x9 then -- Ping frame + frame.opcode = 0xA; + frame.MASK = true; -- RFC 6455 6.1.5: If the data is being sent by the client, the frame(s) MUST be masked + handler:write(frames.build(frame)); + elseif frame.opcode == 0xA then -- Pong frame + log("debug", "Received unexpected pong frame: " .. tostring(frame.data)); + else + return fail(s, 1002, "Reserved opcode"); + end + end + end + return true; +end + +local websocket_methods = {}; +local function close_timeout_cb(now, timerid, s) + s.close_timer = nil; + log("warn", "Close timeout waiting for server to close, closing manually."); + s.handler:close(); +end +function websocket_methods:close(code, reason) + if self.readyState < 2 then + code = code or 1000; + log("debug", "closing WebSocket with code %i: %s" , code , tostring(reason)); + self.readyState = 2; + local handler = self.handler; + handler:write(frames.build_close(code, reason, true)); + -- Do not close socket straight away, wait for acknowledgement from server. + self.close_timer = timer.add_task(close_timeout, close_timeout_cb, self); + elseif self.readyState == 2 then + log("debug", "tried to close a closing WebSocket, closing the raw socket."); + -- Stop timer + if self.close_timer then + timer.stop(self.close_timer); + self.close_timer = nil; + end + local handler = self.handler; + handler:close(); + else + log("debug", "tried to close a closed WebSocket, ignoring."); + end +end +function websocket_methods:send(data, opcode) + if self.readyState < 1 then + return nil, "WebSocket not open yet, unable to send data."; + elseif self.readyState >= 2 then + return nil, "WebSocket closed, unable to send data."; + end + if opcode == "text" or opcode == nil then + opcode = 0x1; + elseif opcode == "binary" then + opcode = 0x2; + end + local frame = { + FIN = true; + MASK = true; -- RFC 6455 6.1.5: If the data is being sent by the client, the frame(s) MUST be masked + opcode = opcode; + data = tostring(data); + }; + log("debug", "WebSocket sending frame: opcode=%0x, %i bytes", frame.opcode, #frame.data); + return self.handler:write(frames.build(frame)); +end + +local websocket_metatable = { + __index = websocket_methods; +}; + +local function connect(url, ex, listeners) + ex = ex or {}; + + --[[RFC 6455 4.1.7: + The request MUST include a header field with the name + |Sec-WebSocket-Key|. The value of this header field MUST be a + nonce consisting of a randomly selected 16-byte value that has + been base64-encoded (see Section 4 of [RFC4648]). The nonce + MUST be selected randomly for each connection. + ]] + local key = base64.encode(random_bytes(16)); + + -- Either a single protocol string or an array of protocol strings. + local protocol = ex.protocol; + if type(protocol) == "string" then + protocol = { protocol, [protocol] = true }; + elseif type(protocol) == "table" and protocol[1] then + for _, v in ipairs(protocol) do + protocol[v] = true; + end + else + protocol = nil; + end + + local headers = { + ["Upgrade"] = "websocket"; + ["Connection"] = "Upgrade"; + ["Sec-WebSocket-Key"] = key; + ["Sec-WebSocket-Protocol"] = protocol and t_concat(protocol, ", "); + ["Sec-WebSocket-Version"] = "13"; + ["Sec-WebSocket-Extensions"] = ex.extensions; + } + if ex.headers then + for k,v in pairs(ex.headers) do + headers[k] = v; + end + end + + local s = setmetatable({ + readbuffer = ""; + databuffer = nil; + handler = nil; + close_code = nil; + close_message = nil; + close_timer = nil; + readyState = 0; + protocol = nil; + + url = url; + + onopen = listeners.onopen; + onclose = listeners.onclose; + onmessage = listeners.onmessage; + onerror = listeners.onerror; + }, websocket_metatable); + + local http_url = url:gsub("^(ws)", "http"); + local http_req = http.request(http_url, { + method = "GET"; + headers = headers; + sslctx = ex.sslctx; + }, function(b, c, r, http_req) + if c ~= 101 + or r.headers["connection"]:lower() ~= "upgrade" + or r.headers["upgrade"] ~= "websocket" + or r.headers["sec-websocket-accept"] ~= base64.encode(sha1(key .. "258EAFA5-E914-47DA-95CA-C5AB0DC85B11")) + or (protocol and not protocol[r.headers["sec-websocket-protocol"]]) + then + s.readyState = 3; + log("warn", "WebSocket connection to %s failed: %s", url, tostring(b)); + if s.onerror then s:onerror("connecting-failed"); end + return; + end + + s.protocol = r.headers["sec-websocket-protocol"]; + + -- Take possession of socket from http + http_req.conn = nil; + local handler = http_req.handler; + s.handler = handler; + websockets[handler] = s; + handler:setlistener(websocket_listeners); + + log("debug", "WebSocket connected successfully to %s", url); + s.readyState = 1; + if s.onopen then s:onopen(); end + websocket_listeners.onincoming(handler, b); + end); + + return s; +end + +return { + connect = connect; +}; diff --git a/net/websocket/frames.lua b/net/websocket/frames.lua new file mode 100644 index 00000000..fa0e130d --- /dev/null +++ b/net/websocket/frames.lua @@ -0,0 +1,195 @@ +-- Prosody IM +-- Copyright (C) 2012 Florian Zeitz +-- Copyright (C) 2014 Daurnimator +-- +-- This project is MIT/X11 licensed. Please see the +-- COPYING file in the source package for more information. +-- + +local softreq = require "util.dependencies".softreq; +local log = require "util.logger".init "websocket.frames"; +local random_bytes = require "util.random".bytes; + +local bit; +pcall(function() bit = require"bit"; end); +bit = bit or softreq"bit32" +if not bit then log("error", "No bit module found. Either LuaJIT 2, lua-bitop or Lua 5.2 is required"); end +local band = bit.band; +local bor = bit.bor; +local bxor = bit.bxor; +local lshift = bit.lshift; +local rshift = bit.rshift; + +local t_concat = table.concat; +local s_byte = string.byte; +local s_char= string.char; +local s_sub = string.sub; + +local function read_uint16be(str, pos) + local l1, l2 = s_byte(str, pos, pos+1); + return l1*256 + l2; +end +-- FIXME: this may lose precision +local function read_uint64be(str, pos) + local l1, l2, l3, l4, l5, l6, l7, l8 = s_byte(str, pos, pos+7); + return lshift(l1, 56) + lshift(l2, 48) + lshift(l3, 40) + lshift(l4, 32) + + lshift(l5, 24) + lshift(l6, 16) + lshift(l7, 8) + l8; +end +local function pack_uint16be(x) + return s_char(rshift(x, 8), band(x, 0xFF)); +end +local function get_byte(x, n) + return band(rshift(x, n), 0xFF); +end +local function pack_uint64be(x) + return s_char(rshift(x, 56), get_byte(x, 48), get_byte(x, 40), get_byte(x, 32), + get_byte(x, 24), get_byte(x, 16), get_byte(x, 8), band(x, 0xFF)); +end + +local function parse_frame_header(frame) + if #frame < 2 then return; end + + local byte1, byte2 = s_byte(frame, 1, 2); + local result = { + FIN = band(byte1, 0x80) > 0; + RSV1 = band(byte1, 0x40) > 0; + RSV2 = band(byte1, 0x20) > 0; + RSV3 = band(byte1, 0x10) > 0; + opcode = band(byte1, 0x0F); + + MASK = band(byte2, 0x80) > 0; + length = band(byte2, 0x7F); + }; + + local length_bytes = 0; + if result.length == 126 then + length_bytes = 2; + elseif result.length == 127 then + length_bytes = 8; + end + + local header_length = 2 + length_bytes + (result.MASK and 4 or 0); + if #frame < header_length then return; end + + if length_bytes == 2 then + result.length = read_uint16be(frame, 3); + elseif length_bytes == 8 then + result.length = read_uint64be(frame, 3); + end + + if result.MASK then + result.key = { s_byte(frame, length_bytes+3, length_bytes+6) }; + end + + return result, header_length; +end + +-- XORs the string `str` with the array of bytes `key` +-- TODO: optimize +local function apply_mask(str, key, from, to) + from = from or 1 + if from < 0 then from = #str + from + 1 end -- negative indicies + to = to or #str + if to < 0 then to = #str + to + 1 end -- negative indicies + local key_len = #key + local counter = 0; + local data = {}; + for i = from, to do + local key_index = counter%key_len + 1; + counter = counter + 1; + data[counter] = s_char(bxor(key[key_index], s_byte(str, i))); + end + return t_concat(data); +end + +local function parse_frame_body(frame, header, pos) + if header.MASK then + return apply_mask(frame, header.key, pos, pos + header.length - 1); + else + return frame:sub(pos, pos + header.length - 1); + end +end + +local function parse_frame(frame) + local result, pos = parse_frame_header(frame); + if result == nil or #frame < (pos + result.length) then return; end + result.data = parse_frame_body(frame, result, pos+1); + return result, pos + result.length; +end + +local function build_frame(desc) + local data = desc.data or ""; + + assert(desc.opcode and desc.opcode >= 0 and desc.opcode <= 0xF, "Invalid WebSocket opcode"); + if desc.opcode >= 0x8 then + -- RFC 6455 5.5 + assert(#data <= 125, "WebSocket control frames MUST have a payload length of 125 bytes or less."); + end + + local b1 = bor(desc.opcode, + desc.FIN and 0x80 or 0, + desc.RSV1 and 0x40 or 0, + desc.RSV2 and 0x20 or 0, + desc.RSV3 and 0x10 or 0); + + local b2 = #data; + local length_extra; + if b2 <= 125 then -- 7-bit length + length_extra = ""; + elseif b2 <= 0xFFFF then -- 2-byte length + b2 = 126; + length_extra = pack_uint16be(#data); + else -- 8-byte length + b2 = 127; + length_extra = pack_uint64be(#data); + end + + local key = "" + if desc.MASK then + local key_a = desc.key + if key_a then + key = s_char(unpack(key_a, 1, 4)); + else + key = random_bytes(4); + key_a = {key:byte(1,4)}; + end + b2 = bor(b2, 0x80); + data = apply_mask(data, key_a); + end + + return s_char(b1, b2) .. length_extra .. key .. data +end + +local function parse_close(data) + local code, message + if #data >= 2 then + code = read_uint16be(data, 1); + if #data > 2 then + message = s_sub(data, 3); + end + end + return code, message +end + +local function build_close(code, message, mask) + local data = pack_uint16be(code); + if message then + assert(#message<=123, "Close reason must be <=123 bytes"); + data = data .. message; + end + return build_frame({ + opcode = 0x8; + FIN = true; + MASK = mask; + data = data; + }); +end + +return { + parse_header = parse_frame_header; + parse_body = parse_frame_body; + parse = parse_frame; + build = build_frame; + parse_close = parse_close; + build_close = build_close; +}; |