diff options
Diffstat (limited to 'net/server_epoll.lua')
-rw-r--r-- | net/server_epoll.lua | 139 |
1 files changed, 79 insertions, 60 deletions
diff --git a/net/server_epoll.lua b/net/server_epoll.lua index c279c579..5f62d931 100644 --- a/net/server_epoll.lua +++ b/net/server_epoll.lua @@ -6,9 +6,7 @@ -- -local t_sort = table.sort; local t_insert = table.insert; -local t_remove = table.remove; local t_concat = table.concat; local setmetatable = setmetatable; local tostring = tostring; @@ -20,6 +18,7 @@ local log = require "util.logger".init("server_epoll"); local socket = require "socket"; local luasec = require "ssl"; local gettime = require "util.time".now; +local indexedbheap = require "util.indexedbheap"; local createtable = require "util.table".create; local inet = require "util.net"; local inet_pton = inet.pton; @@ -39,7 +38,10 @@ local default_config = { __index = { read_timeout = 14 * 60; -- How long to wait for a socket to become writable after queuing data to send - send_timeout = 60; + send_timeout = 180; + + -- How long to wait for a socket to become writable after creation + connect_timeout = 20; -- Some number possibly influencing how many pending connections can be accepted tcp_backlog = 128; @@ -66,22 +68,24 @@ local fds = createtable(10, 0); -- FD -> conn -- Timer and scheduling -- -local timers = {}; +local timers = indexedbheap.create(); local function noop() end local function closetimer(t) t[1] = 0; t[2] = noop; + timers:remove(t.id); end --- Set to true when timers have changed -local resort_timers = false; +local function reschedule(t, time) + t[1] = time; + timers:reprioritize(t.id, time); +end -- Add absolute timer local function at(time, f) - local timer = { time, f, close = closetimer }; - t_insert(timers, timer); - resort_timers = true; + local timer = { time, f, close = closetimer, reschedule = reschedule, id = nil }; + timer.id = timers:insert(timer, time); return timer; end @@ -94,50 +98,32 @@ end -- Return time until next timeout local function runtimers(next_delay, min_wait) -- Any timers at all? - if not timers[1] then - return next_delay; - end - - if resort_timers then - -- Sort earliest timers to the end - t_sort(timers, function (a, b) return a[1] > b[1]; end); - resort_timers = false; - end + local now = gettime(); + local peek = timers:peek(); + while peek do - -- Iterate from the end and remove completed timers - for i = #timers, 1, -1 do - local timer = timers[i]; - local t, f = timer[1], timer[2]; - -- Get time for every iteration to increase accuracy - local now = gettime(); - if t > now then - -- This timer should not fire yet - local diff = t - now; - if diff < next_delay then - next_delay = diff; - end + if peek > now then + next_delay = peek - now; break; end - local new_timeout = f(now); - if new_timeout then - -- Schedule for 'delay' from the time actually scheduled, - -- not from now, in order to prevent timer drift. - timer[1] = t + new_timeout; - resort_timers = true; - else - t_remove(timers, i); + + local _, timer, id = timers:pop(); + local ok, ret = pcall(timer[2], now); + if ok and type(ret) == "number" then + local next_time = now+ret; + timer[1] = next_time; + timers:insert(timer, next_time); end - end - if resort_timers or next_delay < min_wait then - -- Timers may be added from within a timer callback. - -- Those would not be considered for next_delay, - -- and we might sleep for too long, so instead - -- we return a shorter timeout so we can - -- properly sort all new timers. - next_delay = min_wait; + peek = timers:peek(); + end + if peek == nil then + return next_delay; end + if next_delay < min_wait then + return min_wait; + end return next_delay; end @@ -176,6 +162,7 @@ function interface:on(what, ...) local ok, err = pcall(listener, self, ...); if not ok then log("error", "Error calling on%s: %s", what, err); + return; end return err; end @@ -243,8 +230,7 @@ function interface:setreadtimeout(t) end t = t or cfg.read_timeout; if self._readtimeout then - self._readtimeout[1] = gettime() + t; - resort_timers = true; + self._readtimeout:reschedule(gettime() + t); else self._readtimeout = addtimer(t, function () if self:on("readtimeout") then @@ -268,8 +254,7 @@ function interface:setwritetimeout(t) end t = t or cfg.send_timeout; if self._writetimeout then - self._writetimeout[1] = gettime() + t; - resort_timers = true; + self._writetimeout:reschedule(gettime() + t); else self._writetimeout = addtimer(t, function () self:on("disconnect", "write timeout"); @@ -426,8 +411,10 @@ function interface:write(data) else self.writebuffer = { data }; end - self:setwritetimeout(); - self:set(nil, true); + if not self._write_lock then + self:setwritetimeout(); + self:set(nil, true); + end return #data; end interface.send = interface.write; @@ -502,6 +489,13 @@ function interface:tlshandskake() end conn:settimeout(0); self.conn = conn; + if conn.sni then + if self.servername then + conn:sni(self.servername); + elseif self._server and type(self._server.hosts) == "table" and next(self._server.hosts) ~= nil then + conn:sni(self._server.hosts, true); + end + end self:on("starttls"); self.ondrain = nil; self.onwritable = interface.tlshandskake; @@ -574,12 +568,14 @@ function interface:onacceptable() client:init(); if self.tls_direct then client:starttls(self.tls_ctx); + else + client:onconnect(); end end -- Initialization function interface:init() - self:setwritetimeout(); + self:setwritetimeout(cfg.connect_timeout); return self:add(true, true); end @@ -607,16 +603,28 @@ function interface:pausefor(t) end); end +function interface:pause_writes() + self._write_lock = true; + self:setwritetimeout(false); + self:set(nil, false); +end + +function interface:resume_writes() + self._write_lock = nil; + if self.writebuffer[1] then + self:setwritetimeout(); + self:set(nil, true); + end +end + -- Connected! function interface:onconnect() - if self.conn and not self.peername and self.conn.getpeername then - self.peername, self.peerport = self.conn:getpeername(); - end + self:updatenames(); self.onconnect = noop; self:on("connect"); end -local function addserver(addr, port, listeners, read_size, tls_ctx) +local function listen(addr, port, listeners, config) local conn, err = socket.bind(addr, port, cfg.tcp_backlog); if not conn then return conn, err; end conn:settimeout(0); @@ -624,10 +632,11 @@ local function addserver(addr, port, listeners, read_size, tls_ctx) conn = conn; created = gettime(); listeners = listeners; - read_size = read_size; + read_size = config and config.read_size; onreadable = interface.onacceptable; - tls_ctx = tls_ctx; - tls_direct = tls_ctx and true or false; + tls_ctx = config and config.tls_ctx; + tls_direct = config and config.tls_direct; + hosts = config and config.sni_hosts; sockname = addr; sockport = port; }, interface_mt); @@ -636,6 +645,15 @@ local function addserver(addr, port, listeners, read_size, tls_ctx) end -- COMPAT +local function addserver(addr, port, listeners, read_size, tls_ctx) + return listen(addr, port, listeners, { + read_size = read_size; + tls_ctx = tls_ctx; + tls_direct = tls_ctx and true or false; + }); +end + +-- COMPAT local function wrapclient(conn, addr, port, listeners, read_size, tls_ctx) local client = wrapsocket(conn, nil, read_size, listeners, tls_ctx); if not client.peername then @@ -771,6 +789,7 @@ return { addserver = addserver; addclient = addclient; add_task = addtimer; + listen = listen; at = at; loop = loop; closeall = closeall; |