aboutsummaryrefslogtreecommitdiffstats
path: root/net
diff options
context:
space:
mode:
Diffstat (limited to 'net')
-rw-r--r--net/cqueues.lua74
-rw-r--r--net/server.lua86
-rw-r--r--net/server_event.lua1072
-rw-r--r--net/server_select.lua99
4 files changed, 729 insertions, 602 deletions
diff --git a/net/cqueues.lua b/net/cqueues.lua
new file mode 100644
index 00000000..8c4c756f
--- /dev/null
+++ b/net/cqueues.lua
@@ -0,0 +1,74 @@
+-- Prosody IM
+-- Copyright (C) 2014 Daurnimator
+--
+-- This project is MIT/X11 licensed. Please see the
+-- COPYING file in the source package for more information.
+--
+-- This module allows you to use cqueues with a net.server mainloop
+--
+
+local server = require "net.server";
+local cqueues = require "cqueues";
+assert(cqueues.VERSION >= 20150113, "cqueues newer than 20150113 required")
+
+-- Create a single top level cqueue
+local cq;
+
+if server.cq then -- server provides cqueues object
+ cq = server.cq;
+elseif server.get_backend() == "select" and server._addtimer then -- server_select
+ cq = cqueues.new();
+ local function step()
+ assert(cq:loop(0));
+ end
+
+ -- Use wrapclient (as wrapconnection isn't exported) to get server_select to watch cq fd
+ local handler = server.wrapclient({
+ getfd = function() return cq:pollfd(); end;
+ settimeout = function() end; -- Method just needs to exist
+ close = function() end; -- Need close method for 'closeall'
+ }, nil, nil, {});
+
+ -- Only need to listen for readable; cqueues handles everything under the hood
+ -- readbuffer is called when `select` notes an fd as readable
+ handler.readbuffer = step;
+
+ -- Use server_select low lever timer facility,
+ -- this callback gets called *every* time there is a timeout in the main loop
+ server._addtimer(function(current_time)
+ -- This may end up in extra step()'s, but cqueues handles it for us.
+ step();
+ return cq:timeout();
+ end);
+elseif server.event and server.base then -- server_event
+ cq = cqueues.new();
+ -- Only need to listen for readable; cqueues handles everything under the hood
+ local EV_READ = server.event.EV_READ;
+ -- Convert a cqueues timeout to an acceptable timeout for luaevent
+ local function luaevent_safe_timeout(cq)
+ local t = cq:timeout();
+ -- if you give luaevent 0 or nil, it re-uses the previous timeout.
+ if t == 0 then
+ t = 0.000001; -- 1 microsecond is the smallest that works (goes into a `struct timeval`)
+ elseif t == nil then -- pick something big if we don't have one
+ t = 0x7FFFFFFF; -- largest 32bit int
+ end
+ return t
+ end
+ local event_handle;
+ event_handle = server.base:addevent(cq:pollfd(), EV_READ, function(e)
+ -- Need to reference event_handle or this callback will get collected
+ -- This creates a circular reference that can only be broken if event_handle is manually :close()'d
+ local _ = event_handle;
+ -- Run as many cqueues things as possible (with a timeout of 0)
+ -- If an error is thrown, it will break the libevent loop; but prosody resumes after logging a top level error
+ assert(cq:loop(0));
+ return EV_READ, luaevent_safe_timeout(cq);
+ end, luaevent_safe_timeout(cq));
+else
+ error "NYI"
+end
+
+return {
+ cq = cq;
+}
diff --git a/net/server.lua b/net/server.lua
index 41e180fa..0e13399d 100644
--- a/net/server.lua
+++ b/net/server.lua
@@ -6,49 +6,28 @@
-- COPYING file in the source package for more information.
--
-local use_luaevent = prosody and require "core.configmanager".get("*", "use_libevent");
+local server_type = prosody and require "core.configmanager".get("*", "network_backend") or "select";
+if prosody and require "core.configmanager".get("*", "use_libevent") then
+ server_type = "event";
+end
-if use_luaevent then
- use_luaevent = pcall(require, "luaevent.core");
- if not use_luaevent then
+if server_type == "event" then
+ if not pcall(require, "luaevent.core") then
log("error", "libevent not found, falling back to select()");
+ server_type = "select"
end
end
local server;
-
-if use_luaevent then
+local set_config;
+if server_type == "event" then
server = require "net.server_event";
- -- Overwrite signal.signal() because we need to ask libevent to
- -- handle them instead
- local ok, signal = pcall(require, "util.signal");
- if ok and signal then
- local _signal_signal = signal.signal;
- function signal.signal(signal_id, handler)
- if type(signal_id) == "string" then
- signal_id = signal[signal_id:upper()];
- end
- if type(signal_id) ~= "number" then
- return false, "invalid-signal";
- end
- return server.hook_signal(signal_id, handler);
- end
- end
-else
- use_luaevent = false;
- server = require "net.server_select";
-end
-
-if prosody then
- local config_get = require "core.configmanager".get;
local defaults = {};
- for k,v in pairs(server.cfg or server.getsettings()) do
+ for k,v in pairs(server.cfg) do
defaults[k] = v;
end
- local function load_config()
- local settings = config_get("*", "network_settings") or {};
- if use_luaevent then
+ function set_config(settings)
local event_settings = {
ACCEPT_DELAY = settings.accept_retry_interval;
ACCEPT_QUEUE = settings.tcp_backlog;
@@ -67,13 +46,54 @@ if prosody then
for k,default in pairs(defaults) do
server.cfg[k] = event_settings[k] or default;
end
- else
+ end
+elseif server_type == "select" then
+ server = require "net.server_select";
+
+ local defaults = {};
+ for k,v in pairs(server.getsettings()) do
+ defaults[k] = v;
+ end
+ function set_config(settings)
local select_settings = {};
for k,default in pairs(defaults) do
select_settings[k] = settings[k] or default;
end
server.changesettings(select_settings);
end
+else
+ error("Unsupported server type")
+end
+
+-- If server.hook_signal exists, replace signal.signal()
+local has_signal, signal = pcall(require, "util.signal");
+if has_signal then
+ if server.hook_signal then
+ function signal.signal(signal_id, handler)
+ if type(signal_id) == "string" then
+ signal_id = signal[signal_id:upper()];
+ end
+ if type(signal_id) ~= "number" then
+ return false, "invalid-signal";
+ end
+ return server.hook_signal(signal_id, handler);
+ end
+ else
+ server.hook_signal = signal.signal;
+ end
+else
+ if not server.hook_signal then
+ server.hook_signal = function()
+ return false, "signal hooking not supported"
+ end
+ end
+end
+
+if prosody then
+ local config_get = require "core.configmanager".get;
+ local function load_config()
+ local settings = config_get("*", "network_settings") or {};
+ return set_config(settings);
end
load_config();
prosody.events.add_handler("config-reloaded", load_config);
diff --git a/net/server_event.lua b/net/server_event.lua
index 70a6dc37..7940a1b8 100644
--- a/net/server_event.lua
+++ b/net/server_event.lua
@@ -92,648 +92,647 @@ local interfacelist = { }
-- Client interface methods
local interface_mt = {}; interface_mt.__index = interface_mt;
- -- Private methods
- function interface_mt:_close()
- return self:_destroy();
- end
-
- function interface_mt:_start_connection(plainssl) -- should be called from addclient
- local callback = function( event )
- if EV_TIMEOUT == event then -- timeout during connection
- self.fatalerror = "connection timeout"
- self:ontimeout() -- call timeout listener
- self:_close()
- debug( "new connection failed. id:", self.id, "error:", self.fatalerror )
- else
+-- Private methods
+function interface_mt:_close()
+ return self:_destroy();
+end
+
+function interface_mt:_start_connection(plainssl) -- called from wrapclient
+ local callback = function( event )
+ if EV_TIMEOUT == event then -- timeout during connection
+ self.fatalerror = "connection timeout"
+ self:ontimeout() -- call timeout listener
+ self:_close()
+ debug( "new connection failed. id:", self.id, "error:", self.fatalerror )
+ else
if plainssl and has_luasec then -- start ssl session
- self:starttls(self._sslctx, true)
- else -- normal connection
- self:_start_session(true)
- end
- debug( "new connection established. id:", self.id )
- end
- self.eventconnect = nil
- return -1
+ self:starttls(self._sslctx, true)
+ else -- normal connection
+ self:_start_session(true)
end
- self.eventconnect = addevent( base, self.conn, EV_WRITE, callback, cfg.CONNECT_TIMEOUT )
- return true
+ debug( "new connection established. id:", self.id )
+ end
+ self.eventconnect = nil
+ return -1
end
- function interface_mt:_start_session(call_onconnect) -- new session, for example after startssl
- if self.type == "client" then
- local callback = function( )
- self:_lock( false, false, false )
- --vdebug( "start listening on client socket with id:", self.id )
- self.eventread = addevent( base, self.conn, EV_READ, self.readcallback, cfg.READ_TIMEOUT ); -- register callback
- if call_onconnect then
- self:onconnect()
- end
- self.eventsession = nil
- return -1
+ self.eventconnect = addevent( base, self.conn, EV_WRITE, callback, cfg.CONNECT_TIMEOUT )
+ return true
+end
+function interface_mt:_start_session(call_onconnect) -- new session, for example after startssl
+ if self.type == "client" then
+ local callback = function( )
+ self:_lock( false, false, false )
+ --vdebug( "start listening on client socket with id:", self.id )
+ self.eventread = addevent( base, self.conn, EV_READ, self.readcallback, cfg.READ_TIMEOUT ); -- register callback
+ if call_onconnect then
+ self:onconnect()
end
- self.eventsession = addevent( base, nil, EV_TIMEOUT, callback, 0 )
- else
- self:_lock( false )
- --vdebug( "start listening on server socket with id:", self.id )
- self.eventread = addevent( base, self.conn, EV_READ, self.readcallback ) -- register callback
+ self.eventsession = nil
+ return -1
end
- return true
+ self.eventsession = addevent( base, nil, EV_TIMEOUT, callback, 0 )
+ else
+ self:_lock( false )
+ --vdebug( "start listening on server socket with id:", self.id )
+ self.eventread = addevent( base, self.conn, EV_READ, self.readcallback ) -- register callback
+ end
+ return true
+end
+function interface_mt:_start_ssl(call_onconnect) -- old socket will be destroyed, therefore we have to close read/write events first
+ --vdebug( "starting ssl session with client id:", self.id )
+ local _
+ _ = self.eventread and self.eventread:close( ) -- close events; this must be called outside of the event callbacks!
+ _ = self.eventwrite and self.eventwrite:close( )
+ self.eventread, self.eventwrite = nil, nil
+ local err
+ self.conn, err = ssl.wrap( self.conn, self._sslctx )
+ if err then
+ self.fatalerror = err
+ self.conn = nil -- cannot be used anymore
+ if call_onconnect then
+ self.ondisconnect = nil -- dont call this when client isnt really connected
+ end
+ self:_close()
+ debug( "fatal error while ssl wrapping:", err )
+ return false
end
- function interface_mt:_start_ssl(call_onconnect) -- old socket will be destroyed, therefore we have to close read/write events first
- --vdebug( "starting ssl session with client id:", self.id )
- local _
- _ = self.eventread and self.eventread:close( ) -- close events; this must be called outside of the event callbacks!
- _ = self.eventwrite and self.eventwrite:close( )
- self.eventread, self.eventwrite = nil, nil
- local err
- self.conn, err = ssl.wrap( self.conn, self._sslctx )
- if err then
- self.fatalerror = err
- self.conn = nil -- cannot be used anymore
+ self.conn:settimeout( 0 ) -- set non blocking
+ local handshakecallback = coroutine_wrap(function( event )
+ local _, err
+ local attempt = 0
+ local maxattempt = cfg.MAX_HANDSHAKE_ATTEMPTS
+ while attempt < maxattempt do -- no endless loop
+ attempt = attempt + 1
+ debug( "ssl handshake of client with id:"..tostring(self)..", attempt:"..attempt )
+ if attempt > maxattempt then
+ self.fatalerror = "max handshake attempts exceeded"
+ elseif EV_TIMEOUT == event then
+ self.fatalerror = "timeout during handshake"
+ else
+ _, err = self.conn:dohandshake( )
+ if not err then
+ self:_lock( false, false, false ) -- unlock the interface; sending, closing etc allowed
+ self.send = self.conn.send -- caching table lookups with new client object
+ self.receive = self.conn.receive
+ if not call_onconnect then -- trigger listener
+ self:onstatus("ssl-handshake-complete");
+ end
+ self:_start_session( call_onconnect )
+ debug( "ssl handshake done" )
+ self.eventhandshake = nil
+ return -1
+ end
+ if err == "wantwrite" then
+ event = EV_WRITE
+ elseif err == "wantread" then
+ event = EV_READ
+ else
+ debug( "ssl handshake error:", err )
+ self.fatalerror = err
+ end
+ end
+ if self.fatalerror then
if call_onconnect then
self.ondisconnect = nil -- dont call this when client isnt really connected
end
self:_close()
- debug( "fatal error while ssl wrapping:", err )
- return false
+ debug( "handshake failed because:", self.fatalerror )
+ self.eventhandshake = nil
+ return -1
end
- self.conn:settimeout( 0 ) -- set non blocking
- local handshakecallback = coroutine_wrap(function( event )
- local _, err
- local attempt = 0
- local maxattempt = cfg.MAX_HANDSHAKE_ATTEMPTS
- while attempt < maxattempt do -- no endless loop
- attempt = attempt + 1
- debug( "ssl handshake of client with id:"..tostring(self)..", attempt:"..attempt )
- if attempt > maxattempt then
- self.fatalerror = "max handshake attempts exceeded"
- elseif EV_TIMEOUT == event then
- self.fatalerror = "timeout during handshake"
- else
- _, err = self.conn:dohandshake( )
- if not err then
- self:_lock( false, false, false ) -- unlock the interface; sending, closing etc allowed
- self.send = self.conn.send -- caching table lookups with new client object
- self.receive = self.conn.receive
- if not call_onconnect then -- trigger listener
- self:onstatus("ssl-handshake-complete");
- end
- self:_start_session( call_onconnect )
- debug( "ssl handshake done" )
- self.eventhandshake = nil
- return -1
- end
- if err == "wantwrite" then
- event = EV_WRITE
- elseif err == "wantread" then
- event = EV_READ
- else
- debug( "ssl handshake error:", err )
- self.fatalerror = err
- end
- end
- if self.fatalerror then
- if call_onconnect then
- self.ondisconnect = nil -- dont call this when client isnt really connected
- end
- self:_close()
- debug( "handshake failed because:", self.fatalerror )
- self.eventhandshake = nil
- return -1
- end
- event = coroutine_yield( event, cfg.HANDSHAKE_TIMEOUT ) -- yield this monster...
- end
- end
- )
- debug "starting handshake..."
- self:_lock( false, true, true ) -- unlock read/write events, but keep interface locked
- self.eventhandshake = addevent( base, self.conn, EV_READWRITE, handshakecallback, cfg.HANDSHAKE_TIMEOUT )
- return true
+ event = coroutine_yield( event, cfg.HANDSHAKE_TIMEOUT ) -- yield this monster...
+ end
end
- function interface_mt:_destroy() -- close this interface + events and call last listener
- debug( "closing client with id:", self.id, self.fatalerror )
- self:_lock( true, true, true ) -- first of all, lock the interface to avoid further actions
- local _
- _ = self.eventread and self.eventread:close( )
- if self.type == "client" then
- _ = self.eventwrite and self.eventwrite:close( )
- _ = self.eventhandshake and self.eventhandshake:close( )
- _ = self.eventstarthandshake and self.eventstarthandshake:close( )
- _ = self.eventconnect and self.eventconnect:close( )
- _ = self.eventsession and self.eventsession:close( )
- _ = self.eventwritetimeout and self.eventwritetimeout:close( )
- _ = self.eventreadtimeout and self.eventreadtimeout:close( )
- _ = self.ondisconnect and self:ondisconnect( self.fatalerror ~= "client to close" and self.fatalerror) -- call ondisconnect listener (wont be the case if handshake failed on connect)
- _ = self.conn and self.conn:close( ) -- close connection
- _ = self._server and self._server:counter(-1);
- self.eventread, self.eventwrite = nil, nil
- self.eventstarthandshake, self.eventhandshake, self.eventclose = nil, nil, nil
- self.readcallback, self.writecallback = nil, nil
- else
- self.conn:close( )
- self.eventread, self.eventclose = nil, nil
- self.interface, self.readcallback = nil, nil
- end
- interfacelist[ self ] = nil
- return true
+ )
+ debug "starting handshake..."
+ self:_lock( false, true, true ) -- unlock read/write events, but keep interface locked
+ self.eventhandshake = addevent( base, self.conn, EV_READWRITE, handshakecallback, cfg.HANDSHAKE_TIMEOUT )
+ return true
+end
+function interface_mt:_destroy() -- close this interface + events and call last listener
+ debug( "closing client with id:", self.id, self.fatalerror )
+ self:_lock( true, true, true ) -- first of all, lock the interface to avoid further actions
+ local _
+ _ = self.eventread and self.eventread:close( )
+ if self.type == "client" then
+ _ = self.eventwrite and self.eventwrite:close( )
+ _ = self.eventhandshake and self.eventhandshake:close( )
+ _ = self.eventstarthandshake and self.eventstarthandshake:close( )
+ _ = self.eventconnect and self.eventconnect:close( )
+ _ = self.eventsession and self.eventsession:close( )
+ _ = self.eventwritetimeout and self.eventwritetimeout:close( )
+ _ = self.eventreadtimeout and self.eventreadtimeout:close( )
+ _ = self.ondisconnect and self:ondisconnect( self.fatalerror ~= "client to close" and self.fatalerror) -- call ondisconnect listener (wont be the case if handshake failed on connect)
+ _ = self.conn and self.conn:close( ) -- close connection
+ _ = self._server and self._server:counter(-1);
+ self.eventread, self.eventwrite = nil, nil
+ self.eventstarthandshake, self.eventhandshake, self.eventclose = nil, nil, nil
+ self.readcallback, self.writecallback = nil, nil
+ else
+ self.conn:close( )
+ self.eventread, self.eventclose = nil, nil
+ self.interface, self.readcallback = nil, nil
end
+ interfacelist[ self ] = nil
+ return true
+end
- function interface_mt:_lock(nointerface, noreading, nowriting) -- lock or unlock this interface or events
- self.nointerface, self.noreading, self.nowriting = nointerface, noreading, nowriting
- return nointerface, noreading, nowriting
- end
+function interface_mt:_lock(nointerface, noreading, nowriting) -- lock or unlock this interface or events
+ self.nointerface, self.noreading, self.nowriting = nointerface, noreading, nowriting
+ return nointerface, noreading, nowriting
+end
- --TODO: Deprecate
- function interface_mt:lock_read(switch)
- if switch then
- return self:pause();
- else
- return self:resume();
- end
+--TODO: Deprecate
+function interface_mt:lock_read(switch)
+ if switch then
+ return self:pause();
+ else
+ return self:resume();
end
+end
- function interface_mt:pause()
- return self:_lock(self.nointerface, true, self.nowriting);
- end
+function interface_mt:pause()
+ return self:_lock(self.nointerface, true, self.nowriting);
+end
- function interface_mt:resume()
- self:_lock(self.nointerface, false, self.nowriting);
+function interface_mt:resume()
+ self:_lock(self.nointerface, false, self.nowriting);
if self.readcallback and not self.eventread then
- self.eventread = addevent( base, self.conn, EV_READ, self.readcallback, cfg.READ_TIMEOUT ); -- register callback
+ self.eventread = addevent( base, self.conn, EV_READ, self.readcallback, cfg.READ_TIMEOUT ); -- register callback
return true;
- end
end
+end
- function interface_mt:counter(c)
- if c then
- self._connections = self._connections + c
- end
- return self._connections
- end
-
- -- Public methods
- function interface_mt:write(data)
- if self.nowriting then return nil, "locked" end
- --vdebug( "try to send data to client, id/data:", self.id, data )
- data = tostring( data )
- local len = #data
- local total = len + self.writebufferlen
- if total > cfg.MAX_SEND_LENGTH then -- check buffer length
- local err = "send buffer exceeded"
- debug( "error:", err ) -- to much, check your app
- return nil, err
- end
- t_insert(self.writebuffer, data) -- new buffer
- self.writebufferlen = total
- if not self.eventwrite then -- register new write event
- --vdebug( "register new write event" )
- self.eventwrite = addevent( base, self.conn, EV_WRITE, self.writecallback, cfg.WRITE_TIMEOUT )
- end
- return true
+function interface_mt:counter(c)
+ if c then
+ self._connections = self._connections + c
end
- function interface_mt:close()
- if self.nointerface then return nil, "locked"; end
- debug( "try to close client connection with id:", self.id )
- if self.type == "client" then
- self.fatalerror = "client to close"
- if self.eventwrite then -- wait for incomplete write request
- self:_lock( true, true, false )
- debug "closing delayed until writebuffer is empty"
- return nil, "writebuffer not empty, waiting"
- else -- close now
- self:_lock( true, true, true )
- self:_close()
- return true
- end
- else
- debug( "try to close server with id:", tostring(self.id))
- self.fatalerror = "server to close"
- self:_lock( true )
- self:_close( 0 )
+ return self._connections
+end
+
+-- Public methods
+function interface_mt:write(data)
+ if self.nowriting then return nil, "locked" end
+ --vdebug( "try to send data to client, id/data:", self.id, data )
+ data = tostring( data )
+ local len = #data
+ local total = len + self.writebufferlen
+ if total > cfg.MAX_SEND_LENGTH then -- check buffer length
+ local err = "send buffer exceeded"
+ debug( "error:", err ) -- to much, check your app
+ return nil, err
+ end
+ t_insert(self.writebuffer, data) -- new buffer
+ self.writebufferlen = total
+ if not self.eventwrite then -- register new write event
+ --vdebug( "register new write event" )
+ self.eventwrite = addevent( base, self.conn, EV_WRITE, self.writecallback, cfg.WRITE_TIMEOUT )
+ end
+ return true
+end
+function interface_mt:close()
+ if self.nointerface then return nil, "locked"; end
+ debug( "try to close client connection with id:", self.id )
+ if self.type == "client" then
+ self.fatalerror = "client to close"
+ if self.eventwrite then -- wait for incomplete write request
+ self:_lock( true, true, false )
+ debug "closing delayed until writebuffer is empty"
+ return nil, "writebuffer not empty, waiting"
+ else -- close now
+ self:_lock( true, true, true )
+ self:_close()
return true
end
+ else
+ debug( "try to close server with id:", tostring(self.id))
+ self.fatalerror = "server to close"
+ self:_lock( true )
+ self:_close( 0 )
+ return true
end
+end
- function interface_mt:socket()
- return self.conn
- end
+function interface_mt:socket()
+ return self.conn
+end
- function interface_mt:server()
- return self._server or self;
- end
+function interface_mt:server()
+ return self._server or self;
+end
- function interface_mt:port()
- return self._port
- end
+function interface_mt:port()
+ return self._port
+end
- function interface_mt:serverport()
- return self._serverport
- end
+function interface_mt:serverport()
+ return self._serverport
+end
- function interface_mt:ip()
- return self._ip
- end
+function interface_mt:ip()
+ return self._ip
+end
- function interface_mt:ssl()
- return self._usingssl
- end
- interface_mt.clientport = interface_mt.port -- COMPAT server_select
+function interface_mt:ssl()
+ return self._usingssl
+end
+interface_mt.clientport = interface_mt.port -- COMPAT server_select
- function interface_mt:type()
- return self._type or "client"
- end
+function interface_mt:type()
+ return self._type or "client"
+end
- function interface_mt:connections()
- return self._connections
- end
+function interface_mt:connections()
+ return self._connections
+end
- function interface_mt:address()
- return self.addr
- end
+function interface_mt:address()
+ return self.addr
+end
- function interface_mt:set_sslctx(sslctx)
- self._sslctx = sslctx;
- if sslctx then
- self.starttls = nil; -- use starttls() of interface_mt
- else
- self.starttls = false; -- prevent starttls()
- end
+function interface_mt:set_sslctx(sslctx)
+ self._sslctx = sslctx;
+ if sslctx then
+ self.starttls = nil; -- use starttls() of interface_mt
+ else
+ self.starttls = false; -- prevent starttls()
end
+end
- function interface_mt:set_mode(pattern)
- if pattern then
- self._pattern = pattern;
- end
- return self._pattern;
+function interface_mt:set_mode(pattern)
+ if pattern then
+ self._pattern = pattern;
end
+ return self._pattern;
+end
function interface_mt:set_send(new_send) -- luacheck: ignore 212
- -- No-op, we always use the underlying connection's send
- end
+ -- No-op, we always use the underlying connection's send
+end
- function interface_mt:starttls(sslctx, call_onconnect)
- debug( "try to start ssl at client id:", self.id )
- local err
- self._sslctx = sslctx;
- if self._usingssl then -- startssl was already called
- err = "ssl already active"
- end
- if err then
- debug( "error:", err )
- return nil, err
- end
- self._usingssl = true
- self.startsslcallback = function( ) -- we have to start the handshake outside of a read/write event
- self.startsslcallback = nil
- self:_start_ssl(call_onconnect);
- self.eventstarthandshake = nil
- return -1
- end
- if not self.eventwrite then
- self:_lock( true, true, true ) -- lock the interface, to not disturb the handshake
- self.eventstarthandshake = addevent( base, nil, EV_TIMEOUT, self.startsslcallback, 0 ) -- add event to start handshake
- else -- wait until writebuffer is empty
- self:_lock( true, true, false )
- debug "ssl session delayed until writebuffer is empty..."
- end
- self.starttls = false;
- return true
- end
+function interface_mt:starttls(sslctx, call_onconnect)
+ debug( "try to start ssl at client id:", self.id )
+ local err
+ self._sslctx = sslctx;
+ if self._usingssl then -- startssl was already called
+ err = "ssl already active"
+ end
+ if err then
+ debug( "error:", err )
+ return nil, err
+ end
+ self._usingssl = true
+ self.startsslcallback = function( ) -- we have to start the handshake outside of a read/write event
+ self.startsslcallback = nil
+ self:_start_ssl(call_onconnect);
+ self.eventstarthandshake = nil
+ return -1
+ end
+ if not self.eventwrite then
+ self:_lock( true, true, true ) -- lock the interface, to not disturb the handshake
+ self.eventstarthandshake = addevent( base, nil, EV_TIMEOUT, self.startsslcallback, 0 ) -- add event to start handshake
+ else -- wait until writebuffer is empty
+ self:_lock( true, true, false )
+ debug "ssl session delayed until writebuffer is empty..."
+ end
+ self.starttls = false;
+ return true
+end
- function interface_mt:setoption(option, value)
- if self.conn.setoption then
- return self.conn:setoption(option, value);
- end
- return false, "setoption not implemented";
+function interface_mt:setoption(option, value)
+ if self.conn.setoption then
+ return self.conn:setoption(option, value);
end
+ return false, "setoption not implemented";
+end
- function interface_mt:setlistener(listener)
- self:ondetach(); -- Notify listener that it is no longer responsible for this connection
+function interface_mt:setlistener(listener)
+ self:ondetach(); -- Notify listener that it is no longer responsible for this connection
self.onconnect, self.ondisconnect, self.onincoming, self.ontimeout,
self.onreadtimeout, self.onstatus, self.ondetach
= listener.onconnect, listener.ondisconnect, listener.onincoming, listener.ontimeout,
listener.onreadtimeout, listener.onstatus, listener.ondetach;
- end
+end
- -- Stub handlers
- function interface_mt:onconnect()
- end
- function interface_mt:onincoming()
- end
- function interface_mt:ondisconnect()
- end
- function interface_mt:ontimeout()
- end
+-- Stub handlers
+function interface_mt:onconnect()
+end
+function interface_mt:onincoming()
+end
+function interface_mt:ondisconnect()
+end
+function interface_mt:ontimeout()
+end
function interface_mt:onreadtimeout()
self.fatalerror = "timeout during receiving"
debug( "connection failed:", self.fatalerror )
self:_close()
self.eventread = nil
end
- function interface_mt:ondrain()
- end
- function interface_mt:ondetach()
- end
- function interface_mt:onstatus()
- end
+function interface_mt:ondrain()
+end
+function interface_mt:ondetach()
+end
+function interface_mt:onstatus()
+end
-- End of client interface methods
local function handleclient( client, ip, port, server, pattern, listener, sslctx ) -- creates an client interface
- --vdebug("creating client interfacce...")
- local interface = {
- type = "client";
- conn = client;
- currenttime = socket_gettime( ); -- safe the origin
- writebuffer = {}; -- writebuffer
- writebufferlen = 0; -- length of writebuffer
- send = client.send; -- caching table lookups
- receive = client.receive;
- onconnect = listener.onconnect; -- will be called when client disconnects
- ondisconnect = listener.ondisconnect; -- will be called when client disconnects
- onincoming = listener.onincoming; -- will be called when client sends data
- ontimeout = listener.ontimeout; -- called when fatal socket timeout occurs
+ --vdebug("creating client interfacce...")
+ local interface = {
+ type = "client";
+ conn = client;
+ currenttime = socket_gettime( ); -- safe the origin
+ writebuffer = {}; -- writebuffer
+ writebufferlen = 0; -- length of writebuffer
+ send = client.send; -- caching table lookups
+ receive = client.receive;
+ onconnect = listener.onconnect; -- will be called when client disconnects
+ ondisconnect = listener.ondisconnect; -- will be called when client disconnects
+ onincoming = listener.onincoming; -- will be called when client sends data
+ ontimeout = listener.ontimeout; -- called when fatal socket timeout occurs
onreadtimeout = listener.onreadtimeout; -- called when socket inactivity timeout occurs
- ondrain = listener.ondrain; -- called when writebuffer is empty
- ondetach = listener.ondetach; -- called when disassociating this listener from this connection
- onstatus = listener.onstatus; -- called for status changes (e.g. of SSL/TLS)
- eventread = false, eventwrite = false, eventclose = false,
- eventhandshake = false, eventstarthandshake = false; -- event handler
- eventconnect = false, eventsession = false; -- more event handler...
- eventwritetimeout = false; -- even more event handler...
- eventreadtimeout = false;
- fatalerror = false; -- error message
- writecallback = false; -- will be called on write events
- readcallback = false; -- will be called on read events
- nointerface = true; -- lock/unlock parameter of this interface
- noreading = false, nowriting = false; -- locks of the read/writecallback
- startsslcallback = false; -- starting handshake callback
- position = false; -- position of client in interfacelist
-
- -- Properties
- _ip = ip, _port = port, _server = server, _pattern = pattern,
- _serverport = (server and server:port() or nil),
- _sslctx = sslctx; -- parameters
- _usingssl = false; -- client is using ssl;
- }
+ ondrain = listener.ondrain; -- called when writebuffer is empty
+ ondetach = listener.ondetach; -- called when disassociating this listener from this connection
+ onstatus = listener.onstatus; -- called for status changes (e.g. of SSL/TLS)
+ eventread = false, eventwrite = false, eventclose = false,
+ eventhandshake = false, eventstarthandshake = false; -- event handler
+ eventconnect = false, eventsession = false; -- more event handler...
+ eventwritetimeout = false; -- even more event handler...
+ eventreadtimeout = false;
+ fatalerror = false; -- error message
+ writecallback = false; -- will be called on write events
+ readcallback = false; -- will be called on read events
+ nointerface = true; -- lock/unlock parameter of this interface
+ noreading = false, nowriting = false; -- locks of the read/writecallback
+ startsslcallback = false; -- starting handshake callback
+ position = false; -- position of client in interfacelist
+
+ -- Properties
+ _ip = ip, _port = port, _server = server, _pattern = pattern,
+ _serverport = (server and server:port() or nil),
+ _sslctx = sslctx; -- parameters
+ _usingssl = false; -- client is using ssl;
+ }
if not has_luasec then interface.starttls = false; end
- interface.id = tostring(interface):match("%x+$");
- interface.writecallback = function( event ) -- called on write events
- --vdebug( "new client write event, id/ip/port:", interface, ip, port )
- if interface.nowriting or ( interface.fatalerror and ( "client to close" ~= interface.fatalerror ) ) then -- leave this event
- --vdebug( "leaving this event because:", interface.nowriting or interface.fatalerror )
- interface.eventwrite = false
- return -1
+ interface.id = tostring(interface):match("%x+$");
+ interface.writecallback = function( event ) -- called on write events
+ --vdebug( "new client write event, id/ip/port:", interface, ip, port )
+ if interface.nowriting or ( interface.fatalerror and ( "client to close" ~= interface.fatalerror ) ) then -- leave this event
+ --vdebug( "leaving this event because:", interface.nowriting or interface.fatalerror )
+ interface.eventwrite = false
+ return -1
+ end
+ if EV_TIMEOUT == event then -- took too long to write some data to socket -> disconnect
+ interface.fatalerror = "timeout during writing"
+ debug( "writing failed:", interface.fatalerror )
+ interface:_close()
+ interface.eventwrite = false
+ return -1
+ else -- can write :)
+ if interface._usingssl then -- handle luasec
+ if interface.eventreadtimeout then -- we have to read first
+ local ret = interface.readcallback( ) -- call readcallback
+ --vdebug( "tried to read in writecallback, result:", ret )
+ end
+ if interface.eventwritetimeout then -- luasec only
+ interface.eventwritetimeout:close( ) -- first we have to close timeout event which where regged after a wantread error
+ interface.eventwritetimeout = false
+ end
end
- if EV_TIMEOUT == event then -- took too long to write some data to socket -> disconnect
- interface.fatalerror = "timeout during writing"
- debug( "writing failed:", interface.fatalerror )
- interface:_close()
- interface.eventwrite = false
- return -1
- else -- can write :)
- if interface._usingssl then -- handle luasec
- if interface.eventreadtimeout then -- we have to read first
- local ret = interface.readcallback( ) -- call readcallback
- --vdebug( "tried to read in writecallback, result:", ret )
- end
- if interface.eventwritetimeout then -- luasec only
- interface.eventwritetimeout:close( ) -- first we have to close timeout event which where regged after a wantread error
- interface.eventwritetimeout = false
- end
+ interface.writebuffer = { t_concat(interface.writebuffer) }
+ local succ, err, byte = interface.conn:send( interface.writebuffer[1], 1, interface.writebufferlen )
+ --vdebug( "write data:", interface.writebuffer, "error:", err, "part:", byte )
+ if succ then -- writing succesful
+ interface.writebuffer[1] = nil
+ interface.writebufferlen = 0
+ interface:ondrain();
+ if interface.fatalerror then
+ debug "closing client after writing"
+ interface:_close() -- close interface if needed
+ elseif interface.startsslcallback then -- start ssl connection if needed
+ debug "starting ssl handshake after writing"
+ interface.eventstarthandshake = addevent( base, nil, EV_TIMEOUT, interface.startsslcallback, 0 )
+ elseif interface.eventreadtimeout then
+ return EV_WRITE, EV_TIMEOUT
end
- interface.writebuffer = { t_concat(interface.writebuffer) }
- local succ, err, byte = interface.conn:send( interface.writebuffer[1], 1, interface.writebufferlen )
- --vdebug( "write data:", interface.writebuffer, "error:", err, "part:", byte )
- if succ then -- writing succesful
- interface.writebuffer[1] = nil
- interface.writebufferlen = 0
- interface:ondrain();
- if interface.fatalerror then
- debug "closing client after writing"
- interface:_close() -- close interface if needed
- elseif interface.startsslcallback then -- start ssl connection if needed
- debug "starting ssl handshake after writing"
- interface.eventstarthandshake = addevent( base, nil, EV_TIMEOUT, interface.startsslcallback, 0 )
- elseif interface.eventreadtimeout then
- return EV_WRITE, EV_TIMEOUT
- end
- interface.eventwrite = nil
- return -1
- elseif byte and (err == "timeout" or err == "wantwrite") then -- want write again
- --vdebug( "writebuffer is not empty:", err )
+ interface.eventwrite = nil
+ return -1
+ elseif byte and (err == "timeout" or err == "wantwrite") then -- want write again
+ --vdebug( "writebuffer is not empty:", err )
interface.writebuffer[1] = s_sub( interface.writebuffer[1], byte + 1, interface.writebufferlen ) -- new buffer
- interface.writebufferlen = interface.writebufferlen - byte
- if "wantread" == err then -- happens only with luasec
- local callback = function( )
- interface:_close()
- interface.eventwritetimeout = nil
- return -1;
- end
- interface.eventwritetimeout = addevent( base, nil, EV_TIMEOUT, callback, cfg.WRITE_TIMEOUT ) -- reg a new timeout event
- debug( "wantread during write attempt, reg it in readcallback but dont know what really happens next..." )
- -- hopefully this works with luasec; its simply not possible to use 2 different write events on a socket in luaevent
- return -1
+ interface.writebufferlen = interface.writebufferlen - byte
+ if "wantread" == err then -- happens only with luasec
+ local callback = function( )
+ interface:_close()
+ interface.eventwritetimeout = nil
+ return -1;
end
- return EV_WRITE, cfg.WRITE_TIMEOUT
- else -- connection was closed during writing or fatal error
- interface.fatalerror = err or "fatal error"
- debug( "connection failed in write event:", interface.fatalerror )
- interface:_close()
- interface.eventwrite = nil
+ interface.eventwritetimeout = addevent( base, nil, EV_TIMEOUT, callback, cfg.WRITE_TIMEOUT ) -- reg a new timeout event
+ debug( "wantread during write attempt, reg it in readcallback but dont know what really happens next..." )
+ -- hopefully this works with luasec; its simply not possible to use 2 different write events on a socket in luaevent
return -1
end
+ return EV_WRITE, cfg.WRITE_TIMEOUT
+ else -- connection was closed during writing or fatal error
+ interface.fatalerror = err or "fatal error"
+ debug( "connection failed in write event:", interface.fatalerror )
+ interface:_close()
+ interface.eventwrite = nil
+ return -1
end
end
+ end
- interface.readcallback = function( event ) -- called on read events
- --vdebug( "new client read event, id/ip/port:", tostring(interface.id), tostring(ip), tostring(port) )
- if interface.noreading or interface.fatalerror then -- leave this event
- --vdebug( "leaving this event because:", tostring(interface.noreading or interface.fatalerror) )
- interface.eventread = nil
- return -1
- end
+ interface.readcallback = function( event ) -- called on read events
+ --vdebug( "new client read event, id/ip/port:", tostring(interface.id), tostring(ip), tostring(port) )
+ if interface.noreading or interface.fatalerror then -- leave this event
+ --vdebug( "leaving this event because:", tostring(interface.noreading or interface.fatalerror) )
+ interface.eventread = nil
+ return -1
+ end
if EV_TIMEOUT == event and interface:onreadtimeout() ~= true then
return -1 -- took too long to get some data from client -> disconnect
end
- if interface._usingssl then -- handle luasec
- if interface.eventwritetimeout then -- ok, in the past writecallback was regged
- local ret = interface.writecallback( ) -- call it
- --vdebug( "tried to write in readcallback, result:", tostring(ret) )
- end
- if interface.eventreadtimeout then
- interface.eventreadtimeout:close( )
- interface.eventreadtimeout = nil
- end
- end
- local buffer, err, part = interface.conn:receive( interface._pattern ) -- receive buffer with "pattern"
- --vdebug( "read data:", tostring(buffer), "error:", tostring(err), "part:", tostring(part) )
- buffer = buffer or part
- if buffer and #buffer > cfg.MAX_READ_LENGTH then -- check buffer length
- interface.fatalerror = "receive buffer exceeded"
- debug( "fatal error:", interface.fatalerror )
- interface:_close()
- interface.eventread = nil
- return -1
+ if interface._usingssl then -- handle luasec
+ if interface.eventwritetimeout then -- ok, in the past writecallback was regged
+ local ret = interface.writecallback( ) -- call it
+ --vdebug( "tried to write in readcallback, result:", tostring(ret) )
+ end
+ if interface.eventreadtimeout then
+ interface.eventreadtimeout:close( )
+ interface.eventreadtimeout = nil
+ end
+ end
+ local buffer, err, part = interface.conn:receive( interface._pattern ) -- receive buffer with "pattern"
+ --vdebug( "read data:", tostring(buffer), "error:", tostring(err), "part:", tostring(part) )
+ buffer = buffer or part
+ if buffer and #buffer > cfg.MAX_READ_LENGTH then -- check buffer length
+ interface.fatalerror = "receive buffer exceeded"
+ debug( "fatal error:", interface.fatalerror )
+ interface:_close()
+ interface.eventread = nil
+ return -1
+ end
+ if err and ( err ~= "timeout" and err ~= "wantread" ) then
+ if "wantwrite" == err then -- need to read on write event
+ if not interface.eventwrite then -- register new write event if needed
+ interface.eventwrite = addevent( base, interface.conn, EV_WRITE, interface.writecallback, cfg.WRITE_TIMEOUT )
end
- if err and ( err ~= "timeout" and err ~= "wantread" ) then
- if "wantwrite" == err then -- need to read on write event
- if not interface.eventwrite then -- register new write event if needed
- interface.eventwrite = addevent( base, interface.conn, EV_WRITE, interface.writecallback, cfg.WRITE_TIMEOUT )
- end
- interface.eventreadtimeout = addevent( base, nil, EV_TIMEOUT,
- function( )
- interface:_close()
- end, cfg.READ_TIMEOUT
- )
- debug( "wantwrite during read attempt, reg it in writecallback but dont know what really happens next..." )
- -- to be honest i dont know what happens next, if it is allowed to first read, the write etc...
- else -- connection was closed or fatal error
- interface.fatalerror = err
- debug( "connection failed in read event:", interface.fatalerror )
+ interface.eventreadtimeout = addevent( base, nil, EV_TIMEOUT,
+ function( )
interface:_close()
- interface.eventread = nil
- return -1
- end
- else
- interface.onincoming( interface, buffer, err ) -- send new data to listener
- end
- if interface.noreading then
- interface.eventread = nil;
- return -1;
- end
- return EV_READ, cfg.READ_TIMEOUT
+ end, cfg.READ_TIMEOUT
+ )
+ debug( "wantwrite during read attempt, reg it in writecallback but dont know what really happens next..." )
+ -- to be honest i dont know what happens next, if it is allowed to first read, the write etc...
+ else -- connection was closed or fatal error
+ interface.fatalerror = err
+ debug( "connection failed in read event:", interface.fatalerror )
+ interface:_close()
+ interface.eventread = nil
+ return -1
end
+ else
+ interface.onincoming( interface, buffer, err ) -- send new data to listener
+ end
+ if interface.noreading then
+ interface.eventread = nil;
+ return -1;
+ end
+ return EV_READ, cfg.READ_TIMEOUT
+ end
- client:settimeout( 0 ) -- set non blocking
- setmetatable(interface, interface_mt)
+ client:settimeout( 0 ) -- set non blocking
+ setmetatable(interface, interface_mt)
interfacelist[ interface ] = true -- add to interfacelist
- return interface
- end
+ return interface
+end
local function handleserver( server, addr, port, pattern, listener, sslctx ) -- creates an server interface
- debug "creating server interface..."
- local interface = {
- _connections = 0;
+ debug "creating server interface..."
+ local interface = {
+ _connections = 0;
type = "server";
- conn = server;
- onconnect = listener.onconnect; -- will be called when new client connected
- eventread = false; -- read event handler
- eventclose = false; -- close event handler
- readcallback = false; -- read event callback
- fatalerror = false; -- error message
- nointerface = true; -- lock/unlock parameter
-
- _ip = addr, _port = port, _pattern = pattern,
- _sslctx = sslctx;
- }
- interface.id = tostring(interface):match("%x+$");
- interface.readcallback = function( event ) -- server handler, called on incoming connections
- --vdebug( "server can accept, id/addr/port:", interface, addr, port )
- if interface.fatalerror then
- --vdebug( "leaving this event because:", self.fatalerror )
- interface.eventread = nil
- return -1
+ conn = server;
+ onconnect = listener.onconnect; -- will be called when new client connected
+ eventread = false; -- read event handler
+ eventclose = false; -- close event handler
+ readcallback = false; -- read event callback
+ fatalerror = false; -- error message
+ nointerface = true; -- lock/unlock parameter
+
+ _ip = addr, _port = port, _pattern = pattern,
+ _sslctx = sslctx;
+ }
+ interface.id = tostring(interface):match("%x+$");
+ interface.readcallback = function( event ) -- server handler, called on incoming connections
+ --vdebug( "server can accept, id/addr/port:", interface, addr, port )
+ if interface.fatalerror then
+ --vdebug( "leaving this event because:", self.fatalerror )
+ interface.eventread = nil
+ return -1
+ end
+ local delay = cfg.ACCEPT_DELAY
+ if EV_TIMEOUT == event then
+ if interface._connections >= cfg.MAX_CONNECTIONS then -- check connection count
+ debug( "to many connections, seconds to wait for next accept:", delay )
+ return EV_TIMEOUT, delay -- timeout...
+ else
+ return EV_READ -- accept again
end
- local delay = cfg.ACCEPT_DELAY
- if EV_TIMEOUT == event then
- if interface._connections >= cfg.MAX_CONNECTIONS then -- check connection count
- debug( "to many connections, seconds to wait for next accept:", delay )
- return EV_TIMEOUT, delay -- timeout...
- else
- return EV_READ -- accept again
- end
+ end
+ --vdebug("max connection check ok, accepting...")
+ local client, err = server:accept() -- try to accept; TODO: check err
+ while client do
+ if interface._connections >= cfg.MAX_CONNECTIONS then
+ client:close( ) -- refuse connection
+ debug( "maximal connections reached, refuse client connection; accept delay:", delay )
+ return EV_TIMEOUT, delay -- delay for next accept attempt
end
- --vdebug("max connection check ok, accepting...")
- local client, err = server:accept() -- try to accept; TODO: check err
- while client do
- if interface._connections >= cfg.MAX_CONNECTIONS then
- client:close( ) -- refuse connection
- debug( "maximal connections reached, refuse client connection; accept delay:", delay )
- return EV_TIMEOUT, delay -- delay for next accept attempt
- end
- local client_ip, client_port = client:getpeername( )
- interface._connections = interface._connections + 1 -- increase connection count
- local clientinterface = handleclient( client, client_ip, client_port, interface, pattern, listener, sslctx )
- --vdebug( "client id:", clientinterface, "startssl:", startssl )
+ local client_ip, client_port = client:getpeername( )
+ interface._connections = interface._connections + 1 -- increase connection count
+ local clientinterface = handleclient( client, client_ip, client_port, interface, pattern, listener, sslctx )
+ --vdebug( "client id:", clientinterface, "startssl:", startssl )
if has_luasec and sslctx then
- clientinterface:starttls(sslctx, true)
- else
- clientinterface:_start_session( true )
- end
- debug( "accepted incoming client connection from:", client_ip or "<unknown IP>", client_port or "<unknown port>", "to", port or "<unknown port>");
-
- client, err = server:accept() -- try to accept again
+ clientinterface:starttls(sslctx, true)
+ else
+ clientinterface:_start_session( true )
end
- return EV_READ
+ debug( "accepted incoming client connection from:", client_ip or "<unknown IP>", client_port or "<unknown port>", "to", port or "<unknown port>");
+
+ client, err = server:accept() -- try to accept again
end
+ return EV_READ
+ end
- server:settimeout( 0 )
- setmetatable(interface, interface_mt)
+ server:settimeout( 0 )
+ setmetatable(interface, interface_mt)
interfacelist[ interface ] = true
- interface:_start_session()
- return interface
- end
+ interface:_start_session()
+ return interface
+end
local function addserver( addr, port, listener, pattern, sslctx, startssl ) -- TODO: check arguments
--vdebug( "creating new tcp server with following parameters:", addr or "nil", port or "nil", sslctx or "nil", startssl or "nil")
if sslctx and not has_luasec then
debug "fatal error: luasec not found"
return nil, "luasec not found"
-end
- local server, err = socket.bind( addr, port, cfg.ACCEPT_QUEUE ) -- create server socket
- if not server then
- debug( "creating server socket on "..addr.." port "..port.." failed:", err )
- return nil, err
- end
- local interface = handleserver( server, addr, port, pattern, listener, sslctx, startssl ) -- new server handler
- debug( "new server created with id:", tostring(interface))
- return interface
end
+ local server, err = socket.bind( addr, port, cfg.ACCEPT_QUEUE ) -- create server socket
+ if not server then
+ debug( "creating server socket on "..addr.." port "..port.." failed:", err )
+ return nil, err
+ end
+ local interface = handleserver( server, addr, port, pattern, listener, sslctx, startssl ) -- new server handler
+ debug( "new server created with id:", tostring(interface))
+ return interface
+end
local function wrapclient( client, ip, port, listeners, pattern, sslctx )
- local interface = handleclient( client, ip, port, nil, pattern, listeners, sslctx )
- interface:_start_connection(sslctx)
- return interface, client
- --function handleclient( client, ip, port, server, pattern, listener, _, sslctx ) -- creates an client interface
- end
+ local interface = handleclient( client, ip, port, nil, pattern, listeners, sslctx )
+ interface:_start_connection(sslctx)
+ return interface, client
+ --function handleclient( client, ip, port, server, pattern, listener, _, sslctx ) -- creates an client interface
+end
local function addclient( addr, serverport, listener, pattern, sslctx, typ )
if sslctx and not has_luasec then
debug "need luasec, but not available"
return nil, "luasec not found"
- end
- if not typ then
+ end
+ if getaddrinfo and not typ then
local addrinfo, err = getaddrinfo(addr)
if not addrinfo then return nil, err end
if addrinfo[1] and addrinfo[1].family == "inet6" then
typ = "tcp6"
- else
- typ = "tcp"
- end
end
- local create = socket[typ]
+ end
+ local create = socket[typ or "tcp"]
if type( create ) ~= "function" then
return nil, "invalid socket type"
- end
+ end
local client, err = create() -- creating new socket
if not client then
debug( "cannot create socket:", err )
- return nil, err
- end
+ return nil, err
+ end
client:settimeout( 0 ) -- set nonblocking
- local res, err = client:connect( addr, serverport ) -- connect
- if res or ( err == "timeout" ) then
- local ip, port = client:getsockname( )
- local interface = wrapclient( client, ip, serverport, listener, pattern, sslctx )
- interface:_start_connection( sslctx )
- debug( "new connection id:", interface.id )
- return interface, err
- else
- debug( "new connection failed:", err )
- return nil, err
+ local res, err = client:connect( addr, serverport ) -- connect
+ if res or ( err == "timeout" or err == "Operation already in progress" ) then
+ if client.getsockname then
+ addr = client:getsockname( )
end
+ local interface = wrapclient( client, addr, serverport, listener, pattern, sslctx )
+ debug( "new connection id:", interface.id )
+ return interface, err
+ else
+ debug( "new connection failed:", err )
+ return nil, err
end
+end
local function loop( ) -- starts the event loop
base:loop( )
@@ -742,7 +741,7 @@ end
local function newevent( ... )
return addevent( base, ... )
- end
+end
local function closeallservers ( arg )
for item in pairs( interfacelist ) do
@@ -754,9 +753,9 @@ end
local function setquitting(yes)
if yes then
- -- Quit now
- closeallservers();
- base:loopexit();
+ -- Quit now
+ closeallservers();
+ base:loopexit();
end
end
@@ -799,6 +798,20 @@ local function link(sender, receiver, buffersize)
sender:set_mode("*a");
end
+local function add_task(delay, callback)
+ local event_handle;
+ event_handle = base:addevent(nil, 0, function ()
+ local ret = callback(socket_gettime());
+ if ret then
+ return 0, ret;
+ elseif event_handle then
+ return -1;
+ end
+ end
+ , delay);
+ return event_handle;
+end
+
return {
cfg = cfg,
base = base,
@@ -814,6 +827,7 @@ return {
closeall = closeallservers,
get_backend = get_backend,
hook_signal = hook_signal,
+ add_task = add_task,
__NAME = SCRIPT_NAME,
__DATE = LAST_MODIFIED,
diff --git a/net/server_select.lua b/net/server_select.lua
index 52a0d5f1..37d57d29 100644
--- a/net/server_select.lua
+++ b/net/server_select.lua
@@ -31,17 +31,16 @@ local tostring = use "tostring"
--// lua libs //--
-local os = use "os"
local table = use "table"
local string = use "string"
local coroutine = use "coroutine"
--// lua lib methods //--
-local os_difftime = os.difftime
local math_min = math.min
local math_huge = math.huge
local table_concat = table.concat
+local table_insert = table.insert
local string_sub = string.sub
local coroutine_wrap = coroutine.wrap
local coroutine_yield = coroutine.yield
@@ -57,7 +56,6 @@ local getaddrinfo = luasocket.dns.getaddrinfo
local ssl_wrap = ( has_luasec and luasec.wrap )
local socket_bind = luasocket.bind
-local socket_sleep = luasocket.sleep
local socket_select = luasocket.select
--// functions //--
@@ -102,7 +100,6 @@ local _sendtraffic
local _readtraffic
local _selecttimeout
-local _sleeptime
local _tcpbacklog
local _accepretry
@@ -116,8 +113,6 @@ local _checkinterval
local _sendtimeout
local _readtimeout
-local _timer
-
local _maxselectlen
local _maxfd
@@ -143,7 +138,6 @@ _sendtraffic = 0 -- some stats
_readtraffic = 0
_selecttimeout = 1 -- timeout of socket.select
-_sleeptime = 0 -- time to wait at the end of every loop
_tcpbacklog = 128 -- some kind of hint to the OS
_accepretry = 10 -- seconds to wait until the next attempt of a full server to accept
@@ -303,7 +297,6 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport
local bufferqueuelen = 0 -- end of buffer array
local toclose
- local fatalerror
local needtls
local bufferlen = 0
@@ -518,7 +511,6 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport
return dispatch( handler, buffer, err )
else -- connections was closed or fatal error
out_put( "server.lua: client ", tostring(ip), ":", tostring(clientport), " read error: ", tostring(err) )
- fatalerror = true
_ = handler and handler:force_close( err )
return false
end
@@ -558,7 +550,6 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport
return true
else -- connection was closed during sending or fatal error
out_put( "server.lua: client ", tostring(ip), ":", tostring(clientport), " write error: ", tostring(err) )
- fatalerror = true
_ = handler and handler:force_close( err )
return false
end
@@ -807,7 +798,6 @@ end
getsettings = function( )
return {
select_timeout = _selecttimeout;
- select_sleep_time = _sleeptime;
tcp_backlog = _tcpbacklog;
max_send_buffer_size = _maxsendlen;
max_receive_buffer_size = _maxreadlen;
@@ -826,7 +816,6 @@ changesettings = function( new )
return nil, "invalid settings table"
end
_selecttimeout = tonumber( new.select_timeout ) or _selecttimeout
- _sleeptime = tonumber( new.select_sleep_time ) or _sleeptime
_maxsendlen = tonumber( new.max_send_buffer_size ) or _maxsendlen
_maxreadlen = tonumber( new.max_receive_buffer_size ) or _maxreadlen
_checkinterval = tonumber( new.select_idle_check_interval ) or _checkinterval
@@ -849,6 +838,49 @@ addtimer = function( listener )
return true
end
+local add_task do
+ local data = {};
+ local new_data = {};
+
+ function add_task(delay, callback)
+ local current_time = luasocket_gettime();
+ delay = delay + current_time;
+ if delay >= current_time then
+ table_insert(new_data, {delay, callback});
+ else
+ local r = callback(current_time);
+ if r and type(r) == "number" then
+ return add_task(r, callback);
+ end
+ end
+ end
+
+ addtimer(function(current_time)
+ if #new_data > 0 then
+ for _, d in pairs(new_data) do
+ table_insert(data, d);
+ end
+ new_data = {};
+ end
+
+ local next_time = math_huge;
+ for i, d in pairs(data) do
+ local t, callback = d[1], d[2];
+ if t <= current_time then
+ data[i] = nil;
+ local r = callback(current_time);
+ if type(r) == "number" then
+ add_task(r, callback);
+ next_time = math_min(next_time, r);
+ end
+ else
+ next_time = math_min(next_time, t - current_time);
+ end
+ end
+ return next_time;
+ end);
+end
+
stats = function( )
return _readtraffic, _sendtraffic, _readlistlen, _sendlistlen, _timerlistlen
end
@@ -862,8 +894,15 @@ end
loop = function(once) -- this is the main loop of the program
if quitting then return "quitting"; end
if once then quitting = "once"; end
- local next_timer_time = math_huge;
+ _currenttime = luasocket_gettime( )
repeat
+ -- Fire timers
+ local next_timer_time = math_huge;
+ for i = 1, _timerlistlen do
+ local t = _timerlist[ i ]( _currenttime ) -- fire timers
+ if t then next_timer_time = math_min(next_timer_time, t); end
+ end
+
local read, write, err = socket_select( _readlist, _sendlist, math_min(_selecttimeout, next_timer_time) )
for i, socket in ipairs( write ) do -- send data waiting in writequeues
local handler = _socketlist[ socket ]
@@ -891,17 +930,16 @@ loop = function(once) -- this is the main loop of the program
_currenttime = luasocket_gettime( )
-- Check for socket timeouts
- local difftime = os_difftime( _currenttime - _starttime )
- if difftime > _checkinterval then
+ if _currenttime - _starttime > _checkinterval then
_starttime = _currenttime
for handler, timestamp in pairs( _writetimes ) do
- if os_difftime( _currenttime - timestamp ) > _sendtimeout then
+ if _currenttime - timestamp > _sendtimeout then
handler.disconnect( )( handler, "send timeout" )
handler:force_close() -- forced disconnect
end
end
for handler, timestamp in pairs( _readtimes ) do
- if os_difftime( _currenttime - timestamp ) > _readtimeout then
+ if _currenttime - timestamp > _readtimeout then
if not(handler.onreadtimeout) or handler:onreadtimeout() ~= true then
handler.disconnect( )( handler, "read timeout" )
handler:close( ) -- forced disconnect?
@@ -912,27 +950,12 @@ loop = function(once) -- this is the main loop of the program
end
end
- -- Fire timers
- if _currenttime - _timer >= math_min(next_timer_time, 1) then
- next_timer_time = math_huge;
- for i = 1, _timerlistlen do
- local t = _timerlist[ i ]( _currenttime ) -- fire timers
- if t then next_timer_time = math_min(next_timer_time, t); end
- end
- _timer = _currenttime
- else
- next_timer_time = next_timer_time - (_currenttime - _timer);
- end
-
for server, paused_time in pairs( _fullservers ) do
if _currenttime - paused_time > _accepretry then
_fullservers[ server ] = nil;
server.resume();
end
end
-
- -- wait some time (0 by default)
- socket_sleep( _sleeptime )
until quitting;
if once and quitting == "once" then quitting = nil; return; end
closeall();
@@ -979,16 +1002,14 @@ local addclient = function( address, port, listeners, pattern, sslctx, typ )
elseif sslctx and not has_luasec then
err = "luasec not found"
end
- if not typ then
+ if getaddrinfo and not typ then
local addrinfo, err = getaddrinfo(address)
if not addrinfo then return nil, err end
if addrinfo[1] and addrinfo[1].family == "inet6" then
typ = "tcp6"
- else
- typ = "tcp"
end
end
- local create = luasocket[typ]
+ local create = luasocket[typ or "tcp"]
if type( create ) ~= "function" then
err = "invalid socket type"
end
@@ -1004,22 +1025,19 @@ local addclient = function( address, port, listeners, pattern, sslctx, typ )
end
client:settimeout( 0 )
local ok, err = client:connect( address, port )
- if ok or err == "timeout" then
+ if ok or err == "timeout" or err == "Operation already in progress" then
return wrapclient( client, address, port, listeners, pattern, sslctx )
else
return nil, err
end
end
---// EXPERIMENTAL //--
-
----------------------------------// BEGIN //--
use "setmetatable" ( _socketlist, { __mode = "k" } )
use "setmetatable" ( _readtimes, { __mode = "k" } )
use "setmetatable" ( _writetimes, { __mode = "k" } )
-_timer = luasocket_gettime( )
_starttime = luasocket_gettime( )
local function setlogger(new_logger)
@@ -1034,6 +1052,7 @@ end
return {
_addtimer = addtimer,
+ add_task = add_task;
addclient = addclient,
wrapclient = wrapclient,