aboutsummaryrefslogtreecommitdiffstats
path: root/net/server_epoll.lua
diff options
context:
space:
mode:
authorKim Alvefur <zash@zash.se>2019-06-10 13:22:22 +0200
committerKim Alvefur <zash@zash.se>2019-06-10 13:22:22 +0200
commit5eba0ea836d508aeb3087a7d93f6e90cc1f92de8 (patch)
tree8cd84a76fb673cbfefa00ba1516bd16150f1c780 /net/server_epoll.lua
parent1cc4ec5d808a55d32e3d51f30d8b50fe45268438 (diff)
parent2bb05d010d9b237a088bd9b4c997451407191d3f (diff)
downloadprosody-5eba0ea836d508aeb3087a7d93f6e90cc1f92de8.tar.gz
prosody-5eba0ea836d508aeb3087a7d93f6e90cc1f92de8.zip
Merge 0.11->trunk
Diffstat (limited to 'net/server_epoll.lua')
-rw-r--r--net/server_epoll.lua139
1 files changed, 79 insertions, 60 deletions
diff --git a/net/server_epoll.lua b/net/server_epoll.lua
index c279c579..5f62d931 100644
--- a/net/server_epoll.lua
+++ b/net/server_epoll.lua
@@ -6,9 +6,7 @@
--
-local t_sort = table.sort;
local t_insert = table.insert;
-local t_remove = table.remove;
local t_concat = table.concat;
local setmetatable = setmetatable;
local tostring = tostring;
@@ -20,6 +18,7 @@ local log = require "util.logger".init("server_epoll");
local socket = require "socket";
local luasec = require "ssl";
local gettime = require "util.time".now;
+local indexedbheap = require "util.indexedbheap";
local createtable = require "util.table".create;
local inet = require "util.net";
local inet_pton = inet.pton;
@@ -39,7 +38,10 @@ local default_config = { __index = {
read_timeout = 14 * 60;
-- How long to wait for a socket to become writable after queuing data to send
- send_timeout = 60;
+ send_timeout = 180;
+
+ -- How long to wait for a socket to become writable after creation
+ connect_timeout = 20;
-- Some number possibly influencing how many pending connections can be accepted
tcp_backlog = 128;
@@ -66,22 +68,24 @@ local fds = createtable(10, 0); -- FD -> conn
-- Timer and scheduling --
-local timers = {};
+local timers = indexedbheap.create();
local function noop() end
local function closetimer(t)
t[1] = 0;
t[2] = noop;
+ timers:remove(t.id);
end
--- Set to true when timers have changed
-local resort_timers = false;
+local function reschedule(t, time)
+ t[1] = time;
+ timers:reprioritize(t.id, time);
+end
-- Add absolute timer
local function at(time, f)
- local timer = { time, f, close = closetimer };
- t_insert(timers, timer);
- resort_timers = true;
+ local timer = { time, f, close = closetimer, reschedule = reschedule, id = nil };
+ timer.id = timers:insert(timer, time);
return timer;
end
@@ -94,50 +98,32 @@ end
-- Return time until next timeout
local function runtimers(next_delay, min_wait)
-- Any timers at all?
- if not timers[1] then
- return next_delay;
- end
-
- if resort_timers then
- -- Sort earliest timers to the end
- t_sort(timers, function (a, b) return a[1] > b[1]; end);
- resort_timers = false;
- end
+ local now = gettime();
+ local peek = timers:peek();
+ while peek do
- -- Iterate from the end and remove completed timers
- for i = #timers, 1, -1 do
- local timer = timers[i];
- local t, f = timer[1], timer[2];
- -- Get time for every iteration to increase accuracy
- local now = gettime();
- if t > now then
- -- This timer should not fire yet
- local diff = t - now;
- if diff < next_delay then
- next_delay = diff;
- end
+ if peek > now then
+ next_delay = peek - now;
break;
end
- local new_timeout = f(now);
- if new_timeout then
- -- Schedule for 'delay' from the time actually scheduled,
- -- not from now, in order to prevent timer drift.
- timer[1] = t + new_timeout;
- resort_timers = true;
- else
- t_remove(timers, i);
+
+ local _, timer, id = timers:pop();
+ local ok, ret = pcall(timer[2], now);
+ if ok and type(ret) == "number" then
+ local next_time = now+ret;
+ timer[1] = next_time;
+ timers:insert(timer, next_time);
end
- end
- if resort_timers or next_delay < min_wait then
- -- Timers may be added from within a timer callback.
- -- Those would not be considered for next_delay,
- -- and we might sleep for too long, so instead
- -- we return a shorter timeout so we can
- -- properly sort all new timers.
- next_delay = min_wait;
+ peek = timers:peek();
+ end
+ if peek == nil then
+ return next_delay;
end
+ if next_delay < min_wait then
+ return min_wait;
+ end
return next_delay;
end
@@ -176,6 +162,7 @@ function interface:on(what, ...)
local ok, err = pcall(listener, self, ...);
if not ok then
log("error", "Error calling on%s: %s", what, err);
+ return;
end
return err;
end
@@ -243,8 +230,7 @@ function interface:setreadtimeout(t)
end
t = t or cfg.read_timeout;
if self._readtimeout then
- self._readtimeout[1] = gettime() + t;
- resort_timers = true;
+ self._readtimeout:reschedule(gettime() + t);
else
self._readtimeout = addtimer(t, function ()
if self:on("readtimeout") then
@@ -268,8 +254,7 @@ function interface:setwritetimeout(t)
end
t = t or cfg.send_timeout;
if self._writetimeout then
- self._writetimeout[1] = gettime() + t;
- resort_timers = true;
+ self._writetimeout:reschedule(gettime() + t);
else
self._writetimeout = addtimer(t, function ()
self:on("disconnect", "write timeout");
@@ -426,8 +411,10 @@ function interface:write(data)
else
self.writebuffer = { data };
end
- self:setwritetimeout();
- self:set(nil, true);
+ if not self._write_lock then
+ self:setwritetimeout();
+ self:set(nil, true);
+ end
return #data;
end
interface.send = interface.write;
@@ -502,6 +489,13 @@ function interface:tlshandskake()
end
conn:settimeout(0);
self.conn = conn;
+ if conn.sni then
+ if self.servername then
+ conn:sni(self.servername);
+ elseif self._server and type(self._server.hosts) == "table" and next(self._server.hosts) ~= nil then
+ conn:sni(self._server.hosts, true);
+ end
+ end
self:on("starttls");
self.ondrain = nil;
self.onwritable = interface.tlshandskake;
@@ -574,12 +568,14 @@ function interface:onacceptable()
client:init();
if self.tls_direct then
client:starttls(self.tls_ctx);
+ else
+ client:onconnect();
end
end
-- Initialization
function interface:init()
- self:setwritetimeout();
+ self:setwritetimeout(cfg.connect_timeout);
return self:add(true, true);
end
@@ -607,16 +603,28 @@ function interface:pausefor(t)
end);
end
+function interface:pause_writes()
+ self._write_lock = true;
+ self:setwritetimeout(false);
+ self:set(nil, false);
+end
+
+function interface:resume_writes()
+ self._write_lock = nil;
+ if self.writebuffer[1] then
+ self:setwritetimeout();
+ self:set(nil, true);
+ end
+end
+
-- Connected!
function interface:onconnect()
- if self.conn and not self.peername and self.conn.getpeername then
- self.peername, self.peerport = self.conn:getpeername();
- end
+ self:updatenames();
self.onconnect = noop;
self:on("connect");
end
-local function addserver(addr, port, listeners, read_size, tls_ctx)
+local function listen(addr, port, listeners, config)
local conn, err = socket.bind(addr, port, cfg.tcp_backlog);
if not conn then return conn, err; end
conn:settimeout(0);
@@ -624,10 +632,11 @@ local function addserver(addr, port, listeners, read_size, tls_ctx)
conn = conn;
created = gettime();
listeners = listeners;
- read_size = read_size;
+ read_size = config and config.read_size;
onreadable = interface.onacceptable;
- tls_ctx = tls_ctx;
- tls_direct = tls_ctx and true or false;
+ tls_ctx = config and config.tls_ctx;
+ tls_direct = config and config.tls_direct;
+ hosts = config and config.sni_hosts;
sockname = addr;
sockport = port;
}, interface_mt);
@@ -636,6 +645,15 @@ local function addserver(addr, port, listeners, read_size, tls_ctx)
end
-- COMPAT
+local function addserver(addr, port, listeners, read_size, tls_ctx)
+ return listen(addr, port, listeners, {
+ read_size = read_size;
+ tls_ctx = tls_ctx;
+ tls_direct = tls_ctx and true or false;
+ });
+end
+
+-- COMPAT
local function wrapclient(conn, addr, port, listeners, read_size, tls_ctx)
local client = wrapsocket(conn, nil, read_size, listeners, tls_ctx);
if not client.peername then
@@ -771,6 +789,7 @@ return {
addserver = addserver;
addclient = addclient;
add_task = addtimer;
+ listen = listen;
at = at;
loop = loop;
closeall = closeall;