aboutsummaryrefslogtreecommitdiffstats
path: root/net/server_epoll.lua
diff options
context:
space:
mode:
authorKim Alvefur <zash@zash.se>2019-12-24 00:39:45 +0100
committerKim Alvefur <zash@zash.se>2019-12-24 00:39:45 +0100
commit1eabf5bdb449b0312dfa272884e02cc84175775c (patch)
tree94b4ebcb9443e8e6751e1903ade3c9dc8ec9d678 /net/server_epoll.lua
parent575cd4c57d062a1addc761aa0036b1db752900a9 (diff)
parent8537138d41a6bca5e855040c8928ef2aae59802f (diff)
downloadprosody-1eabf5bdb449b0312dfa272884e02cc84175775c.tar.gz
prosody-1eabf5bdb449b0312dfa272884e02cc84175775c.zip
Merge 0.11->trunk
Diffstat (limited to 'net/server_epoll.lua')
-rw-r--r--net/server_epoll.lua280
1 files changed, 204 insertions, 76 deletions
diff --git a/net/server_epoll.lua b/net/server_epoll.lua
index 0c03ae15..e2c5f977 100644
--- a/net/server_epoll.lua
+++ b/net/server_epoll.lua
@@ -9,20 +9,22 @@
local t_insert = table.insert;
local t_concat = table.concat;
local setmetatable = setmetatable;
-local tostring = tostring;
local pcall = pcall;
local type = type;
local next = next;
local pairs = pairs;
-local log = require "util.logger".init("server_epoll");
+local logger = require "util.logger";
+local log = logger.init("server_epoll");
local socket = require "socket";
local luasec = require "ssl";
-local gettime = require "util.time".now;
+local realtime = require "util.time".now;
+local monotonic = require "util.time".monotonic;
local indexedbheap = require "util.indexedbheap";
local createtable = require "util.table".create;
local inet = require "util.net";
local inet_pton = inet.pton;
local _SOCKETINVALID = socket._SOCKETINVALID or -1;
+local new_id = require "util.id".medium;
local poller = require "util.poll"
local EEXIST = poller.EEXIST;
@@ -38,7 +40,10 @@ local default_config = { __index = {
read_timeout = 14 * 60;
-- How long to wait for a socket to become writable after queuing data to send
- send_timeout = 60;
+ send_timeout = 180;
+
+ -- How long to wait for a socket to become writable after creation
+ connect_timeout = 20;
-- Some number possibly influencing how many pending connections can be accepted
tcp_backlog = 128;
@@ -58,6 +63,13 @@ local default_config = { __index = {
-- Maximum and minimum amount of time to sleep waiting for events (adjusted for pending timers)
max_wait = 86400;
min_wait = 1e-06;
+
+ -- EXPERIMENTAL
+ -- Whether to kill connections in case of callback errors.
+ fatal_errors = false;
+
+ -- Attempt writes instantly
+ opportunistic_writes = false;
}};
local cfg = default_config.__index;
@@ -75,45 +87,43 @@ local function closetimer(t)
end
local function reschedule(t, time)
+ time = monotonic() + time;
t[1] = time;
timers:reprioritize(t.id, time);
end
--- Add absolute timer
-local function at(time, f)
+-- Add relative timer
+local function addtimer(timeout, f)
+ local time = monotonic() + timeout;
local timer = { time, f, close = closetimer, reschedule = reschedule, id = nil };
timer.id = timers:insert(timer, time);
return timer;
end
--- Add relative timer
-local function addtimer(timeout, f)
- return at(gettime() + timeout, f);
-end
-
-- Run callbacks of expired timers
-- Return time until next timeout
local function runtimers(next_delay, min_wait)
-- Any timers at all?
- local now = gettime();
+ local elapsed = monotonic();
+ local now = realtime();
local peek = timers:peek();
while peek do
- if peek > now then
- next_delay = peek - now;
+ if peek > elapsed then
+ next_delay = peek - elapsed;
break;
- end
+ end
- local _, timer, id = timers:pop();
+ local _, timer = timers:pop();
local ok, ret = pcall(timer[2], now);
if ok and type(ret) == "number" then
- local next_time = now+ret;
+ local next_time = elapsed+ret;
timer[1] = next_time;
timers:insert(timer, next_time);
- end
+ end
peek = timers:peek();
- end
+ end
if peek == nil then
return next_delay;
end
@@ -138,6 +148,15 @@ function interface_mt:__tostring()
return ("FD %d"):format(self:getfd());
end
+interface.log = log;
+function interface:debug(msg, ...) --luacheck: ignore 212/self
+ self.log("debug", msg, ...);
+end
+
+function interface:error(msg, ...) --luacheck: ignore 212/self
+ self.log("error", msg, ...);
+end
+
-- Replace the listener and tell the old one
function interface:setlistener(listeners, data)
self:on("detach");
@@ -148,21 +167,32 @@ end
-- Call a listener callback
function interface:on(what, ...)
if not self.listeners then
- log("error", "%s has no listeners", self);
+ self:debug("Interface is missing listener callbacks");
return;
end
local listener = self.listeners["on"..what];
if not listener then
- -- log("debug", "Missing listener 'on%s'", what); -- uncomment for development and debugging
+ -- self:debug("Missing listener 'on%s'", what); -- uncomment for development and debugging
return;
end
local ok, err = pcall(listener, self, ...);
if not ok then
- log("error", "Error calling on%s: %s", what, err);
+ if cfg.fatal_errors then
+ self:debug("Closing due to error calling on%s: %s", what, err);
+ self:destroy();
+ else
+ self:debug("Error calling on%s: %s", what, err);
+ end
+ return nil, err;
end
return err;
end
+-- Allow this one to be overridden
+function interface:onincoming(...)
+ return self:on("incoming", ...);
+end
+
-- Return the file descriptor number
function interface:getfd()
if self.conn then
@@ -226,12 +256,14 @@ function interface:setreadtimeout(t)
end
t = t or cfg.read_timeout;
if self._readtimeout then
- self._readtimeout:reschedule(gettime() + t);
+ self._readtimeout:reschedule(t);
else
self._readtimeout = addtimer(t, function ()
if self:on("readtimeout") then
+ self:debug("Read timeout handled");
return cfg.read_timeout;
else
+ self:debug("Read timeout not handled, disconnecting");
self:on("disconnect", "read timeout");
self:destroy();
end
@@ -250,9 +282,10 @@ function interface:setwritetimeout(t)
end
t = t or cfg.send_timeout;
if self._writetimeout then
- self._writetimeout:reschedule(gettime() + t);
+ self._writetimeout:reschedule(t);
else
self._writetimeout = addtimer(t, function ()
+ self:debug("Write timeout");
self:on("disconnect", "write timeout");
self:destroy();
end);
@@ -269,15 +302,15 @@ function interface:add(r, w)
local ok, err, errno = poll:add(fd, r, w);
if not ok then
if errno == EEXIST then
- log("debug", "%s already registered!", self);
+ self:debug("FD already registered in poller! (EEXIST)");
return self:set(r, w); -- So try to change its flags
end
- log("error", "Could not register %s: %s(%d)", self, err, errno);
+ self:debug("Could not register in poller: %s(%d)", err, errno);
return ok, err;
end
self._wantread, self._wantwrite = r, w;
fds[fd] = self;
- log("debug", "Watching %s", self);
+ self:debug("Registered in poller");
return true;
end
@@ -290,7 +323,7 @@ function interface:set(r, w)
if w == nil then w = self._wantwrite; end
local ok, err, errno = poll:set(fd, r, w);
if not ok then
- log("error", "Could not update poller state %s: %s(%d)", self, err, errno);
+ self:debug("Could not update poller state: %s(%d)", err, errno);
return ok, err;
end
self._wantread, self._wantwrite = r, w;
@@ -307,12 +340,12 @@ function interface:del()
end
local ok, err, errno = poll:del(fd);
if not ok and errno ~= ENOENT then
- log("error", "Could not unregister %s: %s(%d)", self, err, errno);
+ self:debug("Could not unregister: %s(%d)", err, errno);
return ok, err;
end
self._wantread, self._wantwrite = nil, nil;
fds[fd] = nil;
- log("debug", "Unwatched %s", self);
+ self:debug("Unregistered from poller");
return true;
end
@@ -334,7 +367,7 @@ function interface:onreadable()
local data, err, partial = self.conn:receive(self.read_size or cfg.read_size);
if data then
self:onconnect();
- self:on("incoming", data);
+ self:onincoming(data);
else
if err == "wantread" then
self:set(true, nil);
@@ -345,7 +378,7 @@ function interface:onreadable()
end
if partial and partial ~= "" then
self:onconnect();
- self:on("incoming", partial, err);
+ self:onincoming(partial, err);
end
if err ~= "timeout" then
self:on("disconnect", err);
@@ -354,6 +387,14 @@ function interface:onreadable()
end
end
if not self.conn then return; end
+ if self._limit and (data or partial) then
+ local cost = self._limit * #(data or partial);
+ if cost > cfg.min_wait then
+ self:setreadtimeout(false);
+ self:pausefor(cost);
+ return;
+ end
+ end
if self._wantread and self.conn:dirty() then
self:setreadtimeout(false);
self:pausefor(cfg.read_retry_delay);
@@ -378,10 +419,12 @@ function interface:onwritable()
self:ondrain(); -- Be aware of writes in ondrain
return;
elseif partial then
+ self:debug("Sent %d out of %d buffered bytes", partial, #data);
buffer[1] = data:sub(partial+1);
for i = #buffer, 2, -1 do
buffer[i] = nil;
end
+ self:set(nil, true);
self:setwritetimeout();
end
if err == "wantwrite" or err == "timeout" then
@@ -407,8 +450,14 @@ function interface:write(data)
else
self.writebuffer = { data };
end
- self:setwritetimeout();
- self:set(nil, true);
+ if not self._write_lock then
+ if cfg.opportunistic_writes then
+ self:onwritable();
+ return #data;
+ end
+ self:setwritetimeout();
+ self:set(nil, true);
+ end
return #data;
end
interface.send = interface.write;
@@ -418,10 +467,10 @@ function interface:close()
if self.writebuffer and self.writebuffer[1] then
self:set(false, true); -- Flush final buffer contents
self.write, self.send = noop, noop; -- No more writing
- log("debug", "Close %s after writing", self);
+ self:debug("Close after writing remaining buffered data");
self.ondrain = interface.close;
else
- log("debug", "Close %s now", self);
+ self:debug("Closing now");
self.write, self.send = noop, noop;
self.close = noop;
self:on("disconnect");
@@ -450,7 +499,7 @@ function interface:starttls(tls_ctx)
if tls_ctx then self.tls_ctx = tls_ctx; end
self.starttls = false;
if self.writebuffer and self.writebuffer[1] then
- log("debug", "Start TLS on %s after write", self);
+ self:debug("Start TLS after write");
self.ondrain = interface.starttls;
self:set(nil, true); -- make sure wantwrite is set
else
@@ -460,7 +509,7 @@ function interface:starttls(tls_ctx)
self.onwritable = interface.tlshandskake;
self.onreadable = interface.tlshandskake;
self:set(true, true);
- log("debug", "Prepare to start TLS on %s", self);
+ self:debug("Prepared to start TLS");
end
end
@@ -469,12 +518,13 @@ function interface:tlshandskake()
self:setreadtimeout(false);
if not self._tls then
self._tls = true;
- log("debug", "Start TLS on %s now", self);
+ self:debug("Starting TLS now");
self:del();
+ self:updatenames(); -- Can't getpeer/sockname after wrap()
local ok, conn, err = pcall(luasec.wrap, self.conn, self.tls_ctx);
if not ok then
conn, err = ok, conn;
- log("error", "Failed to initialize TLS: %s", err);
+ self:debug("Failed to initialize TLS: %s", err);
end
if not conn then
self:on("disconnect", err);
@@ -483,6 +533,13 @@ function interface:tlshandskake()
end
conn:settimeout(0);
self.conn = conn;
+ if conn.sni then
+ if self.servername then
+ conn:sni(self.servername);
+ elseif self._server and type(self._server.hosts) == "table" and next(self._server.hosts) ~= nil then
+ conn:sni(self._server.hosts, true);
+ end
+ end
self:on("starttls");
self.ondrain = nil;
self.onwritable = interface.tlshandskake;
@@ -491,40 +548,55 @@ function interface:tlshandskake()
end
local ok, err = self.conn:dohandshake();
if ok then
- log("debug", "TLS handshake on %s complete", self);
+ local info = self.conn.info and self.conn:info();
+ if type(info) == "table" then
+ self:debug("TLS handshake complete (%s with %s)", info.protocol, info.cipher);
+ else
+ self:debug("TLS handshake complete");
+ end
self.onwritable = nil;
self.onreadable = nil;
self:on("status", "ssl-handshake-complete");
self:setwritetimeout();
self:set(true, true);
elseif err == "wantread" then
- log("debug", "TLS handshake on %s to wait until readable", self);
+ self:debug("TLS handshake to wait until readable");
self:set(true, false);
self:setreadtimeout(cfg.ssl_handshake_timeout);
elseif err == "wantwrite" then
- log("debug", "TLS handshake on %s to wait until writable", self);
+ self:debug("TLS handshake to wait until writable");
self:set(false, true);
self:setwritetimeout(cfg.ssl_handshake_timeout);
else
- log("debug", "TLS handshake error on %s: %s", self, err);
+ self:debug("TLS handshake error: %s", err);
self:on("disconnect", err);
self:destroy();
end
end
-local function wrapsocket(client, server, read_size, listeners, tls_ctx) -- luasocket object -> interface object
+local function wrapsocket(client, server, read_size, listeners, tls_ctx, extra) -- luasocket object -> interface object
client:settimeout(0);
+ local conn_id = ("conn%s"):format(new_id());
local conn = setmetatable({
conn = client;
_server = server;
- created = gettime();
+ created = realtime();
listeners = listeners;
read_size = read_size or (server and server.read_size);
writebuffer = {};
tls_ctx = tls_ctx or (server and server.tls_ctx);
tls_direct = server and server.tls_direct;
+ id = conn_id;
+ log = logger.init(conn_id);
+ extra = extra;
}, interface_mt);
+ if extra then
+ if extra.servername then
+ conn.servername = extra.servername;
+ end
+ end
+
conn:updatenames();
return conn;
end
@@ -532,11 +604,11 @@ end
function interface:updatenames()
local conn = self.conn;
local ok, peername, peerport = pcall(conn.getpeername, conn);
- if ok then
+ if ok and peername then
self.peername, self.peerport = peername, peerport;
end
local ok, sockname, sockport = pcall(conn.getsockname, conn);
- if ok then
+ if ok and sockname then
self.sockname, self.sockport = sockname, sockport;
end
end
@@ -546,34 +618,39 @@ end
function interface:onacceptable()
local conn, err = self.conn:accept();
if not conn then
- log("debug", "Error accepting new client: %s, server will be paused for %ds", err, cfg.accept_retry_interval);
+ self:debug("Error accepting new client: %s, server will be paused for %ds", err, cfg.accept_retry_interval);
self:pausefor(cfg.accept_retry_interval);
return;
end
local client = wrapsocket(conn, self, nil, self.listeners);
- log("debug", "New connection %s", tostring(client));
+ client:debug("New connection %s on server %s", client, self);
client:init();
if self.tls_direct then
client:starttls(self.tls_ctx);
+ else
+ client:onconnect();
end
end
-- Initialization
function interface:init()
- self:setwritetimeout();
+ self:setwritetimeout(cfg.connect_timeout);
return self:add(true, true);
end
function interface:pause()
+ self:debug("Pause reading");
return self:set(false);
end
function interface:resume()
+ self:debug("Resume reading");
return self:set(true);
end
-- Pause connection for some time
function interface:pausefor(t)
+ self:debug("Pause for %fs", t);
if self._pausefor then
self._pausefor:close();
end
@@ -582,43 +659,85 @@ function interface:pausefor(t)
self._pausefor = addtimer(t, function ()
self._pausefor = nil;
self:set(true);
+ self:debug("Resuming after pause, connection is %s", not self.conn and "missing" or self.conn:dirty() and "dirty" or "clean");
if self.conn and self.conn:dirty() then
self:onreadable();
end
end);
end
+function interface:setlimit(Bps)
+ if Bps > 0 then
+ self._limit = 1/Bps;
+ else
+ self._limit = nil;
+ end
+end
+
+function interface:pause_writes()
+ if self._write_lock then
+ return
+ end
+ self:debug("Pause writes");
+ self._write_lock = true;
+ self:setwritetimeout(false);
+ self:set(nil, false);
+end
+
+function interface:resume_writes()
+ if not self._write_lock then
+ return
+ end
+ self:debug("Resume writes");
+ self._write_lock = nil;
+ if self.writebuffer[1] then
+ self:setwritetimeout();
+ self:set(nil, true);
+ end
+end
+
-- Connected!
function interface:onconnect()
- if self.conn and not self.peername and self.conn.getpeername then
- self.peername, self.peerport = self.conn:getpeername();
- end
+ self:updatenames();
+ self:debug("Connected (%s)", self);
self.onconnect = noop;
self:on("connect");
end
-local function addserver(addr, port, listeners, read_size, tls_ctx)
+local function listen(addr, port, listeners, config)
local conn, err = socket.bind(addr, port, cfg.tcp_backlog);
if not conn then return conn, err; end
conn:settimeout(0);
local server = setmetatable({
conn = conn;
- created = gettime();
+ created = realtime();
listeners = listeners;
- read_size = read_size;
+ read_size = config and config.read_size;
onreadable = interface.onacceptable;
- tls_ctx = tls_ctx;
- tls_direct = tls_ctx and true or false;
+ tls_ctx = config and config.tls_ctx;
+ tls_direct = config and config.tls_direct;
+ hosts = config and config.sni_hosts;
sockname = addr;
sockport = port;
+ log = logger.init(("serv%s"):format(new_id()));
}, interface_mt);
+ server:debug("Server %s created", server);
server:add(true, false);
return server;
end
-- COMPAT
-local function wrapclient(conn, addr, port, listeners, read_size, tls_ctx)
- local client = wrapsocket(conn, nil, read_size, listeners, tls_ctx);
+local function addserver(addr, port, listeners, read_size, tls_ctx)
+ return listen(addr, port, listeners, {
+ read_size = read_size;
+ tls_ctx = tls_ctx;
+ tls_direct = tls_ctx and true or false;
+ });
+end
+
+-- COMPAT
+local function wrapclient(conn, addr, port, listeners, read_size, tls_ctx, extra)
+ local client = wrapsocket(conn, nil, read_size, listeners, tls_ctx, extra);
if not client.peername then
client.peername, client.peerport = addr, port;
end
@@ -631,7 +750,7 @@ local function wrapclient(conn, addr, port, listeners, read_size, tls_ctx)
end
-- New outgoing TCP connection
-local function addclient(addr, port, listeners, read_size, tls_ctx, typ)
+local function addclient(addr, port, listeners, read_size, tls_ctx, typ, extra)
local create;
if not typ then
local n = inet_pton(addr);
@@ -649,13 +768,19 @@ local function addclient(addr, port, listeners, read_size, tls_ctx, typ)
return nil, "invalid socket type";
end
local conn, err = create();
+ if not conn then return conn, err; end
local ok, err = conn:settimeout(0);
if not ok then return ok, err; end
local ok, err = conn:setpeername(addr, port);
if not ok and err ~= "timeout" then return ok, err; end
- local client = wrapsocket(conn, nil, read_size, listeners, tls_ctx)
+ local client = wrapsocket(conn, nil, read_size, listeners, tls_ctx, extra)
local ok, err = client:init();
+ if not client.peername then
+ -- otherwise not set until connected
+ client.peername, client.peerport = addr, port;
+ end
if not ok then return ok, err; end
+ client:debug("Client %s created", client);
if tls_ctx then
client:starttls(tls_ctx);
end
@@ -677,23 +802,23 @@ local function watchfd(fd, onreadable, onwritable)
end;
-- Otherwise it'll need to be something LuaSocket-compatible
end
+ conn.id = new_id();
+ conn.log = logger.init(("fdwatch%s"):format(conn.id));
conn:add(onreadable, onwritable);
return conn;
end;
-- Dump all data from one connection into another
-local function link(from, to)
- from.listeners = setmetatable({
- onincoming = function (_, data)
- from:pause();
- to:write(data);
- end,
- }, {__index=from.listeners});
- to.listeners = setmetatable({
- ondrain = function ()
- from:resume();
- end,
- }, {__index=to.listeners});
+local function link(from, to, read_size)
+ from:debug("Linking to %s", to.id);
+ function from:onincoming(data)
+ self:pause();
+ to:write(data);
+ end
+ function to:ondrain() -- luacheck: ignore 212/self
+ from:resume();
+ end
+ from:set_mode(read_size);
from:set(true, nil);
to:set(nil, true);
end
@@ -752,7 +877,7 @@ return {
addserver = addserver;
addclient = addclient;
add_task = addtimer;
- at = at;
+ listen = listen;
loop = loop;
closeall = closeall;
setquitting = setquitting;
@@ -766,6 +891,7 @@ return {
-- libevent emulation
event = { EV_READ = "r", EV_WRITE = "w", EV_READWRITE = "rw", EV_LEAVE = -1 };
addevent = function (fd, mode, callback)
+ log("warn", "Using deprecated libevent emulation, please update code to use watchfd API instead");
local function onevent(self)
local ret = self:callback();
if ret == -1 then
@@ -785,6 +911,8 @@ return {
fds[fd] = nil;
end;
}, interface_mt);
+ conn.id = conn:getfd();
+ conn.log = logger.init(("fdwatch%d"):format(conn.id));
local ok, err = conn:add(mode == "r" or mode == "rw", mode == "w" or mode == "rw");
if not ok then return ok, err; end
return conn;