aboutsummaryrefslogtreecommitdiffstats
path: root/net/server_event.lua
diff options
context:
space:
mode:
Diffstat (limited to 'net/server_event.lua')
-rw-r--r--net/server_event.lua157
1 files changed, 80 insertions, 77 deletions
diff --git a/net/server_event.lua b/net/server_event.lua
index 25799640..53330997 100644
--- a/net/server_event.lua
+++ b/net/server_event.lua
@@ -115,10 +115,10 @@ end )( )
local interface_mt
do
interface_mt = {}; interface_mt.__index = interface_mt;
-
+
local addevent = base.addevent
local coroutine_wrap, coroutine_yield = coroutine.wrap,coroutine.yield
-
+
-- Private methods
function interface_mt:_position(new_position)
self.position = new_position or self.position
@@ -127,7 +127,7 @@ do
function interface_mt:_close()
return self:_destroy();
end
-
+
function interface_mt:_start_connection(plainssl) -- should be called from addclient
local callback = function( event )
if EV_TIMEOUT == event then -- timeout during connection
@@ -268,12 +268,12 @@ do
interfacelist( "delete", self )
return true
end
-
+
function interface_mt:_lock(nointerface, noreading, nowriting) -- lock or unlock this interface or events
self.nointerface, self.noreading, self.nowriting = nointerface, noreading, nowriting
return nointerface, noreading, nowriting
end
-
+
--TODO: Deprecate
function interface_mt:lock_read(switch)
if switch then
@@ -300,7 +300,7 @@ do
end
return self._connections
end
-
+
-- Public methods
function interface_mt:write(data)
if self.nowriting then return nil, "locked" end
@@ -343,27 +343,27 @@ do
return true
end
end
-
+
function interface_mt:socket()
return self.conn
end
-
+
function interface_mt:server()
return self._server or self;
end
-
+
function interface_mt:port()
return self._port
end
-
+
function interface_mt:serverport()
return self._serverport
end
-
+
function interface_mt:ip()
return self._ip
end
-
+
function interface_mt:ssl()
return self._usingssl
end
@@ -372,15 +372,15 @@ do
function interface_mt:type()
return self._type or "client"
end
-
+
function interface_mt:connections()
return self._connections
end
-
+
function interface_mt:address()
return self.addr
end
-
+
function interface_mt:set_sslctx(sslctx)
self._sslctx = sslctx;
if sslctx then
@@ -396,11 +396,11 @@ do
end
return self._pattern;
end
-
+
function interface_mt:set_send(new_send)
-- No-op, we always use the underlying connection's send
end
-
+
function interface_mt:starttls(sslctx, call_onconnect)
debug( "try to start ssl at client id:", self.id )
local err
@@ -429,19 +429,20 @@ do
self.starttls = false;
return true
end
-
+
function interface_mt:setoption(option, value)
if self.conn.setoption then
return self.conn:setoption(option, value);
end
return false, "setoption not implemented";
end
-
+
function interface_mt:setlistener(listener)
- self.onconnect, self.ondisconnect, self.onincoming, self.ontimeout, self.onstatus
- = listener.onconnect, listener.ondisconnect, listener.onincoming, listener.ontimeout, listener.onstatus;
+ self.onconnect, self.ondisconnect, self.onincoming, self.ontimeout, self.onreadtimeout, self.onstatus
+ = listener.onconnect, listener.ondisconnect, listener.onincoming,
+ listener.ontimeout, listener.onreadtimeout, listener.onstatus;
end
-
+
-- Stub handlers
function interface_mt:onconnect()
end
@@ -451,6 +452,12 @@ do
end
function interface_mt:ontimeout()
end
+ function interface_mt:onreadtimeout()
+ self.fatalerror = "timeout during receiving"
+ debug( "connection failed:", self.fatalerror )
+ self:_close()
+ self.eventread = nil
+ end
function interface_mt:ondrain()
end
function interface_mt:onstatus()
@@ -478,6 +485,7 @@ do
ondisconnect = listener.ondisconnect; -- will be called when client disconnects
onincoming = listener.onincoming; -- will be called when client sends data
ontimeout = listener.ontimeout; -- called when fatal socket timeout occurs
+ onreadtimeout = listener.onreadtimeout; -- called when socket inactivity timeout occurs
ondrain = listener.ondrain; -- called when writebuffer is empty
onstatus = listener.onstatus; -- called for status changes (e.g. of SSL/TLS)
eventread = false, eventwrite = false, eventclose = false,
@@ -492,7 +500,7 @@ do
noreading = false, nowriting = false; -- locks of the read/writecallback
startsslcallback = false; -- starting handshake callback
position = false; -- position of client in interfacelist
-
+
-- Properties
_ip = ip, _port = port, _server = server, _pattern = pattern,
_serverport = (server and server:port() or nil),
@@ -568,7 +576,7 @@ do
end
end
end
-
+
interface.readcallback = function( event ) -- called on read events
--vdebug( "new client read event, id/ip/port:", tostring(interface.id), tostring(ip), tostring(port) )
if interface.noreading or interface.fatalerror then -- leave this event
@@ -576,61 +584,56 @@ do
interface.eventread = nil
return -1
end
- if EV_TIMEOUT == event then -- took too long to get some data from client -> disconnect
- interface.fatalerror = "timeout during receiving"
- debug( "connection failed:", interface.fatalerror )
+ if EV_TIMEOUT == event and interface:onreadtimeout() ~= true then
+ return -1 -- took too long to get some data from client -> disconnect
+ end
+ if interface._usingssl then -- handle luasec
+ if interface.eventwritetimeout then -- ok, in the past writecallback was regged
+ local ret = interface.writecallback( ) -- call it
+ --vdebug( "tried to write in readcallback, result:", tostring(ret) )
+ end
+ if interface.eventreadtimeout then
+ interface.eventreadtimeout:close( )
+ interface.eventreadtimeout = nil
+ end
+ end
+ local buffer, err, part = interface.conn:receive( interface._pattern ) -- receive buffer with "pattern"
+ --vdebug( "read data:", tostring(buffer), "error:", tostring(err), "part:", tostring(part) )
+ buffer = buffer or part
+ if buffer and #buffer > cfg.MAX_READ_LENGTH then -- check buffer length
+ interface.fatalerror = "receive buffer exceeded"
+ debug( "fatal error:", interface.fatalerror )
interface:_close()
interface.eventread = nil
return -1
- else -- can read
- if interface._usingssl then -- handle luasec
- if interface.eventwritetimeout then -- ok, in the past writecallback was regged
- local ret = interface.writecallback( ) -- call it
- --vdebug( "tried to write in readcallback, result:", tostring(ret) )
- end
- if interface.eventreadtimeout then
- interface.eventreadtimeout:close( )
- interface.eventreadtimeout = nil
+ end
+ if err and ( err ~= "timeout" and err ~= "wantread" ) then
+ if "wantwrite" == err then -- need to read on write event
+ if not interface.eventwrite then -- register new write event if needed
+ interface.eventwrite = addevent( base, interface.conn, EV_WRITE, interface.writecallback, cfg.WRITE_TIMEOUT )
end
- end
- local buffer, err, part = interface.conn:receive( interface._pattern ) -- receive buffer with "pattern"
- --vdebug( "read data:", tostring(buffer), "error:", tostring(err), "part:", tostring(part) )
- buffer = buffer or part
- if buffer and #buffer > cfg.MAX_READ_LENGTH then -- check buffer length
- interface.fatalerror = "receive buffer exceeded"
- debug( "fatal error:", interface.fatalerror )
+ interface.eventreadtimeout = addevent( base, nil, EV_TIMEOUT,
+ function( )
+ interface:_close()
+ end, cfg.READ_TIMEOUT
+ )
+ debug( "wantwrite during read attempt, reg it in writecallback but dont know what really happens next..." )
+ -- to be honest i dont know what happens next, if it is allowed to first read, the write etc...
+ else -- connection was closed or fatal error
+ interface.fatalerror = err
+ debug( "connection failed in read event:", interface.fatalerror )
interface:_close()
interface.eventread = nil
return -1
end
- if err and ( err ~= "timeout" and err ~= "wantread" ) then
- if "wantwrite" == err then -- need to read on write event
- if not interface.eventwrite then -- register new write event if needed
- interface.eventwrite = addevent( base, interface.conn, EV_WRITE, interface.writecallback, cfg.WRITE_TIMEOUT )
- end
- interface.eventreadtimeout = addevent( base, nil, EV_TIMEOUT,
- function( )
- interface:_close()
- end, cfg.READ_TIMEOUT
- )
- debug( "wantwrite during read attempt, reg it in writecallback but dont know what really happens next..." )
- -- to be honest i dont know what happens next, if it is allowed to first read, the write etc...
- else -- connection was closed or fatal error
- interface.fatalerror = err
- debug( "connection failed in read event:", interface.fatalerror )
- interface:_close()
- interface.eventread = nil
- return -1
- end
- else
- interface.onincoming( interface, buffer, err ) -- send new data to listener
- end
- if interface.noreading then
- interface.eventread = nil;
- return -1;
- end
- return EV_READ, cfg.READ_TIMEOUT
+ else
+ interface.onincoming( interface, buffer, err ) -- send new data to listener
end
+ if interface.noreading then
+ interface.eventread = nil;
+ return -1;
+ end
+ return EV_READ, cfg.READ_TIMEOUT
end
client:settimeout( 0 ) -- set non blocking
@@ -646,7 +649,7 @@ do
debug "creating server interface..."
local interface = {
_connections = 0;
-
+
conn = server;
onconnect = listener.onconnect; -- will be called when new client connected
eventread = false; -- read event handler
@@ -654,7 +657,7 @@ do
readcallback = false; -- read event callback
fatalerror = false; -- error message
nointerface = true; -- lock/unlock parameter
-
+
_ip = addr, _port = port, _pattern = pattern,
_sslctx = sslctx;
}
@@ -693,12 +696,12 @@ do
clientinterface:_start_session( true )
end
debug( "accepted incoming client connection from:", client_ip or "<unknown IP>", client_port or "<unknown port>", "to", port or "<unknown port>");
-
+
client, err = server:accept() -- try to accept again
end
return EV_READ
end
-
+
server:settimeout( 0 )
setmetatable(interface, interface_mt)
interfacelist( "add", interface )
@@ -741,7 +744,7 @@ do
return interface, client
--function handleclient( client, ip, port, server, pattern, listener, _, sslctx ) -- creates an client interface
end
-
+
function addclient( addr, serverport, listener, pattern, localaddr, localport, sslcfg, startssl )
local client, err = socket.tcp() -- creating new socket
if not client then
@@ -832,14 +835,14 @@ end
local function link(sender, receiver, buffersize)
local sender_locked;
-
+
function receiver:ondrain()
if sender_locked then
sender:resume();
sender_locked = nil;
end
end
-
+
function sender:onincoming(data)
receiver:write(data);
if receiver.writebufferlen >= buffersize then