aboutsummaryrefslogtreecommitdiffstats
path: root/net/server_event.lua
diff options
context:
space:
mode:
Diffstat (limited to 'net/server_event.lua')
-rw-r--r--net/server_event.lua104
1 files changed, 81 insertions, 23 deletions
diff --git a/net/server_event.lua b/net/server_event.lua
index 3a907349..11bd6a29 100644
--- a/net/server_event.lua
+++ b/net/server_event.lua
@@ -5,9 +5,9 @@
notes:
-- when using luaevent, never register 2 or more EV_READ at one socket, same for EV_WRITE
- -- you cant even register a new EV_READ/EV_WRITE callback inside another one
+ -- you can't even register a new EV_READ/EV_WRITE callback inside another one
-- to do some of the above, use timeout events or something what will called from outside
- -- dont let garbagecollect eventcallbacks, as long they are running
+ -- don't let garbagecollect eventcallbacks, as long they are running
-- when using luasec, there are 4 cases of timeout errors: wantread or wantwrite during reading or writing
--]]
@@ -26,7 +26,7 @@ local cfg = {
MAX_SEND_LENGTH = 1024 * 1024 * 1024 * 1024, -- max bytes size of write buffer (for writing on sockets)
ACCEPT_QUEUE = 128, -- might influence the length of the pending sockets queue
ACCEPT_DELAY = 10, -- seconds to wait until the next attempt of a full server to accept
- READ_TIMEOUT = 60 * 60 * 6, -- timeout in seconds for read data from socket
+ READ_TIMEOUT = 14 * 60, -- timeout in seconds for read data from socket
WRITE_TIMEOUT = 180, -- timeout in seconds for write data on socket
CONNECT_TIMEOUT = 20, -- timeout in seconds for connection attempts
CLEAR_DELAY = 5, -- seconds to wait for clearing interface list (and calling ondisconnect listeners)
@@ -50,9 +50,10 @@ local coroutine_yield = coroutine.yield
local has_luasec, ssl = pcall ( require , "ssl" )
local socket = require "socket"
local levent = require "luaevent.core"
+local inet = require "util.net";
+local inet_pton = inet.pton;
local socket_gettime = socket.gettime
-local getaddrinfo = socket.dns.getaddrinfo
local log = require ("util.logger").init("socket")
@@ -106,6 +107,12 @@ function interface_mt:_start_connection(plainssl) -- called from wrapclient
self:_close()
debug( "new connection failed. id:", self.id, "error:", self.fatalerror )
else
+ if EV_READWRITE == event then
+ if self.readcallback(event) == -1 then
+ -- Fatal error occurred
+ return -1;
+ end
+ end
if plainssl and has_luasec then -- start ssl session
self:starttls(self._sslctx, true)
else -- normal connection
@@ -116,7 +123,7 @@ function interface_mt:_start_connection(plainssl) -- called from wrapclient
self.eventconnect = nil
return -1
end
- self.eventconnect = addevent( base, self.conn, EV_WRITE, callback, cfg.CONNECT_TIMEOUT )
+ self.eventconnect = addevent( base, self.conn, EV_READWRITE, callback, cfg.CONNECT_TIMEOUT )
return true
end
function interface_mt:_start_session(call_onconnect) -- new session, for example after startssl
@@ -151,7 +158,7 @@ function interface_mt:_start_ssl(call_onconnect) -- old socket will be destroyed
self.fatalerror = err
self.conn = nil -- cannot be used anymore
if call_onconnect then
- self.ondisconnect = nil -- dont call this when client isnt really connected
+ self.ondisconnect = nil -- don't call this when client isn't really connected
end
self:_close()
debug( "fatal error while ssl wrapping:", err )
@@ -194,7 +201,7 @@ function interface_mt:_start_ssl(call_onconnect) -- old socket will be destroyed
end
if self.fatalerror then
if call_onconnect then
- self.ondisconnect = nil -- dont call this when client isnt really connected
+ self.ondisconnect = nil -- don't call this when client isn't really connected
end
self:_close()
debug( "handshake failed because:", self.fatalerror )
@@ -223,7 +230,8 @@ function interface_mt:_destroy() -- close this interface + events and call last
_ = self.eventsession and self.eventsession:close( )
_ = self.eventwritetimeout and self.eventwritetimeout:close( )
_ = self.eventreadtimeout and self.eventreadtimeout:close( )
- _ = self.ondisconnect and self:ondisconnect( self.fatalerror ~= "client to close" and self.fatalerror) -- call ondisconnect listener (wont be the case if handshake failed on connect)
+ -- call ondisconnect listener (won't be the case if handshake failed on connect)
+ _ = self.ondisconnect and self:ondisconnect( self.fatalerror ~= "client to close" and self.fatalerror)
_ = self.conn and self.conn:close( ) -- close connection
_ = self._server and self._server:counter(-1);
self.eventread, self.eventwrite = nil, nil
@@ -408,7 +416,7 @@ function interface_mt:setoption(option, value)
return false, "setoption not implemented";
end
-function interface_mt:setlistener(listener)
+function interface_mt:setlistener(listener, data)
self:ondetach(); -- Notify listener that it is no longer responsible for this connection
self.onconnect = listener.onconnect;
self.ondisconnect = listener.ondisconnect;
@@ -417,7 +425,9 @@ function interface_mt:setlistener(listener)
self.onreadtimeout = listener.onreadtimeout;
self.onstatus = listener.onstatus;
self.ondetach = listener.ondetach;
+ self.onattach = listener.onattach;
self.ondrain = listener.ondrain;
+ self:onattach(data);
end
-- Stub handlers
@@ -439,6 +449,8 @@ function interface_mt:ondrain()
end
function interface_mt:ondetach()
end
+function interface_mt:onattach()
+end
function interface_mt:onstatus()
end
@@ -510,7 +522,7 @@ local function handleclient( client, ip, port, server, pattern, listener, sslctx
interface.writebuffer = { t_concat(interface.writebuffer) }
local succ, err, byte = interface.conn:send( interface.writebuffer[1], 1, interface.writebufferlen )
--vdebug( "write data:", interface.writebuffer, "error:", err, "part:", byte )
- if succ then -- writing succesful
+ if succ then -- writing successful
interface.writebuffer[1] = nil
interface.writebufferlen = 0
interface:ondrain();
@@ -539,7 +551,7 @@ local function handleclient( client, ip, port, server, pattern, listener, sslctx
return -1;
end
interface.eventwritetimeout = addevent( base, nil, EV_TIMEOUT, callback, cfg.WRITE_TIMEOUT ) -- reg a new timeout event
- debug( "wantread during write attempt, reg it in readcallback but dont know what really happens next..." )
+ debug( "wantread during write attempt, reg it in readcallback but don't know what really happens next..." )
-- hopefully this works with luasec; its simply not possible to use 2 different write events on a socket in luaevent
return -1
end
@@ -595,8 +607,8 @@ local function handleclient( client, ip, port, server, pattern, listener, sslctx
end
interface.eventreadtimeout = addevent( base, nil, EV_TIMEOUT,
function( ) interface:_close() end, cfg.READ_TIMEOUT)
- debug( "wantwrite during read attempt, reg it in writecallback but dont know what really happens next..." )
- -- to be honest i dont know what happens next, if it is allowed to first read, the write etc...
+ debug( "wantwrite during read attempt, reg it in writecallback but don't know what really happens next..." )
+ -- to be honest i don't know what happens next, if it is allowed to first read, the write etc...
else -- connection was closed or fatal error
interface.fatalerror = err
debug( "connection failed in read event:", interface.fatalerror )
@@ -717,15 +729,15 @@ local function addclient( addr, serverport, listener, pattern, sslctx, typ )
return nil, "luasec not found"
end
if not typ then
- local addrinfo, err = getaddrinfo(addr)
- if not addrinfo then return nil, err end
- if addrinfo[1] and addrinfo[1].family == "inet6" then
- typ = "tcp6"
- else
- typ = "tcp"
+ local n = inet_pton(addr);
+ if not n then return nil, "invalid-ip"; end
+ if #n == 16 then
+ typ = "tcp6";
+ elseif #n == 4 then
+ typ = "tcp4";
end
end
- local create = socket[typ]
+ local create = socket[typ];
if type( create ) ~= "function" then
return nil, "invalid socket type"
end
@@ -735,7 +747,7 @@ local function addclient( addr, serverport, listener, pattern, sslctx, typ )
return nil, err
end
client:settimeout( 0 ) -- set nonblocking
- local res, err = client:connect( addr, serverport ) -- connect
+ local res, err = client:setpeername( addr, serverport ) -- connect
if res or ( err == "timeout" ) then
local ip, port = client:getsockname( )
local interface = wrapclient( client, ip, serverport, listener, pattern, sslctx )
@@ -767,13 +779,15 @@ end
local function setquitting(yes)
if yes then
-- Quit now
- closeallservers();
+ if yes ~= "once" then
+ closeallservers();
+ end
base:loopexit();
end
end
local function get_backend()
- return base:method();
+ return "libevent " .. base:method();
end
-- We need to hold onto the events to stop them
@@ -811,6 +825,48 @@ local function link(sender, receiver, buffersize)
sender:set_mode("*a");
end
+local function add_task(delay, callback)
+ local event_handle;
+ event_handle = base:addevent(nil, 0, function ()
+ local ret = callback(socket_gettime());
+ if ret then
+ return 0, ret;
+ elseif event_handle then
+ return -1;
+ end
+ end
+ , delay);
+ return event_handle;
+end
+
+local function watchfd(fd, onreadable, onwriteable)
+ local handle = {};
+ function handle:setflags(r,w)
+ if r ~= nil then
+ if r and not self.wantread then
+ self.wantread = base:addevent(fd, EV_READ, function ()
+ onreadable(self);
+ end);
+ elseif not r and self.wantread then
+ self.wantread:close();
+ self.wantread = nil;
+ end
+ end
+ if w ~= nil then
+ if w and not self.wantwrite then
+ self.wantwrite = base:addevent(fd, EV_WRITE, function ()
+ onwriteable(self);
+ end);
+ elseif not r and self.wantread then
+ self.wantwrite:close();
+ self.wantwrite = nil;
+ end
+ end
+ end
+ handle:setflags(onreadable, onwriteable);
+ return handle;
+end
+
return {
cfg = cfg,
base = base,
@@ -826,6 +882,8 @@ return {
closeall = closeallservers,
get_backend = get_backend,
hook_signal = hook_signal,
+ add_task = add_task,
+ watchfd = watchfd,
__NAME = SCRIPT_NAME,
__DATE = LAST_MODIFIED,