diff options
Diffstat (limited to 'net')
-rw-r--r-- | net/cqueues.lua | 74 | ||||
-rw-r--r-- | net/server.lua | 86 | ||||
-rw-r--r-- | net/server_event.lua | 1072 | ||||
-rw-r--r-- | net/server_select.lua | 99 |
4 files changed, 729 insertions, 602 deletions
diff --git a/net/cqueues.lua b/net/cqueues.lua new file mode 100644 index 00000000..8c4c756f --- /dev/null +++ b/net/cqueues.lua @@ -0,0 +1,74 @@ +-- Prosody IM +-- Copyright (C) 2014 Daurnimator +-- +-- This project is MIT/X11 licensed. Please see the +-- COPYING file in the source package for more information. +-- +-- This module allows you to use cqueues with a net.server mainloop +-- + +local server = require "net.server"; +local cqueues = require "cqueues"; +assert(cqueues.VERSION >= 20150113, "cqueues newer than 20150113 required") + +-- Create a single top level cqueue +local cq; + +if server.cq then -- server provides cqueues object + cq = server.cq; +elseif server.get_backend() == "select" and server._addtimer then -- server_select + cq = cqueues.new(); + local function step() + assert(cq:loop(0)); + end + + -- Use wrapclient (as wrapconnection isn't exported) to get server_select to watch cq fd + local handler = server.wrapclient({ + getfd = function() return cq:pollfd(); end; + settimeout = function() end; -- Method just needs to exist + close = function() end; -- Need close method for 'closeall' + }, nil, nil, {}); + + -- Only need to listen for readable; cqueues handles everything under the hood + -- readbuffer is called when `select` notes an fd as readable + handler.readbuffer = step; + + -- Use server_select low lever timer facility, + -- this callback gets called *every* time there is a timeout in the main loop + server._addtimer(function(current_time) + -- This may end up in extra step()'s, but cqueues handles it for us. + step(); + return cq:timeout(); + end); +elseif server.event and server.base then -- server_event + cq = cqueues.new(); + -- Only need to listen for readable; cqueues handles everything under the hood + local EV_READ = server.event.EV_READ; + -- Convert a cqueues timeout to an acceptable timeout for luaevent + local function luaevent_safe_timeout(cq) + local t = cq:timeout(); + -- if you give luaevent 0 or nil, it re-uses the previous timeout. + if t == 0 then + t = 0.000001; -- 1 microsecond is the smallest that works (goes into a `struct timeval`) + elseif t == nil then -- pick something big if we don't have one + t = 0x7FFFFFFF; -- largest 32bit int + end + return t + end + local event_handle; + event_handle = server.base:addevent(cq:pollfd(), EV_READ, function(e) + -- Need to reference event_handle or this callback will get collected + -- This creates a circular reference that can only be broken if event_handle is manually :close()'d + local _ = event_handle; + -- Run as many cqueues things as possible (with a timeout of 0) + -- If an error is thrown, it will break the libevent loop; but prosody resumes after logging a top level error + assert(cq:loop(0)); + return EV_READ, luaevent_safe_timeout(cq); + end, luaevent_safe_timeout(cq)); +else + error "NYI" +end + +return { + cq = cq; +} diff --git a/net/server.lua b/net/server.lua index 41e180fa..0e13399d 100644 --- a/net/server.lua +++ b/net/server.lua @@ -6,49 +6,28 @@ -- COPYING file in the source package for more information. -- -local use_luaevent = prosody and require "core.configmanager".get("*", "use_libevent"); +local server_type = prosody and require "core.configmanager".get("*", "network_backend") or "select"; +if prosody and require "core.configmanager".get("*", "use_libevent") then + server_type = "event"; +end -if use_luaevent then - use_luaevent = pcall(require, "luaevent.core"); - if not use_luaevent then +if server_type == "event" then + if not pcall(require, "luaevent.core") then log("error", "libevent not found, falling back to select()"); + server_type = "select" end end local server; - -if use_luaevent then +local set_config; +if server_type == "event" then server = require "net.server_event"; - -- Overwrite signal.signal() because we need to ask libevent to - -- handle them instead - local ok, signal = pcall(require, "util.signal"); - if ok and signal then - local _signal_signal = signal.signal; - function signal.signal(signal_id, handler) - if type(signal_id) == "string" then - signal_id = signal[signal_id:upper()]; - end - if type(signal_id) ~= "number" then - return false, "invalid-signal"; - end - return server.hook_signal(signal_id, handler); - end - end -else - use_luaevent = false; - server = require "net.server_select"; -end - -if prosody then - local config_get = require "core.configmanager".get; local defaults = {}; - for k,v in pairs(server.cfg or server.getsettings()) do + for k,v in pairs(server.cfg) do defaults[k] = v; end - local function load_config() - local settings = config_get("*", "network_settings") or {}; - if use_luaevent then + function set_config(settings) local event_settings = { ACCEPT_DELAY = settings.accept_retry_interval; ACCEPT_QUEUE = settings.tcp_backlog; @@ -67,13 +46,54 @@ if prosody then for k,default in pairs(defaults) do server.cfg[k] = event_settings[k] or default; end - else + end +elseif server_type == "select" then + server = require "net.server_select"; + + local defaults = {}; + for k,v in pairs(server.getsettings()) do + defaults[k] = v; + end + function set_config(settings) local select_settings = {}; for k,default in pairs(defaults) do select_settings[k] = settings[k] or default; end server.changesettings(select_settings); end +else + error("Unsupported server type") +end + +-- If server.hook_signal exists, replace signal.signal() +local has_signal, signal = pcall(require, "util.signal"); +if has_signal then + if server.hook_signal then + function signal.signal(signal_id, handler) + if type(signal_id) == "string" then + signal_id = signal[signal_id:upper()]; + end + if type(signal_id) ~= "number" then + return false, "invalid-signal"; + end + return server.hook_signal(signal_id, handler); + end + else + server.hook_signal = signal.signal; + end +else + if not server.hook_signal then + server.hook_signal = function() + return false, "signal hooking not supported" + end + end +end + +if prosody then + local config_get = require "core.configmanager".get; + local function load_config() + local settings = config_get("*", "network_settings") or {}; + return set_config(settings); end load_config(); prosody.events.add_handler("config-reloaded", load_config); diff --git a/net/server_event.lua b/net/server_event.lua index 70a6dc37..7940a1b8 100644 --- a/net/server_event.lua +++ b/net/server_event.lua @@ -92,648 +92,647 @@ local interfacelist = { } -- Client interface methods local interface_mt = {}; interface_mt.__index = interface_mt; - -- Private methods - function interface_mt:_close() - return self:_destroy(); - end - - function interface_mt:_start_connection(plainssl) -- should be called from addclient - local callback = function( event ) - if EV_TIMEOUT == event then -- timeout during connection - self.fatalerror = "connection timeout" - self:ontimeout() -- call timeout listener - self:_close() - debug( "new connection failed. id:", self.id, "error:", self.fatalerror ) - else +-- Private methods +function interface_mt:_close() + return self:_destroy(); +end + +function interface_mt:_start_connection(plainssl) -- called from wrapclient + local callback = function( event ) + if EV_TIMEOUT == event then -- timeout during connection + self.fatalerror = "connection timeout" + self:ontimeout() -- call timeout listener + self:_close() + debug( "new connection failed. id:", self.id, "error:", self.fatalerror ) + else if plainssl and has_luasec then -- start ssl session - self:starttls(self._sslctx, true) - else -- normal connection - self:_start_session(true) - end - debug( "new connection established. id:", self.id ) - end - self.eventconnect = nil - return -1 + self:starttls(self._sslctx, true) + else -- normal connection + self:_start_session(true) end - self.eventconnect = addevent( base, self.conn, EV_WRITE, callback, cfg.CONNECT_TIMEOUT ) - return true + debug( "new connection established. id:", self.id ) + end + self.eventconnect = nil + return -1 end - function interface_mt:_start_session(call_onconnect) -- new session, for example after startssl - if self.type == "client" then - local callback = function( ) - self:_lock( false, false, false ) - --vdebug( "start listening on client socket with id:", self.id ) - self.eventread = addevent( base, self.conn, EV_READ, self.readcallback, cfg.READ_TIMEOUT ); -- register callback - if call_onconnect then - self:onconnect() - end - self.eventsession = nil - return -1 + self.eventconnect = addevent( base, self.conn, EV_WRITE, callback, cfg.CONNECT_TIMEOUT ) + return true +end +function interface_mt:_start_session(call_onconnect) -- new session, for example after startssl + if self.type == "client" then + local callback = function( ) + self:_lock( false, false, false ) + --vdebug( "start listening on client socket with id:", self.id ) + self.eventread = addevent( base, self.conn, EV_READ, self.readcallback, cfg.READ_TIMEOUT ); -- register callback + if call_onconnect then + self:onconnect() end - self.eventsession = addevent( base, nil, EV_TIMEOUT, callback, 0 ) - else - self:_lock( false ) - --vdebug( "start listening on server socket with id:", self.id ) - self.eventread = addevent( base, self.conn, EV_READ, self.readcallback ) -- register callback + self.eventsession = nil + return -1 end - return true + self.eventsession = addevent( base, nil, EV_TIMEOUT, callback, 0 ) + else + self:_lock( false ) + --vdebug( "start listening on server socket with id:", self.id ) + self.eventread = addevent( base, self.conn, EV_READ, self.readcallback ) -- register callback + end + return true +end +function interface_mt:_start_ssl(call_onconnect) -- old socket will be destroyed, therefore we have to close read/write events first + --vdebug( "starting ssl session with client id:", self.id ) + local _ + _ = self.eventread and self.eventread:close( ) -- close events; this must be called outside of the event callbacks! + _ = self.eventwrite and self.eventwrite:close( ) + self.eventread, self.eventwrite = nil, nil + local err + self.conn, err = ssl.wrap( self.conn, self._sslctx ) + if err then + self.fatalerror = err + self.conn = nil -- cannot be used anymore + if call_onconnect then + self.ondisconnect = nil -- dont call this when client isnt really connected + end + self:_close() + debug( "fatal error while ssl wrapping:", err ) + return false end - function interface_mt:_start_ssl(call_onconnect) -- old socket will be destroyed, therefore we have to close read/write events first - --vdebug( "starting ssl session with client id:", self.id ) - local _ - _ = self.eventread and self.eventread:close( ) -- close events; this must be called outside of the event callbacks! - _ = self.eventwrite and self.eventwrite:close( ) - self.eventread, self.eventwrite = nil, nil - local err - self.conn, err = ssl.wrap( self.conn, self._sslctx ) - if err then - self.fatalerror = err - self.conn = nil -- cannot be used anymore + self.conn:settimeout( 0 ) -- set non blocking + local handshakecallback = coroutine_wrap(function( event ) + local _, err + local attempt = 0 + local maxattempt = cfg.MAX_HANDSHAKE_ATTEMPTS + while attempt < maxattempt do -- no endless loop + attempt = attempt + 1 + debug( "ssl handshake of client with id:"..tostring(self)..", attempt:"..attempt ) + if attempt > maxattempt then + self.fatalerror = "max handshake attempts exceeded" + elseif EV_TIMEOUT == event then + self.fatalerror = "timeout during handshake" + else + _, err = self.conn:dohandshake( ) + if not err then + self:_lock( false, false, false ) -- unlock the interface; sending, closing etc allowed + self.send = self.conn.send -- caching table lookups with new client object + self.receive = self.conn.receive + if not call_onconnect then -- trigger listener + self:onstatus("ssl-handshake-complete"); + end + self:_start_session( call_onconnect ) + debug( "ssl handshake done" ) + self.eventhandshake = nil + return -1 + end + if err == "wantwrite" then + event = EV_WRITE + elseif err == "wantread" then + event = EV_READ + else + debug( "ssl handshake error:", err ) + self.fatalerror = err + end + end + if self.fatalerror then if call_onconnect then self.ondisconnect = nil -- dont call this when client isnt really connected end self:_close() - debug( "fatal error while ssl wrapping:", err ) - return false + debug( "handshake failed because:", self.fatalerror ) + self.eventhandshake = nil + return -1 end - self.conn:settimeout( 0 ) -- set non blocking - local handshakecallback = coroutine_wrap(function( event ) - local _, err - local attempt = 0 - local maxattempt = cfg.MAX_HANDSHAKE_ATTEMPTS - while attempt < maxattempt do -- no endless loop - attempt = attempt + 1 - debug( "ssl handshake of client with id:"..tostring(self)..", attempt:"..attempt ) - if attempt > maxattempt then - self.fatalerror = "max handshake attempts exceeded" - elseif EV_TIMEOUT == event then - self.fatalerror = "timeout during handshake" - else - _, err = self.conn:dohandshake( ) - if not err then - self:_lock( false, false, false ) -- unlock the interface; sending, closing etc allowed - self.send = self.conn.send -- caching table lookups with new client object - self.receive = self.conn.receive - if not call_onconnect then -- trigger listener - self:onstatus("ssl-handshake-complete"); - end - self:_start_session( call_onconnect ) - debug( "ssl handshake done" ) - self.eventhandshake = nil - return -1 - end - if err == "wantwrite" then - event = EV_WRITE - elseif err == "wantread" then - event = EV_READ - else - debug( "ssl handshake error:", err ) - self.fatalerror = err - end - end - if self.fatalerror then - if call_onconnect then - self.ondisconnect = nil -- dont call this when client isnt really connected - end - self:_close() - debug( "handshake failed because:", self.fatalerror ) - self.eventhandshake = nil - return -1 - end - event = coroutine_yield( event, cfg.HANDSHAKE_TIMEOUT ) -- yield this monster... - end - end - ) - debug "starting handshake..." - self:_lock( false, true, true ) -- unlock read/write events, but keep interface locked - self.eventhandshake = addevent( base, self.conn, EV_READWRITE, handshakecallback, cfg.HANDSHAKE_TIMEOUT ) - return true + event = coroutine_yield( event, cfg.HANDSHAKE_TIMEOUT ) -- yield this monster... + end end - function interface_mt:_destroy() -- close this interface + events and call last listener - debug( "closing client with id:", self.id, self.fatalerror ) - self:_lock( true, true, true ) -- first of all, lock the interface to avoid further actions - local _ - _ = self.eventread and self.eventread:close( ) - if self.type == "client" then - _ = self.eventwrite and self.eventwrite:close( ) - _ = self.eventhandshake and self.eventhandshake:close( ) - _ = self.eventstarthandshake and self.eventstarthandshake:close( ) - _ = self.eventconnect and self.eventconnect:close( ) - _ = self.eventsession and self.eventsession:close( ) - _ = self.eventwritetimeout and self.eventwritetimeout:close( ) - _ = self.eventreadtimeout and self.eventreadtimeout:close( ) - _ = self.ondisconnect and self:ondisconnect( self.fatalerror ~= "client to close" and self.fatalerror) -- call ondisconnect listener (wont be the case if handshake failed on connect) - _ = self.conn and self.conn:close( ) -- close connection - _ = self._server and self._server:counter(-1); - self.eventread, self.eventwrite = nil, nil - self.eventstarthandshake, self.eventhandshake, self.eventclose = nil, nil, nil - self.readcallback, self.writecallback = nil, nil - else - self.conn:close( ) - self.eventread, self.eventclose = nil, nil - self.interface, self.readcallback = nil, nil - end - interfacelist[ self ] = nil - return true + ) + debug "starting handshake..." + self:_lock( false, true, true ) -- unlock read/write events, but keep interface locked + self.eventhandshake = addevent( base, self.conn, EV_READWRITE, handshakecallback, cfg.HANDSHAKE_TIMEOUT ) + return true +end +function interface_mt:_destroy() -- close this interface + events and call last listener + debug( "closing client with id:", self.id, self.fatalerror ) + self:_lock( true, true, true ) -- first of all, lock the interface to avoid further actions + local _ + _ = self.eventread and self.eventread:close( ) + if self.type == "client" then + _ = self.eventwrite and self.eventwrite:close( ) + _ = self.eventhandshake and self.eventhandshake:close( ) + _ = self.eventstarthandshake and self.eventstarthandshake:close( ) + _ = self.eventconnect and self.eventconnect:close( ) + _ = self.eventsession and self.eventsession:close( ) + _ = self.eventwritetimeout and self.eventwritetimeout:close( ) + _ = self.eventreadtimeout and self.eventreadtimeout:close( ) + _ = self.ondisconnect and self:ondisconnect( self.fatalerror ~= "client to close" and self.fatalerror) -- call ondisconnect listener (wont be the case if handshake failed on connect) + _ = self.conn and self.conn:close( ) -- close connection + _ = self._server and self._server:counter(-1); + self.eventread, self.eventwrite = nil, nil + self.eventstarthandshake, self.eventhandshake, self.eventclose = nil, nil, nil + self.readcallback, self.writecallback = nil, nil + else + self.conn:close( ) + self.eventread, self.eventclose = nil, nil + self.interface, self.readcallback = nil, nil end + interfacelist[ self ] = nil + return true +end - function interface_mt:_lock(nointerface, noreading, nowriting) -- lock or unlock this interface or events - self.nointerface, self.noreading, self.nowriting = nointerface, noreading, nowriting - return nointerface, noreading, nowriting - end +function interface_mt:_lock(nointerface, noreading, nowriting) -- lock or unlock this interface or events + self.nointerface, self.noreading, self.nowriting = nointerface, noreading, nowriting + return nointerface, noreading, nowriting +end - --TODO: Deprecate - function interface_mt:lock_read(switch) - if switch then - return self:pause(); - else - return self:resume(); - end +--TODO: Deprecate +function interface_mt:lock_read(switch) + if switch then + return self:pause(); + else + return self:resume(); end +end - function interface_mt:pause() - return self:_lock(self.nointerface, true, self.nowriting); - end +function interface_mt:pause() + return self:_lock(self.nointerface, true, self.nowriting); +end - function interface_mt:resume() - self:_lock(self.nointerface, false, self.nowriting); +function interface_mt:resume() + self:_lock(self.nointerface, false, self.nowriting); if self.readcallback and not self.eventread then - self.eventread = addevent( base, self.conn, EV_READ, self.readcallback, cfg.READ_TIMEOUT ); -- register callback + self.eventread = addevent( base, self.conn, EV_READ, self.readcallback, cfg.READ_TIMEOUT ); -- register callback return true; - end end +end - function interface_mt:counter(c) - if c then - self._connections = self._connections + c - end - return self._connections - end - - -- Public methods - function interface_mt:write(data) - if self.nowriting then return nil, "locked" end - --vdebug( "try to send data to client, id/data:", self.id, data ) - data = tostring( data ) - local len = #data - local total = len + self.writebufferlen - if total > cfg.MAX_SEND_LENGTH then -- check buffer length - local err = "send buffer exceeded" - debug( "error:", err ) -- to much, check your app - return nil, err - end - t_insert(self.writebuffer, data) -- new buffer - self.writebufferlen = total - if not self.eventwrite then -- register new write event - --vdebug( "register new write event" ) - self.eventwrite = addevent( base, self.conn, EV_WRITE, self.writecallback, cfg.WRITE_TIMEOUT ) - end - return true +function interface_mt:counter(c) + if c then + self._connections = self._connections + c end - function interface_mt:close() - if self.nointerface then return nil, "locked"; end - debug( "try to close client connection with id:", self.id ) - if self.type == "client" then - self.fatalerror = "client to close" - if self.eventwrite then -- wait for incomplete write request - self:_lock( true, true, false ) - debug "closing delayed until writebuffer is empty" - return nil, "writebuffer not empty, waiting" - else -- close now - self:_lock( true, true, true ) - self:_close() - return true - end - else - debug( "try to close server with id:", tostring(self.id)) - self.fatalerror = "server to close" - self:_lock( true ) - self:_close( 0 ) + return self._connections +end + +-- Public methods +function interface_mt:write(data) + if self.nowriting then return nil, "locked" end + --vdebug( "try to send data to client, id/data:", self.id, data ) + data = tostring( data ) + local len = #data + local total = len + self.writebufferlen + if total > cfg.MAX_SEND_LENGTH then -- check buffer length + local err = "send buffer exceeded" + debug( "error:", err ) -- to much, check your app + return nil, err + end + t_insert(self.writebuffer, data) -- new buffer + self.writebufferlen = total + if not self.eventwrite then -- register new write event + --vdebug( "register new write event" ) + self.eventwrite = addevent( base, self.conn, EV_WRITE, self.writecallback, cfg.WRITE_TIMEOUT ) + end + return true +end +function interface_mt:close() + if self.nointerface then return nil, "locked"; end + debug( "try to close client connection with id:", self.id ) + if self.type == "client" then + self.fatalerror = "client to close" + if self.eventwrite then -- wait for incomplete write request + self:_lock( true, true, false ) + debug "closing delayed until writebuffer is empty" + return nil, "writebuffer not empty, waiting" + else -- close now + self:_lock( true, true, true ) + self:_close() return true end + else + debug( "try to close server with id:", tostring(self.id)) + self.fatalerror = "server to close" + self:_lock( true ) + self:_close( 0 ) + return true end +end - function interface_mt:socket() - return self.conn - end +function interface_mt:socket() + return self.conn +end - function interface_mt:server() - return self._server or self; - end +function interface_mt:server() + return self._server or self; +end - function interface_mt:port() - return self._port - end +function interface_mt:port() + return self._port +end - function interface_mt:serverport() - return self._serverport - end +function interface_mt:serverport() + return self._serverport +end - function interface_mt:ip() - return self._ip - end +function interface_mt:ip() + return self._ip +end - function interface_mt:ssl() - return self._usingssl - end - interface_mt.clientport = interface_mt.port -- COMPAT server_select +function interface_mt:ssl() + return self._usingssl +end +interface_mt.clientport = interface_mt.port -- COMPAT server_select - function interface_mt:type() - return self._type or "client" - end +function interface_mt:type() + return self._type or "client" +end - function interface_mt:connections() - return self._connections - end +function interface_mt:connections() + return self._connections +end - function interface_mt:address() - return self.addr - end +function interface_mt:address() + return self.addr +end - function interface_mt:set_sslctx(sslctx) - self._sslctx = sslctx; - if sslctx then - self.starttls = nil; -- use starttls() of interface_mt - else - self.starttls = false; -- prevent starttls() - end +function interface_mt:set_sslctx(sslctx) + self._sslctx = sslctx; + if sslctx then + self.starttls = nil; -- use starttls() of interface_mt + else + self.starttls = false; -- prevent starttls() end +end - function interface_mt:set_mode(pattern) - if pattern then - self._pattern = pattern; - end - return self._pattern; +function interface_mt:set_mode(pattern) + if pattern then + self._pattern = pattern; end + return self._pattern; +end function interface_mt:set_send(new_send) -- luacheck: ignore 212 - -- No-op, we always use the underlying connection's send - end + -- No-op, we always use the underlying connection's send +end - function interface_mt:starttls(sslctx, call_onconnect) - debug( "try to start ssl at client id:", self.id ) - local err - self._sslctx = sslctx; - if self._usingssl then -- startssl was already called - err = "ssl already active" - end - if err then - debug( "error:", err ) - return nil, err - end - self._usingssl = true - self.startsslcallback = function( ) -- we have to start the handshake outside of a read/write event - self.startsslcallback = nil - self:_start_ssl(call_onconnect); - self.eventstarthandshake = nil - return -1 - end - if not self.eventwrite then - self:_lock( true, true, true ) -- lock the interface, to not disturb the handshake - self.eventstarthandshake = addevent( base, nil, EV_TIMEOUT, self.startsslcallback, 0 ) -- add event to start handshake - else -- wait until writebuffer is empty - self:_lock( true, true, false ) - debug "ssl session delayed until writebuffer is empty..." - end - self.starttls = false; - return true - end +function interface_mt:starttls(sslctx, call_onconnect) + debug( "try to start ssl at client id:", self.id ) + local err + self._sslctx = sslctx; + if self._usingssl then -- startssl was already called + err = "ssl already active" + end + if err then + debug( "error:", err ) + return nil, err + end + self._usingssl = true + self.startsslcallback = function( ) -- we have to start the handshake outside of a read/write event + self.startsslcallback = nil + self:_start_ssl(call_onconnect); + self.eventstarthandshake = nil + return -1 + end + if not self.eventwrite then + self:_lock( true, true, true ) -- lock the interface, to not disturb the handshake + self.eventstarthandshake = addevent( base, nil, EV_TIMEOUT, self.startsslcallback, 0 ) -- add event to start handshake + else -- wait until writebuffer is empty + self:_lock( true, true, false ) + debug "ssl session delayed until writebuffer is empty..." + end + self.starttls = false; + return true +end - function interface_mt:setoption(option, value) - if self.conn.setoption then - return self.conn:setoption(option, value); - end - return false, "setoption not implemented"; +function interface_mt:setoption(option, value) + if self.conn.setoption then + return self.conn:setoption(option, value); end + return false, "setoption not implemented"; +end - function interface_mt:setlistener(listener) - self:ondetach(); -- Notify listener that it is no longer responsible for this connection +function interface_mt:setlistener(listener) + self:ondetach(); -- Notify listener that it is no longer responsible for this connection self.onconnect, self.ondisconnect, self.onincoming, self.ontimeout, self.onreadtimeout, self.onstatus, self.ondetach = listener.onconnect, listener.ondisconnect, listener.onincoming, listener.ontimeout, listener.onreadtimeout, listener.onstatus, listener.ondetach; - end +end - -- Stub handlers - function interface_mt:onconnect() - end - function interface_mt:onincoming() - end - function interface_mt:ondisconnect() - end - function interface_mt:ontimeout() - end +-- Stub handlers +function interface_mt:onconnect() +end +function interface_mt:onincoming() +end +function interface_mt:ondisconnect() +end +function interface_mt:ontimeout() +end function interface_mt:onreadtimeout() self.fatalerror = "timeout during receiving" debug( "connection failed:", self.fatalerror ) self:_close() self.eventread = nil end - function interface_mt:ondrain() - end - function interface_mt:ondetach() - end - function interface_mt:onstatus() - end +function interface_mt:ondrain() +end +function interface_mt:ondetach() +end +function interface_mt:onstatus() +end -- End of client interface methods local function handleclient( client, ip, port, server, pattern, listener, sslctx ) -- creates an client interface - --vdebug("creating client interfacce...") - local interface = { - type = "client"; - conn = client; - currenttime = socket_gettime( ); -- safe the origin - writebuffer = {}; -- writebuffer - writebufferlen = 0; -- length of writebuffer - send = client.send; -- caching table lookups - receive = client.receive; - onconnect = listener.onconnect; -- will be called when client disconnects - ondisconnect = listener.ondisconnect; -- will be called when client disconnects - onincoming = listener.onincoming; -- will be called when client sends data - ontimeout = listener.ontimeout; -- called when fatal socket timeout occurs + --vdebug("creating client interfacce...") + local interface = { + type = "client"; + conn = client; + currenttime = socket_gettime( ); -- safe the origin + writebuffer = {}; -- writebuffer + writebufferlen = 0; -- length of writebuffer + send = client.send; -- caching table lookups + receive = client.receive; + onconnect = listener.onconnect; -- will be called when client disconnects + ondisconnect = listener.ondisconnect; -- will be called when client disconnects + onincoming = listener.onincoming; -- will be called when client sends data + ontimeout = listener.ontimeout; -- called when fatal socket timeout occurs onreadtimeout = listener.onreadtimeout; -- called when socket inactivity timeout occurs - ondrain = listener.ondrain; -- called when writebuffer is empty - ondetach = listener.ondetach; -- called when disassociating this listener from this connection - onstatus = listener.onstatus; -- called for status changes (e.g. of SSL/TLS) - eventread = false, eventwrite = false, eventclose = false, - eventhandshake = false, eventstarthandshake = false; -- event handler - eventconnect = false, eventsession = false; -- more event handler... - eventwritetimeout = false; -- even more event handler... - eventreadtimeout = false; - fatalerror = false; -- error message - writecallback = false; -- will be called on write events - readcallback = false; -- will be called on read events - nointerface = true; -- lock/unlock parameter of this interface - noreading = false, nowriting = false; -- locks of the read/writecallback - startsslcallback = false; -- starting handshake callback - position = false; -- position of client in interfacelist - - -- Properties - _ip = ip, _port = port, _server = server, _pattern = pattern, - _serverport = (server and server:port() or nil), - _sslctx = sslctx; -- parameters - _usingssl = false; -- client is using ssl; - } + ondrain = listener.ondrain; -- called when writebuffer is empty + ondetach = listener.ondetach; -- called when disassociating this listener from this connection + onstatus = listener.onstatus; -- called for status changes (e.g. of SSL/TLS) + eventread = false, eventwrite = false, eventclose = false, + eventhandshake = false, eventstarthandshake = false; -- event handler + eventconnect = false, eventsession = false; -- more event handler... + eventwritetimeout = false; -- even more event handler... + eventreadtimeout = false; + fatalerror = false; -- error message + writecallback = false; -- will be called on write events + readcallback = false; -- will be called on read events + nointerface = true; -- lock/unlock parameter of this interface + noreading = false, nowriting = false; -- locks of the read/writecallback + startsslcallback = false; -- starting handshake callback + position = false; -- position of client in interfacelist + + -- Properties + _ip = ip, _port = port, _server = server, _pattern = pattern, + _serverport = (server and server:port() or nil), + _sslctx = sslctx; -- parameters + _usingssl = false; -- client is using ssl; + } if not has_luasec then interface.starttls = false; end - interface.id = tostring(interface):match("%x+$"); - interface.writecallback = function( event ) -- called on write events - --vdebug( "new client write event, id/ip/port:", interface, ip, port ) - if interface.nowriting or ( interface.fatalerror and ( "client to close" ~= interface.fatalerror ) ) then -- leave this event - --vdebug( "leaving this event because:", interface.nowriting or interface.fatalerror ) - interface.eventwrite = false - return -1 + interface.id = tostring(interface):match("%x+$"); + interface.writecallback = function( event ) -- called on write events + --vdebug( "new client write event, id/ip/port:", interface, ip, port ) + if interface.nowriting or ( interface.fatalerror and ( "client to close" ~= interface.fatalerror ) ) then -- leave this event + --vdebug( "leaving this event because:", interface.nowriting or interface.fatalerror ) + interface.eventwrite = false + return -1 + end + if EV_TIMEOUT == event then -- took too long to write some data to socket -> disconnect + interface.fatalerror = "timeout during writing" + debug( "writing failed:", interface.fatalerror ) + interface:_close() + interface.eventwrite = false + return -1 + else -- can write :) + if interface._usingssl then -- handle luasec + if interface.eventreadtimeout then -- we have to read first + local ret = interface.readcallback( ) -- call readcallback + --vdebug( "tried to read in writecallback, result:", ret ) + end + if interface.eventwritetimeout then -- luasec only + interface.eventwritetimeout:close( ) -- first we have to close timeout event which where regged after a wantread error + interface.eventwritetimeout = false + end end - if EV_TIMEOUT == event then -- took too long to write some data to socket -> disconnect - interface.fatalerror = "timeout during writing" - debug( "writing failed:", interface.fatalerror ) - interface:_close() - interface.eventwrite = false - return -1 - else -- can write :) - if interface._usingssl then -- handle luasec - if interface.eventreadtimeout then -- we have to read first - local ret = interface.readcallback( ) -- call readcallback - --vdebug( "tried to read in writecallback, result:", ret ) - end - if interface.eventwritetimeout then -- luasec only - interface.eventwritetimeout:close( ) -- first we have to close timeout event which where regged after a wantread error - interface.eventwritetimeout = false - end + interface.writebuffer = { t_concat(interface.writebuffer) } + local succ, err, byte = interface.conn:send( interface.writebuffer[1], 1, interface.writebufferlen ) + --vdebug( "write data:", interface.writebuffer, "error:", err, "part:", byte ) + if succ then -- writing succesful + interface.writebuffer[1] = nil + interface.writebufferlen = 0 + interface:ondrain(); + if interface.fatalerror then + debug "closing client after writing" + interface:_close() -- close interface if needed + elseif interface.startsslcallback then -- start ssl connection if needed + debug "starting ssl handshake after writing" + interface.eventstarthandshake = addevent( base, nil, EV_TIMEOUT, interface.startsslcallback, 0 ) + elseif interface.eventreadtimeout then + return EV_WRITE, EV_TIMEOUT end - interface.writebuffer = { t_concat(interface.writebuffer) } - local succ, err, byte = interface.conn:send( interface.writebuffer[1], 1, interface.writebufferlen ) - --vdebug( "write data:", interface.writebuffer, "error:", err, "part:", byte ) - if succ then -- writing succesful - interface.writebuffer[1] = nil - interface.writebufferlen = 0 - interface:ondrain(); - if interface.fatalerror then - debug "closing client after writing" - interface:_close() -- close interface if needed - elseif interface.startsslcallback then -- start ssl connection if needed - debug "starting ssl handshake after writing" - interface.eventstarthandshake = addevent( base, nil, EV_TIMEOUT, interface.startsslcallback, 0 ) - elseif interface.eventreadtimeout then - return EV_WRITE, EV_TIMEOUT - end - interface.eventwrite = nil - return -1 - elseif byte and (err == "timeout" or err == "wantwrite") then -- want write again - --vdebug( "writebuffer is not empty:", err ) + interface.eventwrite = nil + return -1 + elseif byte and (err == "timeout" or err == "wantwrite") then -- want write again + --vdebug( "writebuffer is not empty:", err ) interface.writebuffer[1] = s_sub( interface.writebuffer[1], byte + 1, interface.writebufferlen ) -- new buffer - interface.writebufferlen = interface.writebufferlen - byte - if "wantread" == err then -- happens only with luasec - local callback = function( ) - interface:_close() - interface.eventwritetimeout = nil - return -1; - end - interface.eventwritetimeout = addevent( base, nil, EV_TIMEOUT, callback, cfg.WRITE_TIMEOUT ) -- reg a new timeout event - debug( "wantread during write attempt, reg it in readcallback but dont know what really happens next..." ) - -- hopefully this works with luasec; its simply not possible to use 2 different write events on a socket in luaevent - return -1 + interface.writebufferlen = interface.writebufferlen - byte + if "wantread" == err then -- happens only with luasec + local callback = function( ) + interface:_close() + interface.eventwritetimeout = nil + return -1; end - return EV_WRITE, cfg.WRITE_TIMEOUT - else -- connection was closed during writing or fatal error - interface.fatalerror = err or "fatal error" - debug( "connection failed in write event:", interface.fatalerror ) - interface:_close() - interface.eventwrite = nil + interface.eventwritetimeout = addevent( base, nil, EV_TIMEOUT, callback, cfg.WRITE_TIMEOUT ) -- reg a new timeout event + debug( "wantread during write attempt, reg it in readcallback but dont know what really happens next..." ) + -- hopefully this works with luasec; its simply not possible to use 2 different write events on a socket in luaevent return -1 end + return EV_WRITE, cfg.WRITE_TIMEOUT + else -- connection was closed during writing or fatal error + interface.fatalerror = err or "fatal error" + debug( "connection failed in write event:", interface.fatalerror ) + interface:_close() + interface.eventwrite = nil + return -1 end end + end - interface.readcallback = function( event ) -- called on read events - --vdebug( "new client read event, id/ip/port:", tostring(interface.id), tostring(ip), tostring(port) ) - if interface.noreading or interface.fatalerror then -- leave this event - --vdebug( "leaving this event because:", tostring(interface.noreading or interface.fatalerror) ) - interface.eventread = nil - return -1 - end + interface.readcallback = function( event ) -- called on read events + --vdebug( "new client read event, id/ip/port:", tostring(interface.id), tostring(ip), tostring(port) ) + if interface.noreading or interface.fatalerror then -- leave this event + --vdebug( "leaving this event because:", tostring(interface.noreading or interface.fatalerror) ) + interface.eventread = nil + return -1 + end if EV_TIMEOUT == event and interface:onreadtimeout() ~= true then return -1 -- took too long to get some data from client -> disconnect end - if interface._usingssl then -- handle luasec - if interface.eventwritetimeout then -- ok, in the past writecallback was regged - local ret = interface.writecallback( ) -- call it - --vdebug( "tried to write in readcallback, result:", tostring(ret) ) - end - if interface.eventreadtimeout then - interface.eventreadtimeout:close( ) - interface.eventreadtimeout = nil - end - end - local buffer, err, part = interface.conn:receive( interface._pattern ) -- receive buffer with "pattern" - --vdebug( "read data:", tostring(buffer), "error:", tostring(err), "part:", tostring(part) ) - buffer = buffer or part - if buffer and #buffer > cfg.MAX_READ_LENGTH then -- check buffer length - interface.fatalerror = "receive buffer exceeded" - debug( "fatal error:", interface.fatalerror ) - interface:_close() - interface.eventread = nil - return -1 + if interface._usingssl then -- handle luasec + if interface.eventwritetimeout then -- ok, in the past writecallback was regged + local ret = interface.writecallback( ) -- call it + --vdebug( "tried to write in readcallback, result:", tostring(ret) ) + end + if interface.eventreadtimeout then + interface.eventreadtimeout:close( ) + interface.eventreadtimeout = nil + end + end + local buffer, err, part = interface.conn:receive( interface._pattern ) -- receive buffer with "pattern" + --vdebug( "read data:", tostring(buffer), "error:", tostring(err), "part:", tostring(part) ) + buffer = buffer or part + if buffer and #buffer > cfg.MAX_READ_LENGTH then -- check buffer length + interface.fatalerror = "receive buffer exceeded" + debug( "fatal error:", interface.fatalerror ) + interface:_close() + interface.eventread = nil + return -1 + end + if err and ( err ~= "timeout" and err ~= "wantread" ) then + if "wantwrite" == err then -- need to read on write event + if not interface.eventwrite then -- register new write event if needed + interface.eventwrite = addevent( base, interface.conn, EV_WRITE, interface.writecallback, cfg.WRITE_TIMEOUT ) end - if err and ( err ~= "timeout" and err ~= "wantread" ) then - if "wantwrite" == err then -- need to read on write event - if not interface.eventwrite then -- register new write event if needed - interface.eventwrite = addevent( base, interface.conn, EV_WRITE, interface.writecallback, cfg.WRITE_TIMEOUT ) - end - interface.eventreadtimeout = addevent( base, nil, EV_TIMEOUT, - function( ) - interface:_close() - end, cfg.READ_TIMEOUT - ) - debug( "wantwrite during read attempt, reg it in writecallback but dont know what really happens next..." ) - -- to be honest i dont know what happens next, if it is allowed to first read, the write etc... - else -- connection was closed or fatal error - interface.fatalerror = err - debug( "connection failed in read event:", interface.fatalerror ) + interface.eventreadtimeout = addevent( base, nil, EV_TIMEOUT, + function( ) interface:_close() - interface.eventread = nil - return -1 - end - else - interface.onincoming( interface, buffer, err ) -- send new data to listener - end - if interface.noreading then - interface.eventread = nil; - return -1; - end - return EV_READ, cfg.READ_TIMEOUT + end, cfg.READ_TIMEOUT + ) + debug( "wantwrite during read attempt, reg it in writecallback but dont know what really happens next..." ) + -- to be honest i dont know what happens next, if it is allowed to first read, the write etc... + else -- connection was closed or fatal error + interface.fatalerror = err + debug( "connection failed in read event:", interface.fatalerror ) + interface:_close() + interface.eventread = nil + return -1 end + else + interface.onincoming( interface, buffer, err ) -- send new data to listener + end + if interface.noreading then + interface.eventread = nil; + return -1; + end + return EV_READ, cfg.READ_TIMEOUT + end - client:settimeout( 0 ) -- set non blocking - setmetatable(interface, interface_mt) + client:settimeout( 0 ) -- set non blocking + setmetatable(interface, interface_mt) interfacelist[ interface ] = true -- add to interfacelist - return interface - end + return interface +end local function handleserver( server, addr, port, pattern, listener, sslctx ) -- creates an server interface - debug "creating server interface..." - local interface = { - _connections = 0; + debug "creating server interface..." + local interface = { + _connections = 0; type = "server"; - conn = server; - onconnect = listener.onconnect; -- will be called when new client connected - eventread = false; -- read event handler - eventclose = false; -- close event handler - readcallback = false; -- read event callback - fatalerror = false; -- error message - nointerface = true; -- lock/unlock parameter - - _ip = addr, _port = port, _pattern = pattern, - _sslctx = sslctx; - } - interface.id = tostring(interface):match("%x+$"); - interface.readcallback = function( event ) -- server handler, called on incoming connections - --vdebug( "server can accept, id/addr/port:", interface, addr, port ) - if interface.fatalerror then - --vdebug( "leaving this event because:", self.fatalerror ) - interface.eventread = nil - return -1 + conn = server; + onconnect = listener.onconnect; -- will be called when new client connected + eventread = false; -- read event handler + eventclose = false; -- close event handler + readcallback = false; -- read event callback + fatalerror = false; -- error message + nointerface = true; -- lock/unlock parameter + + _ip = addr, _port = port, _pattern = pattern, + _sslctx = sslctx; + } + interface.id = tostring(interface):match("%x+$"); + interface.readcallback = function( event ) -- server handler, called on incoming connections + --vdebug( "server can accept, id/addr/port:", interface, addr, port ) + if interface.fatalerror then + --vdebug( "leaving this event because:", self.fatalerror ) + interface.eventread = nil + return -1 + end + local delay = cfg.ACCEPT_DELAY + if EV_TIMEOUT == event then + if interface._connections >= cfg.MAX_CONNECTIONS then -- check connection count + debug( "to many connections, seconds to wait for next accept:", delay ) + return EV_TIMEOUT, delay -- timeout... + else + return EV_READ -- accept again end - local delay = cfg.ACCEPT_DELAY - if EV_TIMEOUT == event then - if interface._connections >= cfg.MAX_CONNECTIONS then -- check connection count - debug( "to many connections, seconds to wait for next accept:", delay ) - return EV_TIMEOUT, delay -- timeout... - else - return EV_READ -- accept again - end + end + --vdebug("max connection check ok, accepting...") + local client, err = server:accept() -- try to accept; TODO: check err + while client do + if interface._connections >= cfg.MAX_CONNECTIONS then + client:close( ) -- refuse connection + debug( "maximal connections reached, refuse client connection; accept delay:", delay ) + return EV_TIMEOUT, delay -- delay for next accept attempt end - --vdebug("max connection check ok, accepting...") - local client, err = server:accept() -- try to accept; TODO: check err - while client do - if interface._connections >= cfg.MAX_CONNECTIONS then - client:close( ) -- refuse connection - debug( "maximal connections reached, refuse client connection; accept delay:", delay ) - return EV_TIMEOUT, delay -- delay for next accept attempt - end - local client_ip, client_port = client:getpeername( ) - interface._connections = interface._connections + 1 -- increase connection count - local clientinterface = handleclient( client, client_ip, client_port, interface, pattern, listener, sslctx ) - --vdebug( "client id:", clientinterface, "startssl:", startssl ) + local client_ip, client_port = client:getpeername( ) + interface._connections = interface._connections + 1 -- increase connection count + local clientinterface = handleclient( client, client_ip, client_port, interface, pattern, listener, sslctx ) + --vdebug( "client id:", clientinterface, "startssl:", startssl ) if has_luasec and sslctx then - clientinterface:starttls(sslctx, true) - else - clientinterface:_start_session( true ) - end - debug( "accepted incoming client connection from:", client_ip or "<unknown IP>", client_port or "<unknown port>", "to", port or "<unknown port>"); - - client, err = server:accept() -- try to accept again + clientinterface:starttls(sslctx, true) + else + clientinterface:_start_session( true ) end - return EV_READ + debug( "accepted incoming client connection from:", client_ip or "<unknown IP>", client_port or "<unknown port>", "to", port or "<unknown port>"); + + client, err = server:accept() -- try to accept again end + return EV_READ + end - server:settimeout( 0 ) - setmetatable(interface, interface_mt) + server:settimeout( 0 ) + setmetatable(interface, interface_mt) interfacelist[ interface ] = true - interface:_start_session() - return interface - end + interface:_start_session() + return interface +end local function addserver( addr, port, listener, pattern, sslctx, startssl ) -- TODO: check arguments --vdebug( "creating new tcp server with following parameters:", addr or "nil", port or "nil", sslctx or "nil", startssl or "nil") if sslctx and not has_luasec then debug "fatal error: luasec not found" return nil, "luasec not found" -end - local server, err = socket.bind( addr, port, cfg.ACCEPT_QUEUE ) -- create server socket - if not server then - debug( "creating server socket on "..addr.." port "..port.." failed:", err ) - return nil, err - end - local interface = handleserver( server, addr, port, pattern, listener, sslctx, startssl ) -- new server handler - debug( "new server created with id:", tostring(interface)) - return interface end + local server, err = socket.bind( addr, port, cfg.ACCEPT_QUEUE ) -- create server socket + if not server then + debug( "creating server socket on "..addr.." port "..port.." failed:", err ) + return nil, err + end + local interface = handleserver( server, addr, port, pattern, listener, sslctx, startssl ) -- new server handler + debug( "new server created with id:", tostring(interface)) + return interface +end local function wrapclient( client, ip, port, listeners, pattern, sslctx ) - local interface = handleclient( client, ip, port, nil, pattern, listeners, sslctx ) - interface:_start_connection(sslctx) - return interface, client - --function handleclient( client, ip, port, server, pattern, listener, _, sslctx ) -- creates an client interface - end + local interface = handleclient( client, ip, port, nil, pattern, listeners, sslctx ) + interface:_start_connection(sslctx) + return interface, client + --function handleclient( client, ip, port, server, pattern, listener, _, sslctx ) -- creates an client interface +end local function addclient( addr, serverport, listener, pattern, sslctx, typ ) if sslctx and not has_luasec then debug "need luasec, but not available" return nil, "luasec not found" - end - if not typ then + end + if getaddrinfo and not typ then local addrinfo, err = getaddrinfo(addr) if not addrinfo then return nil, err end if addrinfo[1] and addrinfo[1].family == "inet6" then typ = "tcp6" - else - typ = "tcp" - end end - local create = socket[typ] + end + local create = socket[typ or "tcp"] if type( create ) ~= "function" then return nil, "invalid socket type" - end + end local client, err = create() -- creating new socket if not client then debug( "cannot create socket:", err ) - return nil, err - end + return nil, err + end client:settimeout( 0 ) -- set nonblocking - local res, err = client:connect( addr, serverport ) -- connect - if res or ( err == "timeout" ) then - local ip, port = client:getsockname( ) - local interface = wrapclient( client, ip, serverport, listener, pattern, sslctx ) - interface:_start_connection( sslctx ) - debug( "new connection id:", interface.id ) - return interface, err - else - debug( "new connection failed:", err ) - return nil, err + local res, err = client:connect( addr, serverport ) -- connect + if res or ( err == "timeout" or err == "Operation already in progress" ) then + if client.getsockname then + addr = client:getsockname( ) end + local interface = wrapclient( client, addr, serverport, listener, pattern, sslctx ) + debug( "new connection id:", interface.id ) + return interface, err + else + debug( "new connection failed:", err ) + return nil, err end +end local function loop( ) -- starts the event loop base:loop( ) @@ -742,7 +741,7 @@ end local function newevent( ... ) return addevent( base, ... ) - end +end local function closeallservers ( arg ) for item in pairs( interfacelist ) do @@ -754,9 +753,9 @@ end local function setquitting(yes) if yes then - -- Quit now - closeallservers(); - base:loopexit(); + -- Quit now + closeallservers(); + base:loopexit(); end end @@ -799,6 +798,20 @@ local function link(sender, receiver, buffersize) sender:set_mode("*a"); end +local function add_task(delay, callback) + local event_handle; + event_handle = base:addevent(nil, 0, function () + local ret = callback(socket_gettime()); + if ret then + return 0, ret; + elseif event_handle then + return -1; + end + end + , delay); + return event_handle; +end + return { cfg = cfg, base = base, @@ -814,6 +827,7 @@ return { closeall = closeallservers, get_backend = get_backend, hook_signal = hook_signal, + add_task = add_task, __NAME = SCRIPT_NAME, __DATE = LAST_MODIFIED, diff --git a/net/server_select.lua b/net/server_select.lua index 52a0d5f1..37d57d29 100644 --- a/net/server_select.lua +++ b/net/server_select.lua @@ -31,17 +31,16 @@ local tostring = use "tostring" --// lua libs //-- -local os = use "os" local table = use "table" local string = use "string" local coroutine = use "coroutine" --// lua lib methods //-- -local os_difftime = os.difftime local math_min = math.min local math_huge = math.huge local table_concat = table.concat +local table_insert = table.insert local string_sub = string.sub local coroutine_wrap = coroutine.wrap local coroutine_yield = coroutine.yield @@ -57,7 +56,6 @@ local getaddrinfo = luasocket.dns.getaddrinfo local ssl_wrap = ( has_luasec and luasec.wrap ) local socket_bind = luasocket.bind -local socket_sleep = luasocket.sleep local socket_select = luasocket.select --// functions //-- @@ -102,7 +100,6 @@ local _sendtraffic local _readtraffic local _selecttimeout -local _sleeptime local _tcpbacklog local _accepretry @@ -116,8 +113,6 @@ local _checkinterval local _sendtimeout local _readtimeout -local _timer - local _maxselectlen local _maxfd @@ -143,7 +138,6 @@ _sendtraffic = 0 -- some stats _readtraffic = 0 _selecttimeout = 1 -- timeout of socket.select -_sleeptime = 0 -- time to wait at the end of every loop _tcpbacklog = 128 -- some kind of hint to the OS _accepretry = 10 -- seconds to wait until the next attempt of a full server to accept @@ -303,7 +297,6 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport local bufferqueuelen = 0 -- end of buffer array local toclose - local fatalerror local needtls local bufferlen = 0 @@ -518,7 +511,6 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport return dispatch( handler, buffer, err ) else -- connections was closed or fatal error out_put( "server.lua: client ", tostring(ip), ":", tostring(clientport), " read error: ", tostring(err) ) - fatalerror = true _ = handler and handler:force_close( err ) return false end @@ -558,7 +550,6 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport return true else -- connection was closed during sending or fatal error out_put( "server.lua: client ", tostring(ip), ":", tostring(clientport), " write error: ", tostring(err) ) - fatalerror = true _ = handler and handler:force_close( err ) return false end @@ -807,7 +798,6 @@ end getsettings = function( ) return { select_timeout = _selecttimeout; - select_sleep_time = _sleeptime; tcp_backlog = _tcpbacklog; max_send_buffer_size = _maxsendlen; max_receive_buffer_size = _maxreadlen; @@ -826,7 +816,6 @@ changesettings = function( new ) return nil, "invalid settings table" end _selecttimeout = tonumber( new.select_timeout ) or _selecttimeout - _sleeptime = tonumber( new.select_sleep_time ) or _sleeptime _maxsendlen = tonumber( new.max_send_buffer_size ) or _maxsendlen _maxreadlen = tonumber( new.max_receive_buffer_size ) or _maxreadlen _checkinterval = tonumber( new.select_idle_check_interval ) or _checkinterval @@ -849,6 +838,49 @@ addtimer = function( listener ) return true end +local add_task do + local data = {}; + local new_data = {}; + + function add_task(delay, callback) + local current_time = luasocket_gettime(); + delay = delay + current_time; + if delay >= current_time then + table_insert(new_data, {delay, callback}); + else + local r = callback(current_time); + if r and type(r) == "number" then + return add_task(r, callback); + end + end + end + + addtimer(function(current_time) + if #new_data > 0 then + for _, d in pairs(new_data) do + table_insert(data, d); + end + new_data = {}; + end + + local next_time = math_huge; + for i, d in pairs(data) do + local t, callback = d[1], d[2]; + if t <= current_time then + data[i] = nil; + local r = callback(current_time); + if type(r) == "number" then + add_task(r, callback); + next_time = math_min(next_time, r); + end + else + next_time = math_min(next_time, t - current_time); + end + end + return next_time; + end); +end + stats = function( ) return _readtraffic, _sendtraffic, _readlistlen, _sendlistlen, _timerlistlen end @@ -862,8 +894,15 @@ end loop = function(once) -- this is the main loop of the program if quitting then return "quitting"; end if once then quitting = "once"; end - local next_timer_time = math_huge; + _currenttime = luasocket_gettime( ) repeat + -- Fire timers + local next_timer_time = math_huge; + for i = 1, _timerlistlen do + local t = _timerlist[ i ]( _currenttime ) -- fire timers + if t then next_timer_time = math_min(next_timer_time, t); end + end + local read, write, err = socket_select( _readlist, _sendlist, math_min(_selecttimeout, next_timer_time) ) for i, socket in ipairs( write ) do -- send data waiting in writequeues local handler = _socketlist[ socket ] @@ -891,17 +930,16 @@ loop = function(once) -- this is the main loop of the program _currenttime = luasocket_gettime( ) -- Check for socket timeouts - local difftime = os_difftime( _currenttime - _starttime ) - if difftime > _checkinterval then + if _currenttime - _starttime > _checkinterval then _starttime = _currenttime for handler, timestamp in pairs( _writetimes ) do - if os_difftime( _currenttime - timestamp ) > _sendtimeout then + if _currenttime - timestamp > _sendtimeout then handler.disconnect( )( handler, "send timeout" ) handler:force_close() -- forced disconnect end end for handler, timestamp in pairs( _readtimes ) do - if os_difftime( _currenttime - timestamp ) > _readtimeout then + if _currenttime - timestamp > _readtimeout then if not(handler.onreadtimeout) or handler:onreadtimeout() ~= true then handler.disconnect( )( handler, "read timeout" ) handler:close( ) -- forced disconnect? @@ -912,27 +950,12 @@ loop = function(once) -- this is the main loop of the program end end - -- Fire timers - if _currenttime - _timer >= math_min(next_timer_time, 1) then - next_timer_time = math_huge; - for i = 1, _timerlistlen do - local t = _timerlist[ i ]( _currenttime ) -- fire timers - if t then next_timer_time = math_min(next_timer_time, t); end - end - _timer = _currenttime - else - next_timer_time = next_timer_time - (_currenttime - _timer); - end - for server, paused_time in pairs( _fullservers ) do if _currenttime - paused_time > _accepretry then _fullservers[ server ] = nil; server.resume(); end end - - -- wait some time (0 by default) - socket_sleep( _sleeptime ) until quitting; if once and quitting == "once" then quitting = nil; return; end closeall(); @@ -979,16 +1002,14 @@ local addclient = function( address, port, listeners, pattern, sslctx, typ ) elseif sslctx and not has_luasec then err = "luasec not found" end - if not typ then + if getaddrinfo and not typ then local addrinfo, err = getaddrinfo(address) if not addrinfo then return nil, err end if addrinfo[1] and addrinfo[1].family == "inet6" then typ = "tcp6" - else - typ = "tcp" end end - local create = luasocket[typ] + local create = luasocket[typ or "tcp"] if type( create ) ~= "function" then err = "invalid socket type" end @@ -1004,22 +1025,19 @@ local addclient = function( address, port, listeners, pattern, sslctx, typ ) end client:settimeout( 0 ) local ok, err = client:connect( address, port ) - if ok or err == "timeout" then + if ok or err == "timeout" or err == "Operation already in progress" then return wrapclient( client, address, port, listeners, pattern, sslctx ) else return nil, err end end ---// EXPERIMENTAL //-- - ----------------------------------// BEGIN //-- use "setmetatable" ( _socketlist, { __mode = "k" } ) use "setmetatable" ( _readtimes, { __mode = "k" } ) use "setmetatable" ( _writetimes, { __mode = "k" } ) -_timer = luasocket_gettime( ) _starttime = luasocket_gettime( ) local function setlogger(new_logger) @@ -1034,6 +1052,7 @@ end return { _addtimer = addtimer, + add_task = add_task; addclient = addclient, wrapclient = wrapclient, |