diff options
Diffstat (limited to 'net/server_event.lua')
-rw-r--r-- | net/server_event.lua | 251 |
1 files changed, 118 insertions, 133 deletions
diff --git a/net/server_event.lua b/net/server_event.lua index 122d80fc..59217a0c 100644 --- a/net/server_event.lua +++ b/net/server_event.lua @@ -6,7 +6,6 @@ notes: -- when using luaevent, never register 2 or more EV_READ at one socket, same for EV_WRITE -- you cant even register a new EV_READ/EV_WRITE callback inside another one - -- never call eventcallback:close( ) from inside eventcallback -- to do some of the above, use timeout events or something what will called from outside -- dont let garbagecollect eventcallbacks, as long they are running -- when using luasec, there are 4 cases of timeout errors: wantread or wantwrite during reading or writing @@ -24,6 +23,7 @@ local cfg = { HANDSHAKE_TIMEOUT = 60, -- timeout in seconds per handshake attempt MAX_READ_LENGTH = 1024 * 1024 * 1024 * 1024, -- max bytes allowed to read from sockets MAX_SEND_LENGTH = 1024 * 1024 * 1024 * 1024, -- max bytes size of write buffer (for writing on sockets) + ACCEPT_QUEUE = 128, -- might influence the length of the pending sockets queue ACCEPT_DELAY = 10, -- seconds to wait until the next attempt of a full server to accept READ_TIMEOUT = 60 * 60 * 6, -- timeout in seconds for read data from socket WRITE_TIMEOUT = 180, -- timeout in seconds for write data on socket @@ -33,8 +33,6 @@ local cfg = { } local function use(x) return rawget(_G, x); end -local print = use "print" -local pcall = use "pcall" local ipairs = use "ipairs" local string = use "string" local select = use "select" @@ -43,6 +41,9 @@ local tostring = use "tostring" local coroutine = use "coroutine" local setmetatable = use "setmetatable" +local t_insert = table.insert +local t_concat = table.concat + local ssl = use "ssl" local socket = use "socket" or require "socket" @@ -114,26 +115,19 @@ end )( ) local interface_mt do interface_mt = {}; interface_mt.__index = interface_mt; - + local addevent = base.addevent local coroutine_wrap, coroutine_yield = coroutine.wrap,coroutine.yield - local string_len = string.len - + -- Private methods function interface_mt:_position(new_position) self.position = new_position or self.position return self.position; end - function interface_mt:_close() -- regs event to start self:_destroy() - local callback = function( ) - self:_destroy(); - self.eventclose = nil - return -1 - end - self.eventclose = addevent( base, nil, EV_TIMEOUT, callback, 0 ) - return true + function interface_mt:_close() + return self:_destroy(); end - + function interface_mt:_start_connection(plainssl) -- should be called from addclient local callback = function( event ) if EV_TIMEOUT == event then -- timeout during connection @@ -143,7 +137,7 @@ do debug( "new connection failed. id:", self.id, "error:", self.fatalerror ) else if plainssl and ssl then -- start ssl session - self:starttls(nil, true) + self:starttls(self._sslctx, true) else -- normal connection self:_start_session(true) end @@ -212,7 +206,6 @@ do self:_lock( false, false, false ) -- unlock the interface; sending, closing etc allowed self.send = self.conn.send -- caching table lookups with new client object self.receive = self.conn.receive - local onsomething if not call_onconnect then -- trigger listener self:onstatus("ssl-handshake-complete"); end @@ -221,12 +214,12 @@ do self.eventhandshake = nil return -1 end - debug( "error during ssl handshake:", err ) if err == "wantwrite" then event = EV_WRITE elseif err == "wantread" then event = EV_READ else + debug( "ssl handshake error:", err ) self.fatalerror = err end end @@ -249,10 +242,10 @@ do return true end function interface_mt:_destroy() -- close this interface + events and call last listener - debug( "closing client with id:", self.id ) + debug( "closing client with id:", self.id, self.fatalerror ) self:_lock( true, true, true ) -- first of all, lock the interface to avoid further actions local _ - _ = self.eventread and self.eventread:close( ) -- close events; this must be called outside of the event callbacks! + _ = self.eventread and self.eventread:close( ) if self.type == "client" then _ = self.eventwrite and self.eventwrite:close( ) _ = self.eventhandshake and self.eventhandshake:close( ) @@ -262,7 +255,7 @@ do _ = self.eventwritetimeout and self.eventwritetimeout:close( ) _ = self.eventreadtimeout and self.eventreadtimeout:close( ) _ = self.ondisconnect and self:ondisconnect( self.fatalerror ~= "client to close" and self.fatalerror) -- call ondisconnect listener (wont be the case if handshake failed on connect) - _ = self.conn and self.conn:close( ) -- close connection, must also be called outside of any socket registered events! + _ = self.conn and self.conn:close( ) -- close connection _ = self._server and self._server:counter(-1); self.eventread, self.eventwrite = nil, nil self.eventstarthandshake, self.eventhandshake, self.eventclose = nil, nil, nil @@ -275,12 +268,12 @@ do interfacelist( "delete", self ) return true end - + function interface_mt:_lock(nointerface, noreading, nowriting) -- lock or unlock this interface or events self.nointerface, self.noreading, self.nowriting = nointerface, noreading, nowriting return nointerface, noreading, nowriting end - + --TODO: Deprecate function interface_mt:lock_read(switch) if switch then @@ -295,7 +288,10 @@ do end function interface_mt:resume() - return self:_lock(self.nointerface, false, self.nowriting); + self:_lock(self.nointerface, false, self.nowriting); + if not self.eventread then + self.eventread = addevent( base, self.conn, EV_READ, self.readcallback, cfg.READ_TIMEOUT ); -- register callback + end end function interface_mt:counter(c) @@ -304,20 +300,20 @@ do end return self._connections end - + -- Public methods function interface_mt:write(data) if self.nowriting then return nil, "locked" end --vdebug( "try to send data to client, id/data:", self.id, data ) data = tostring( data ) - local len = string_len( data ) + local len = #data local total = len + self.writebufferlen if total > cfg.MAX_SEND_LENGTH then -- check buffer length local err = "send buffer exceeded" debug( "error:", err ) -- to much, check your app return nil, err end - self.writebuffer = self.writebuffer .. data -- new buffer + t_insert(self.writebuffer, data) -- new buffer self.writebufferlen = total if not self.eventwrite then -- register new write event --vdebug( "register new write event" ) @@ -325,62 +321,49 @@ do end return true end - function interface_mt:close(now) + function interface_mt:close() if self.nointerface then return nil, "locked"; end debug( "try to close client connection with id:", self.id ) if self.type == "client" then self.fatalerror = "client to close" - if ( not self.eventwrite ) or now then -- try to close immediately - self:_lock( true, true, true ) - self:_close() - return true - else -- wait for incomplete write request + if self.eventwrite then -- wait for incomplete write request self:_lock( true, true, false ) debug "closing delayed until writebuffer is empty" return nil, "writebuffer not empty, waiting" + else -- close now + self:_lock( true, true, true ) + self:_close() + return true end else - debug( "try to close server with id:", self.id, "args:", now ) + debug( "try to close server with id:", tostring(self.id)) self.fatalerror = "server to close" self:_lock( true ) - local count = 0 - for _, item in ipairs( interfacelist( ) ) do - if ( item.type ~= "server" ) and ( item._server == self ) then -- client/server match - if item:close( now ) then -- writebuffer was empty - count = count + 1 - end - end - end - local timeout = 0 -- dont wait for unfinished writebuffers of clients... - if not now then - timeout = cfg.WRITE_TIMEOUT -- ...or wait for it - end - self:_close( timeout ) -- add new event to remove the server interface - debug( "seconds remained until server is closed:", timeout ) - return count -- returns finished clients with empty writebuffer + self:_close( 0 ) + return true end end - + function interface_mt:socket() return self.conn end - + function interface_mt:server() return self._server or self; end - + function interface_mt:port() return self._port end - + function interface_mt:serverport() return self._serverport end - + function interface_mt:ip() return self._ip end - + function interface_mt:ssl() return self._usingssl end @@ -388,15 +371,15 @@ do function interface_mt:type() return self._type or "client" end - + function interface_mt:connections() return self._connections end - + function interface_mt:address() return self.addr end - + function interface_mt:set_sslctx(sslctx) self._sslctx = sslctx; if sslctx then @@ -412,11 +395,11 @@ do end return self._pattern; end - + function interface_mt:set_send(new_send) -- No-op, we always use the underlying connection's send end - + function interface_mt:starttls(sslctx, call_onconnect) debug( "try to start ssl at client id:", self.id ) local err @@ -445,22 +428,22 @@ do self.starttls = false; return true end - + function interface_mt:setoption(option, value) if self.conn.setoption then return self.conn:setoption(option, value); end return false, "setoption not implemented"; end - + function interface_mt:setlistener(listener) - self.onconnect, self.ondisconnect, self.onincoming, self.ontimeout, self.onstatus - = listener.onconnect, listener.ondisconnect, listener.onincoming, listener.ontimeout, listener.onstatus; + self.onconnect, self.ondisconnect, self.onincoming, self.ontimeout, self.onreadtimeout, self.onstatus + = listener.onconnect, listener.ondisconnect, listener.onincoming, + listener.ontimeout, listener.onreadtimeout, listener.onstatus; end - + -- Stub handlers function interface_mt:onconnect() - return self:onincoming(nil); end function interface_mt:onincoming() end @@ -468,6 +451,12 @@ do end function interface_mt:ontimeout() end + function interface_mt:onreadtimeout() + self.fatalerror = "timeout during receiving" + debug( "connection failed:", self.fatalerror ) + self:_close() + self.eventread = nil + end function interface_mt:ondrain() end function interface_mt:onstatus() @@ -479,18 +468,15 @@ end local handleclient; do local string_sub = string.sub -- caching table lookups - local string_len = string.len local addevent = base.addevent - local coroutine_wrap = coroutine.wrap local socket_gettime = socket.gettime - local coroutine_yield = coroutine.yield - function handleclient( client, ip, port, server, pattern, listener, _, sslctx ) -- creates an client interface + function handleclient( client, ip, port, server, pattern, listener, sslctx ) -- creates an client interface --vdebug("creating client interfacce...") local interface = { type = "client"; conn = client; currenttime = socket_gettime( ); -- safe the origin - writebuffer = ""; -- writebuffer + writebuffer = {}; -- writebuffer writebufferlen = 0; -- length of writebuffer send = client.send; -- caching table lookups receive = client.receive; @@ -498,6 +484,8 @@ do ondisconnect = listener.ondisconnect; -- will be called when client disconnects onincoming = listener.onincoming; -- will be called when client sends data ontimeout = listener.ontimeout; -- called when fatal socket timeout occurs + onreadtimeout = listener.onreadtimeout; -- called when socket inactivity timeout occurs + ondrain = listener.ondrain; -- called when writebuffer is empty onstatus = listener.onstatus; -- called for status changes (e.g. of SSL/TLS) eventread = false, eventwrite = false, eventclose = false, eventhandshake = false, eventstarthandshake = false; -- event handler @@ -511,7 +499,7 @@ do noreading = false, nowriting = false; -- locks of the read/writecallback startsslcallback = false; -- starting handshake callback position = false; -- position of client in interfacelist - + -- Properties _ip = ip, _port = port, _server = server, _pattern = pattern, _serverport = (server and server:port() or nil), @@ -544,10 +532,11 @@ do interface.eventwritetimeout = false end end - local succ, err, byte = interface.conn:send( interface.writebuffer, 1, interface.writebufferlen ) + interface.writebuffer = { t_concat(interface.writebuffer) } + local succ, err, byte = interface.conn:send( interface.writebuffer[1], 1, interface.writebufferlen ) --vdebug( "write data:", interface.writebuffer, "error:", err, "part:", byte ) if succ then -- writing succesful - interface.writebuffer = "" + interface.writebuffer[1] = nil interface.writebufferlen = 0 interface:ondrain(); if interface.fatalerror then @@ -563,7 +552,7 @@ do return -1 elseif byte and (err == "timeout" or err == "wantwrite") then -- want write again --vdebug( "writebuffer is not empty:", err ) - interface.writebuffer = string_sub( interface.writebuffer, byte + 1, interface.writebufferlen ) -- new buffer + interface.writebuffer[1] = string_sub( interface.writebuffer[1], byte + 1, interface.writebufferlen ) -- new buffer interface.writebufferlen = interface.writebufferlen - byte if "wantread" == err then -- happens only with luasec local callback = function( ) @@ -586,7 +575,7 @@ do end end end - + interface.readcallback = function( event ) -- called on read events --vdebug( "new client read event, id/ip/port:", tostring(interface.id), tostring(ip), tostring(port) ) if interface.noreading or interface.fatalerror then -- leave this event @@ -594,57 +583,56 @@ do interface.eventread = nil return -1 end - if EV_TIMEOUT == event then -- took too long to get some data from client -> disconnect - interface.fatalerror = "timeout during receiving" - debug( "connection failed:", interface.fatalerror ) + if EV_TIMEOUT == event and interface:onreadtimeout() ~= true then + return -1 -- took too long to get some data from client -> disconnect + end + if interface._usingssl then -- handle luasec + if interface.eventwritetimeout then -- ok, in the past writecallback was regged + local ret = interface.writecallback( ) -- call it + --vdebug( "tried to write in readcallback, result:", tostring(ret) ) + end + if interface.eventreadtimeout then + interface.eventreadtimeout:close( ) + interface.eventreadtimeout = nil + end + end + local buffer, err, part = interface.conn:receive( interface._pattern ) -- receive buffer with "pattern" + --vdebug( "read data:", tostring(buffer), "error:", tostring(err), "part:", tostring(part) ) + buffer = buffer or part + if buffer and #buffer > cfg.MAX_READ_LENGTH then -- check buffer length + interface.fatalerror = "receive buffer exceeded" + debug( "fatal error:", interface.fatalerror ) interface:_close() interface.eventread = nil return -1 - else -- can read - if interface._usingssl then -- handle luasec - if interface.eventwritetimeout then -- ok, in the past writecallback was regged - local ret = interface.writecallback( ) -- call it - --vdebug( "tried to write in readcallback, result:", tostring(ret) ) - end - if interface.eventreadtimeout then - interface.eventreadtimeout:close( ) - interface.eventreadtimeout = nil + end + if err and ( err ~= "timeout" and err ~= "wantread" ) then + if "wantwrite" == err then -- need to read on write event + if not interface.eventwrite then -- register new write event if needed + interface.eventwrite = addevent( base, interface.conn, EV_WRITE, interface.writecallback, cfg.WRITE_TIMEOUT ) end - end - local buffer, err, part = interface.conn:receive( interface._pattern ) -- receive buffer with "pattern" - --vdebug( "read data:", tostring(buffer), "error:", tostring(err), "part:", tostring(part) ) - buffer = buffer or part or "" - local len = string_len( buffer ) - if len > cfg.MAX_READ_LENGTH then -- check buffer length - interface.fatalerror = "receive buffer exceeded" - debug( "fatal error:", interface.fatalerror ) + interface.eventreadtimeout = addevent( base, nil, EV_TIMEOUT, + function( ) + interface:_close() + end, cfg.READ_TIMEOUT + ) + debug( "wantwrite during read attempt, reg it in writecallback but dont know what really happens next..." ) + -- to be honest i dont know what happens next, if it is allowed to first read, the write etc... + else -- connection was closed or fatal error + interface.fatalerror = err + debug( "connection failed in read event:", interface.fatalerror ) interface:_close() interface.eventread = nil return -1 end + else interface.onincoming( interface, buffer, err ) -- send new data to listener - if err and ( err ~= "timeout" and err ~= "wantread" ) then - if "wantwrite" == err then -- need to read on write event - if not interface.eventwrite then -- register new write event if needed - interface.eventwrite = addevent( base, interface.conn, EV_WRITE, interface.writecallback, cfg.WRITE_TIMEOUT ) - end - interface.eventreadtimeout = addevent( base, nil, EV_TIMEOUT, - function( ) - interface:_close() - end, cfg.READ_TIMEOUT - ) - debug( "wantwrite during read attempt, reg it in writecallback but dont know what really happens next..." ) - -- to be honest i dont know what happens next, if it is allowed to first read, the write etc... - else -- connection was closed or fatal error - interface.fatalerror = err - debug( "connection failed in read event:", interface.fatalerror ) - interface:_close() - interface.eventread = nil - return -1 - end - end - return EV_READ, cfg.READ_TIMEOUT end + if interface.noreading then + interface.eventread = nil; + return -1; + end + return EV_READ, cfg.READ_TIMEOUT end client:settimeout( 0 ) -- set non blocking @@ -660,7 +648,7 @@ do debug "creating server interface..." local interface = { _connections = 0; - + conn = server; onconnect = listener.onconnect; -- will be called when new client connected eventread = false; -- read event handler @@ -668,7 +656,7 @@ do readcallback = false; -- read event callback fatalerror = false; -- error message nointerface = true; -- lock/unlock parameter - + _ip = addr, _port = port, _pattern = pattern, _sslctx = sslctx; } @@ -699,7 +687,7 @@ do end local client_ip, client_port = client:getpeername( ) interface._connections = interface._connections + 1 -- increase connection count - local clientinterface = handleclient( client, client_ip, client_port, interface, pattern, listener, nil, sslctx ) + local clientinterface = handleclient( client, client_ip, client_port, interface, pattern, listener, sslctx ) --vdebug( "client id:", clientinterface, "startssl:", startssl ) if ssl and sslctx then clientinterface:starttls(sslctx, true) @@ -707,12 +695,12 @@ do clientinterface:_start_session( true ) end debug( "accepted incoming client connection from:", client_ip or "<unknown IP>", client_port or "<unknown port>", "to", port or "<unknown port>"); - + client, err = server:accept() -- try to accept again end return EV_READ end - + server:settimeout( 0 ) setmetatable(interface, interface_mt) interfacelist( "add", interface ) @@ -726,7 +714,7 @@ local addserver = ( function( ) --vdebug( "creating new tcp server with following parameters:", addr or "nil", port or "nil", sslcfg or "nil", startssl or "nil") local server, err = socket.bind( addr, port, cfg.ACCEPT_QUEUE ) -- create server socket if not server then - debug( "creating server socket failed because:", err ) + debug( "creating server socket on "..addr.." port "..port.." failed:", err ) return nil, err end local sslctx @@ -749,13 +737,13 @@ end )( ) local addclient, wrapclient do - function wrapclient( client, ip, port, listeners, pattern, sslctx, startssl ) + function wrapclient( client, ip, port, listeners, pattern, sslctx ) local interface = handleclient( client, ip, port, nil, pattern, listeners, sslctx ) - interface:_start_session() + interface:_start_connection(sslctx) return interface, client --function handleclient( client, ip, port, server, pattern, listener, _, sslctx ) -- creates an client interface end - + function addclient( addr, serverport, listener, pattern, localaddr, localport, sslcfg, startssl ) local client, err = socket.tcp() -- creating new socket if not client then @@ -785,9 +773,6 @@ do local res, err = client:connect( addr, serverport ) -- connect if res or ( err == "timeout" ) then local ip, port = client:getsockname( ) - local server = function( ) - return nil, "this is a dummy server interface" - end local interface = wrapclient( client, ip, serverport, listener, pattern, sslctx, startssl ) interface:_start_connection( startssl ) debug( "new connection id:", interface.id ) @@ -828,14 +813,14 @@ local function setquitting(yes) end end -function get_backend() +local function get_backend() return base:method(); end -- We need to hold onto the events to stop them -- being garbage-collected local signal_events = {}; -- [signal_num] -> event object -function hook_signal(signal_num, handler) +local function hook_signal(signal_num, handler) local function _handler(event) local ret = handler(); if ret ~= false then -- Continue handling this signal? @@ -849,14 +834,14 @@ end local function link(sender, receiver, buffersize) local sender_locked; - + function receiver:ondrain() if sender_locked then sender:resume(); sender_locked = nil; end end - + function sender:onincoming(data) receiver:write(data); if receiver.writebufferlen >= buffersize then |