aboutsummaryrefslogtreecommitdiffstats
path: root/net/server_select.lua
diff options
context:
space:
mode:
Diffstat (limited to 'net/server_select.lua')
-rw-r--r--net/server_select.lua154
1 files changed, 110 insertions, 44 deletions
diff --git a/net/server_select.lua b/net/server_select.lua
index 12aef9d8..3b83bb6d 100644
--- a/net/server_select.lua
+++ b/net/server_select.lua
@@ -40,6 +40,7 @@ local coroutine = use "coroutine"
local math_min = math.min
local math_huge = math.huge
local table_concat = table.concat
+local table_insert = table.insert
local string_sub = string.sub
local coroutine_wrap = coroutine.wrap
local coroutine_yield = coroutine.yield
@@ -55,7 +56,6 @@ local getaddrinfo = luasocket.dns.getaddrinfo
local ssl_wrap = ( has_luasec and luasec.wrap )
local socket_bind = luasocket.bind
-local socket_sleep = luasocket.sleep
local socket_select = luasocket.select
--// functions //--
@@ -100,7 +100,6 @@ local _sendtraffic
local _readtraffic
local _selecttimeout
-local _sleeptime
local _tcpbacklog
local _accepretry
@@ -114,8 +113,6 @@ local _checkinterval
local _sendtimeout
local _readtimeout
-local _timer
-
local _maxselectlen
local _maxfd
@@ -135,13 +132,12 @@ _fullservers = { } -- servers in a paused state while there are too many clients
_readlistlen = 0 -- length of readlist
_sendlistlen = 0 -- length of sendlist
-_timerlistlen = 0 -- lenght of timerlist
+_timerlistlen = 0 -- length of timerlist
_sendtraffic = 0 -- some stats
_readtraffic = 0
_selecttimeout = 1 -- timeout of socket.select
-_sleeptime = 0 -- time to wait at the end of every loop
_tcpbacklog = 128 -- some kind of hint to the OS
_accepretry = 10 -- seconds to wait until the next attempt of a full server to accept
@@ -301,7 +297,6 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport
local bufferqueuelen = 0 -- end of buffer array
local toclose
- local fatalerror
local needtls
local bufferlen = 0
@@ -425,7 +420,7 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport
bufferlen = bufferlen + #data
if bufferlen > maxsendlen then
_closelist[ handler ] = "send buffer exceeded" -- cannot close the client at the moment, have to wait to the end of the cycle
- handler.write = idfalse -- dont write anymore
+ handler.write = idfalse -- don't write anymore
return false
elseif socket and not _sendlist[ socket ] then
_sendlistlen = addsocket(_sendlist, socket, _sendlistlen)
@@ -517,7 +512,6 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport
return dispatch( handler, buffer, err )
else -- connections was closed or fatal error
out_put( "server.lua: client ", tostring(ip), ":", tostring(clientport), " read error: ", tostring(err) )
- fatalerror = true
_ = handler and handler:force_close( err )
return false
end
@@ -537,7 +531,7 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport
else
succ, err, count = false, "unexpected close", 0;
end
- if succ then -- sending succesful
+ if succ then -- sending successful
bufferqueuelen = 0
bufferlen = 0
_sendlistlen = removesocket( _sendlist, socket, _sendlistlen ) -- delete socket from writelist
@@ -557,7 +551,6 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport
return true
else -- connection was closed during sending or fatal error
out_put( "server.lua: client ", tostring(ip), ":", tostring(clientport), " write error: ", tostring(err) )
- fatalerror = true
_ = handler and handler:force_close( err )
return false
end
@@ -806,7 +799,6 @@ end
getsettings = function( )
return {
select_timeout = _selecttimeout;
- select_sleep_time = _sleeptime;
tcp_backlog = _tcpbacklog;
max_send_buffer_size = _maxsendlen;
max_receive_buffer_size = _maxreadlen;
@@ -825,7 +817,6 @@ changesettings = function( new )
return nil, "invalid settings table"
end
_selecttimeout = tonumber( new.select_timeout ) or _selecttimeout
- _sleeptime = tonumber( new.select_sleep_time ) or _sleeptime
_maxsendlen = tonumber( new.max_send_buffer_size ) or _maxsendlen
_maxreadlen = tonumber( new.max_receive_buffer_size ) or _maxreadlen
_checkinterval = tonumber( new.select_idle_check_interval ) or _checkinterval
@@ -848,6 +839,49 @@ addtimer = function( listener )
return true
end
+local add_task do
+ local data = {};
+ local new_data = {};
+
+ function add_task(delay, callback)
+ local current_time = luasocket_gettime();
+ delay = delay + current_time;
+ if delay >= current_time then
+ table_insert(new_data, {delay, callback});
+ else
+ local r = callback(current_time);
+ if r and type(r) == "number" then
+ return add_task(r, callback);
+ end
+ end
+ end
+
+ addtimer(function(current_time)
+ if #new_data > 0 then
+ for _, d in pairs(new_data) do
+ table_insert(data, d);
+ end
+ new_data = {};
+ end
+
+ local next_time = math_huge;
+ for i, d in pairs(data) do
+ local t, callback = d[1], d[2];
+ if t <= current_time then
+ data[i] = nil;
+ local r = callback(current_time);
+ if type(r) == "number" then
+ add_task(r, callback);
+ next_time = math_min(next_time, r);
+ end
+ else
+ next_time = math_min(next_time, t - current_time);
+ end
+ end
+ return next_time;
+ end);
+end
+
stats = function( )
return _readtraffic, _sendtraffic, _readlistlen, _sendlistlen, _timerlistlen
end
@@ -855,31 +889,38 @@ end
local quitting;
local function setquitting(quit)
- quitting = not not quit;
+ quitting = quit;
end
loop = function(once) -- this is the main loop of the program
if quitting then return "quitting"; end
if once then quitting = "once"; end
- local next_timer_time = math_huge;
+ _currenttime = luasocket_gettime( )
repeat
+ -- Fire timers
+ local next_timer_time = math_huge;
+ for i = 1, _timerlistlen do
+ local t = _timerlist[ i ]( _currenttime ) -- fire timers
+ if t then next_timer_time = math_min(next_timer_time, t); end
+ end
+
local read, write, err = socket_select( _readlist, _sendlist, math_min(_selecttimeout, next_timer_time) )
- for _, socket in ipairs( write ) do -- send data waiting in writequeues
+ for _, socket in ipairs( read ) do -- receive data
local handler = _socketlist[ socket ]
if handler then
- handler.sendbuffer( )
+ handler.readbuffer( )
else
closesocket( socket )
- out_put "server.lua: found no handler and closed socket (writelist)" -- this should not happen
+ out_put "server.lua: found no handler and closed socket (readlist)" -- this can happen
end
end
- for _, socket in ipairs( read ) do -- receive data
+ for _, socket in ipairs( write ) do -- send data waiting in writequeues
local handler = _socketlist[ socket ]
if handler then
- handler.readbuffer( )
+ handler.sendbuffer( )
else
closesocket( socket )
- out_put "server.lua: found no handler and closed socket (readlist)" -- this can happen
+ out_put "server.lua: found no handler and closed socket (writelist)" -- this should not happen
end
end
for handler, err in pairs( _closelist ) do
@@ -910,29 +951,14 @@ loop = function(once) -- this is the main loop of the program
end
end
- -- Fire timers
- if _currenttime - _timer >= math_min(next_timer_time, 1) then
- next_timer_time = math_huge;
- for i = 1, _timerlistlen do
- local t = _timerlist[ i ]( _currenttime ) -- fire timers
- if t then next_timer_time = math_min(next_timer_time, t); end
- end
- _timer = _currenttime
- else
- next_timer_time = next_timer_time - (_currenttime - _timer);
- end
-
for server, paused_time in pairs( _fullservers ) do
if _currenttime - paused_time > _accepretry then
_fullservers[ server ] = nil;
server.resume();
end
end
-
- -- wait some time (0 by default)
- socket_sleep( _sleeptime )
until quitting;
- if once and quitting == "once" then quitting = nil; return; end
+ if quitting == "once" then quitting = nil; return; end
closeall();
return "quitting"
end
@@ -952,6 +978,7 @@ local wrapclient = function( socket, ip, serverport, listeners, pattern, sslctx
if not handler then return nil, err end
_socketlist[ socket ] = handler
if not sslctx then
+ _readlistlen = addsocket(_readlist, socket, _readlistlen)
_sendlistlen = addsocket(_sendlist, socket, _sendlistlen)
if listeners.onconnect then
-- When socket is writeable, call onconnect
@@ -977,16 +1004,14 @@ local addclient = function( address, port, listeners, pattern, sslctx, typ )
elseif sslctx and not has_luasec then
err = "luasec not found"
end
- if not typ then
+ if getaddrinfo and not typ then
local addrinfo, err = getaddrinfo(address)
if not addrinfo then return nil, err end
if addrinfo[1] and addrinfo[1].family == "inet6" then
typ = "tcp6"
- else
- typ = "tcp"
end
end
- local create = luasocket[typ]
+ local create = luasocket[typ or "tcp"]
if type( create ) ~= "function" then
err = "invalid socket type"
end
@@ -1002,14 +1027,54 @@ local addclient = function( address, port, listeners, pattern, sslctx, typ )
end
client:settimeout( 0 )
local ok, err = client:connect( address, port )
- if ok or err == "timeout" then
+ if ok or err == "timeout" or err == "Operation already in progress" then
return wrapclient( client, address, port, listeners, pattern, sslctx )
else
return nil, err
end
end
---// EXPERIMENTAL //--
+local closewatcher = function (handler)
+ local socket = handler.conn;
+ _sendlistlen = removesocket( _sendlist, socket, _sendlistlen )
+ _readlistlen = removesocket( _readlist, socket, _readlistlen )
+ _socketlist[ socket ] = nil
+end;
+
+local addremove = function (handler, read, send)
+ local socket = handler.conn
+ _socketlist[ socket ] = handler
+ if read ~= nil then
+ if read then
+ _readlistlen = addsocket( _readlist, socket, _readlistlen )
+ else
+ _sendlistlen = removesocket( _sendlist, socket, _sendlistlen )
+ end
+ end
+ if send ~= nil then
+ if send then
+ _sendlistlen = addsocket( _sendlist, socket, _sendlistlen )
+ else
+ _readlistlen = removesocket( _readlist, socket, _readlistlen )
+ end
+ end
+end
+
+local watchfd = function ( fd, onreadable, onwriteable )
+ local socket = fd
+ if type(fd) == "number" then
+ socket = { getfd = function () return fd; end }
+ end
+ local handler = {
+ conn = socket;
+ readbuffer = onreadable or id;
+ sendbuffer = onwriteable or id;
+ close = closewatcher;
+ setflags = addremove;
+ };
+ addremove( handler, onreadable, onwriteable )
+ return handler
+end
----------------------------------// BEGIN //--
@@ -1017,7 +1082,6 @@ use "setmetatable" ( _socketlist, { __mode = "k" } )
use "setmetatable" ( _readtimes, { __mode = "k" } )
use "setmetatable" ( _writetimes, { __mode = "k" } )
-_timer = luasocket_gettime( )
_starttime = luasocket_gettime( )
local function setlogger(new_logger)
@@ -1032,9 +1096,11 @@ end
return {
_addtimer = addtimer,
+ add_task = add_task;
addclient = addclient,
wrapclient = wrapclient,
+ watchfd = watchfd,
loop = loop,
link = link,