aboutsummaryrefslogtreecommitdiffstats
path: root/net/server_epoll.lua
diff options
context:
space:
mode:
Diffstat (limited to 'net/server_epoll.lua')
-rw-r--r--net/server_epoll.lua376
1 files changed, 277 insertions, 99 deletions
diff --git a/net/server_epoll.lua b/net/server_epoll.lua
index 53a67dd5..34a11c03 100644
--- a/net/server_epoll.lua
+++ b/net/server_epoll.lua
@@ -9,20 +9,25 @@
local t_insert = table.insert;
local t_concat = table.concat;
local setmetatable = setmetatable;
-local tostring = tostring;
local pcall = pcall;
local type = type;
local next = next;
local pairs = pairs;
-local log = require "util.logger".init("server_epoll");
+local ipairs = ipairs;
+local traceback = debug.traceback;
+local logger = require "util.logger";
+local log = logger.init("server_epoll");
local socket = require "socket";
local luasec = require "ssl";
-local gettime = require "util.time".now;
+local realtime = require "util.time".now;
+local monotonic = require "util.time".monotonic;
local indexedbheap = require "util.indexedbheap";
local createtable = require "util.table".create;
local inet = require "util.net";
local inet_pton = inet.pton;
local _SOCKETINVALID = socket._SOCKETINVALID or -1;
+local new_id = require "util.id".medium;
+local xpcall = require "util.xpcall".xpcall;
local poller = require "util.poll"
local EEXIST = poller.EEXIST;
@@ -38,7 +43,10 @@ local default_config = { __index = {
read_timeout = 14 * 60;
-- How long to wait for a socket to become writable after queuing data to send
- send_timeout = 60;
+ send_timeout = 180;
+
+ -- How long to wait for a socket to become writable after creation
+ connect_timeout = 20;
-- Some number possibly influencing how many pending connections can be accepted
tcp_backlog = 128;
@@ -58,6 +66,20 @@ local default_config = { __index = {
-- Maximum and minimum amount of time to sleep waiting for events (adjusted for pending timers)
max_wait = 86400;
min_wait = 1e-06;
+
+ -- Enable extra noisy debug logging
+ -- TODO disable once considered stable
+ verbose = true;
+
+ -- EXPERIMENTAL
+ -- Whether to kill connections in case of callback errors.
+ fatal_errors = false;
+
+ -- Or disable protection (like server_select) for potential performance gains
+ protect_listeners = true;
+
+ -- Attempt writes instantly
+ opportunistic_writes = false;
}};
local cfg = default_config.__index;
@@ -68,54 +90,56 @@ local fds = createtable(10, 0); -- FD -> conn
local timers = indexedbheap.create();
local function noop() end
-local function closetimer(t)
- t[1] = 0;
- t[2] = noop;
- timers:remove(t.id);
+local function closetimer(id)
+ timers:remove(id);
end
-local function reschedule(t, time)
- t[1] = time;
- timers:reprioritize(t.id, time);
-end
-
--- Add absolute timer
-local function at(time, f)
- local timer = { time, f, close = closetimer, reschedule = reschedule, id = nil };
- timer.id = timers:insert(timer, time);
- return timer;
+local function reschedule(id, time)
+ time = monotonic() + time;
+ timers:reprioritize(id, time);
end
-- Add relative timer
-local function addtimer(timeout, f)
- return at(gettime() + timeout, f);
+local function addtimer(timeout, f, param)
+ local time = monotonic() + timeout;
+ if param ~= nil then
+ local timer_callback = f
+ function f(current_time, timer_id)
+ local t = timer_callback(current_time, timer_id, param)
+ return t;
+ end
+ end
+ local id = timers:insert(f, time);
+ return id;
end
-- Run callbacks of expired timers
-- Return time until next timeout
local function runtimers(next_delay, min_wait)
-- Any timers at all?
- local now = gettime();
+ local elapsed = monotonic();
+ local now = realtime();
local peek = timers:peek();
local readd;
while peek do
- if peek > now then
+ if peek > elapsed then
break;
end
local _, timer, id = timers:pop();
- local ok, ret = pcall(timer[2], now);
+ local ok, ret = xpcall(timer, traceback, now, id);
if ok and type(ret) == "number" then
- local next_time = now+ret;
- timer[1] = next_time;
+ local next_time = elapsed+ret;
-- Delay insertion of timers to be re-added
-- so they don't get called again this tick
if readd then
- readd[id] = timer;
+ readd[id] = { timer, next_time };
else
- readd = { [id] = timer };
+ readd = { [id] = { timer, next_time } };
end
+ elseif not ok then
+ log("error", "Error in timer: %s", ret);
end
peek = timers:peek();
@@ -123,7 +147,7 @@ local function runtimers(next_delay, min_wait)
if readd then
for _, timer in pairs(readd) do
- timers:insert(timer, timer[1]);
+ timers:insert(timer[1], timer[2]);
end
peek = timers:peek();
end
@@ -131,7 +155,7 @@ local function runtimers(next_delay, min_wait)
if peek == nil then
return next_delay;
else
- next_delay = peek - now;
+ next_delay = peek - elapsed;
end
if next_delay < min_wait then
@@ -154,6 +178,22 @@ function interface_mt:__tostring()
return ("FD %d"):format(self:getfd());
end
+interface.log = log;
+function interface:debug(msg, ...)
+ self.log("debug", msg, ...);
+end
+
+interface.noise = interface.debug;
+function interface:noise(msg, ...)
+ if cfg.verbose then
+ return self:debug(msg, ...);
+ end
+end
+
+function interface:error(msg, ...)
+ self.log("error", msg, ...);
+end
+
-- Replace the listener and tell the old one
function interface:setlistener(listeners, data)
self:on("detach");
@@ -164,21 +204,36 @@ end
-- Call a listener callback
function interface:on(what, ...)
if not self.listeners then
- log("error", "%s has no listeners", self);
+ self:error("Interface is missing listener callbacks");
return;
end
local listener = self.listeners["on"..what];
if not listener then
- -- log("debug", "Missing listener 'on%s'", what); -- uncomment for development and debugging
+ self:noise("Missing listener 'on%s'", what); -- uncomment for development and debugging
return;
end
- local ok, err = pcall(listener, self, ...);
+ if not cfg.protect_listeners then
+ return listener(self, ...);
+ end
+ local onerror = self.listeners.onerror or traceback;
+ local ok, err = xpcall(listener, onerror, self, ...);
if not ok then
- log("error", "Error calling on%s: %s", what, err);
+ if cfg.fatal_errors then
+ self:error("Closing due to error calling on%s: %s", what, err);
+ self:destroy();
+ else
+ self:error("Error calling on%s: %s", what, err);
+ end
+ return nil, err;
end
return err;
end
+-- Allow this one to be overridden
+function interface:onincoming(...)
+ return self:on("incoming", ...);
+end
+
-- Return the file descriptor number
function interface:getfd()
if self.conn then
@@ -235,19 +290,21 @@ end
function interface:setreadtimeout(t)
if t == false then
if self._readtimeout then
- self._readtimeout:close();
+ closetimer(self._readtimeout);
self._readtimeout = nil;
end
return
end
t = t or cfg.read_timeout;
if self._readtimeout then
- self._readtimeout:reschedule(gettime() + t);
+ reschedule(self._readtimeout, t);
else
self._readtimeout = addtimer(t, function ()
if self:on("readtimeout") then
+ self:noise("Read timeout handled");
return cfg.read_timeout;
else
+ self:debug("Read timeout not handled, disconnecting");
self:on("disconnect", "read timeout");
self:destroy();
end
@@ -259,17 +316,18 @@ end
function interface:setwritetimeout(t)
if t == false then
if self._writetimeout then
- self._writetimeout:close();
+ closetimer(self._writetimeout);
self._writetimeout = nil;
end
return
end
t = t or cfg.send_timeout;
if self._writetimeout then
- self._writetimeout:reschedule(gettime() + t);
+ reschedule(self._writetimeout, t);
else
self._writetimeout = addtimer(t, function ()
- self:on("disconnect", "write timeout");
+ self:noise("Write timeout");
+ self:on("disconnect", self._connected and "write timeout" or "connection timeout");
self:destroy();
end);
end
@@ -285,15 +343,15 @@ function interface:add(r, w)
local ok, err, errno = poll:add(fd, r, w);
if not ok then
if errno == EEXIST then
- log("debug", "%s already registered!", self);
+ self:debug("FD already registered in poller! (EEXIST)");
return self:set(r, w); -- So try to change its flags
end
- log("error", "Could not register %s: %s(%d)", self, err, errno);
+ self:debug("Could not register in poller: %s(%d)", err, errno);
return ok, err;
end
self._wantread, self._wantwrite = r, w;
fds[fd] = self;
- log("debug", "Watching %s", self);
+ self:noise("Registered in poller");
return true;
end
@@ -306,7 +364,7 @@ function interface:set(r, w)
if w == nil then w = self._wantwrite; end
local ok, err, errno = poll:set(fd, r, w);
if not ok then
- log("error", "Could not update poller state %s: %s(%d)", self, err, errno);
+ self:debug("Could not update poller state: %s(%d)", err, errno);
return ok, err;
end
self._wantread, self._wantwrite = r, w;
@@ -323,12 +381,12 @@ function interface:del()
end
local ok, err, errno = poll:del(fd);
if not ok and errno ~= ENOENT then
- log("error", "Could not unregister %s: %s(%d)", self, err, errno);
+ self:debug("Could not unregister: %s(%d)", err, errno);
return ok, err;
end
self._wantread, self._wantwrite = nil, nil;
fds[fd] = nil;
- log("debug", "Unwatched %s", self);
+ self:noise("Unregistered from poller");
return true;
end
@@ -350,7 +408,7 @@ function interface:onreadable()
local data, err, partial = self.conn:receive(self.read_size or cfg.read_size);
if data then
self:onconnect();
- self:on("incoming", data);
+ self:onincoming(data);
else
if err == "wantread" then
self:set(true, nil);
@@ -361,15 +419,28 @@ function interface:onreadable()
end
if partial and partial ~= "" then
self:onconnect();
- self:on("incoming", partial, err);
+ self:onincoming(partial, err);
end
if err ~= "timeout" then
+ if err == "closed" then
+ self:debug("Connection closed by remote");
+ else
+ self:debug("Read error, closing (%s)", err);
+ end
self:on("disconnect", err);
self:destroy()
return;
end
end
if not self.conn then return; end
+ if self._limit and (data or partial) then
+ local cost = self._limit * #(data or partial);
+ if cost > cfg.min_wait then
+ self:setreadtimeout(false);
+ self:pausefor(cost);
+ return;
+ end
+ end
if self._wantread and self.conn:dirty() then
self:setreadtimeout(false);
self:pausefor(cfg.read_retry_delay);
@@ -383,7 +454,7 @@ function interface:onwritable()
self:onconnect();
if not self.conn then return; end -- could have been closed in onconnect
local buffer = self.writebuffer;
- local data = t_concat(buffer);
+ local data = #buffer == 1 and buffer[1] or t_concat(buffer);
local ok, err, partial = self.conn:send(data);
if ok then
self:set(nil, false);
@@ -394,10 +465,12 @@ function interface:onwritable()
self:ondrain(); -- Be aware of writes in ondrain
return;
elseif partial then
+ self:debug("Sent %d out of %d buffered bytes", partial, #data);
buffer[1] = data:sub(partial+1);
for i = #buffer, 2, -1 do
buffer[i] = nil;
end
+ self:set(nil, true);
self:setwritetimeout();
end
if err == "wantwrite" or err == "timeout" then
@@ -423,8 +496,14 @@ function interface:write(data)
else
self.writebuffer = { data };
end
- self:setwritetimeout();
- self:set(nil, true);
+ if not self._write_lock then
+ if cfg.opportunistic_writes then
+ self:onwritable();
+ return #data;
+ end
+ self:setwritetimeout();
+ self:set(nil, true);
+ end
return #data;
end
interface.send = interface.write;
@@ -434,10 +513,10 @@ function interface:close()
if self.writebuffer and self.writebuffer[1] then
self:set(false, true); -- Flush final buffer contents
self.write, self.send = noop, noop; -- No more writing
- log("debug", "Close %s after writing", self);
+ self:debug("Close after writing remaining buffered data");
self.ondrain = interface.close;
else
- log("debug", "Close %s now", self);
+ self:debug("Closing now");
self.write, self.send = noop, noop;
self.close = noop;
self:on("disconnect");
@@ -466,31 +545,32 @@ function interface:starttls(tls_ctx)
if tls_ctx then self.tls_ctx = tls_ctx; end
self.starttls = false;
if self.writebuffer and self.writebuffer[1] then
- log("debug", "Start TLS on %s after write", self);
+ self:debug("Start TLS after write");
self.ondrain = interface.starttls;
self:set(nil, true); -- make sure wantwrite is set
else
if self.ondrain == interface.starttls then
self.ondrain = nil;
end
- self.onwritable = interface.tlshandskake;
- self.onreadable = interface.tlshandskake;
+ self.onwritable = interface.tlshandshake;
+ self.onreadable = interface.tlshandshake;
self:set(true, true);
- log("debug", "Prepare to start TLS on %s", self);
+ self:debug("Prepared to start TLS");
end
end
-function interface:tlshandskake()
+function interface:tlshandshake()
self:setwritetimeout(false);
self:setreadtimeout(false);
if not self._tls then
self._tls = true;
- log("debug", "Start TLS on %s now", self);
+ self:debug("Starting TLS now");
self:del();
+ self:updatenames(); -- Can't getpeer/sockname after wrap()
local ok, conn, err = pcall(luasec.wrap, self.conn, self.tls_ctx);
if not ok then
conn, err = ok, conn;
- log("error", "Failed to initialize TLS: %s", err);
+ self:debug("Failed to initialize TLS: %s", err);
end
if not conn then
self:on("disconnect", err);
@@ -499,33 +579,56 @@ function interface:tlshandskake()
end
conn:settimeout(0);
self.conn = conn;
- if conn.sni and self.servername then
- conn:sni(self.servername);
+ if conn.sni then
+ if self.servername then
+ conn:sni(self.servername);
+ elseif self._server and type(self._server.hosts) == "table" and next(self._server.hosts) ~= nil then
+ conn:sni(self._server.hosts, true);
+ end
+ end
+ if self.extra and self.extra.tlsa and conn.settlsa then
+ -- TODO Error handling
+ if not conn:setdane(self.servername or self.extra.dane_hostname) then
+ self:debug("Could not enable DANE on connection");
+ else
+ self:debug("Enabling DANE with %d TLSA records", #self.extra.tlsa);
+ self:noise("DANE hostname is %q", self.servername or self.extra.dane_hostname);
+ for _, tlsa in ipairs(self.extra.tlsa) do
+ self:noise("TLSA: %q", tlsa);
+ conn:settlsa(tlsa.use, tlsa.select, tlsa.match, tlsa.data);
+ end
+ end
end
self:on("starttls");
self.ondrain = nil;
- self.onwritable = interface.tlshandskake;
- self.onreadable = interface.tlshandskake;
+ self.onwritable = interface.tlshandshake;
+ self.onreadable = interface.tlshandshake;
return self:init();
end
+ self:noise("Continuing TLS handshake");
local ok, err = self.conn:dohandshake();
if ok then
- log("debug", "TLS handshake on %s complete", self);
+ local info = self.conn.info and self.conn:info();
+ if type(info) == "table" then
+ self:debug("TLS handshake complete (%s with %s)", info.protocol, info.cipher);
+ else
+ self:debug("TLS handshake complete");
+ end
self.onwritable = nil;
self.onreadable = nil;
self:on("status", "ssl-handshake-complete");
self:setwritetimeout();
self:set(true, true);
elseif err == "wantread" then
- log("debug", "TLS handshake on %s to wait until readable", self);
+ self:noise("TLS handshake to wait until readable");
self:set(true, false);
self:setreadtimeout(cfg.ssl_handshake_timeout);
elseif err == "wantwrite" then
- log("debug", "TLS handshake on %s to wait until writable", self);
+ self:noise("TLS handshake to wait until writable");
self:set(false, true);
self:setwritetimeout(cfg.ssl_handshake_timeout);
else
- log("debug", "TLS handshake error on %s: %s", self, err);
+ self:debug("TLS handshake error: %s", err);
self:on("disconnect", err);
self:destroy();
end
@@ -533,15 +636,18 @@ end
local function wrapsocket(client, server, read_size, listeners, tls_ctx, extra) -- luasocket object -> interface object
client:settimeout(0);
+ local conn_id = ("conn%s"):format(new_id());
local conn = setmetatable({
conn = client;
_server = server;
- created = gettime();
+ created = realtime();
listeners = listeners;
read_size = read_size or (server and server.read_size);
writebuffer = {};
tls_ctx = tls_ctx or (server and server.tls_ctx);
tls_direct = server and server.tls_direct;
+ id = conn_id;
+ log = logger.init(conn_id);
extra = extra;
}, interface_mt);
@@ -558,12 +664,12 @@ end
function interface:updatenames()
local conn = self.conn;
local ok, peername, peerport = pcall(conn.getpeername, conn);
- if ok then
- self.peername, self.peerport = peername, peerport;
+ if ok and peername then
+ self.peername, self.peerport = peername, peerport or 0;
end
local ok, sockname, sockport = pcall(conn.getsockname, conn);
- if ok then
- self.sockname, self.sockport = sockname, sockport;
+ if ok and sockname then
+ self.sockname, self.sockport = sockname, sockport or 0;
end
end
@@ -572,76 +678,129 @@ end
function interface:onacceptable()
local conn, err = self.conn:accept();
if not conn then
- log("debug", "Error accepting new client: %s, server will be paused for %ds", err, cfg.accept_retry_interval);
+ self:debug("Error accepting new client: %s, server will be paused for %ds", err, cfg.accept_retry_interval);
self:pausefor(cfg.accept_retry_interval);
return;
end
local client = wrapsocket(conn, self, nil, self.listeners);
- log("debug", "New connection %s", tostring(client));
+ client:debug("New connection %s on server %s", client, self);
client:init();
if self.tls_direct then
client:starttls(self.tls_ctx);
+ else
+ client:onconnect();
end
end
-- Initialization
function interface:init()
- self:setwritetimeout();
+ self:setwritetimeout(cfg.connect_timeout);
return self:add(true, true);
end
function interface:pause()
+ self:noise("Pause reading");
return self:set(false);
end
function interface:resume()
+ self:noise("Resume reading");
return self:set(true);
end
-- Pause connection for some time
function interface:pausefor(t)
+ self:noise("Pause for %fs", t);
if self._pausefor then
- self._pausefor:close();
+ closetimer(self._pausefor);
+ self._pausefor = nil;
end
if t == false then return; end
self:set(false);
self._pausefor = addtimer(t, function ()
self._pausefor = nil;
self:set(true);
+ self:noise("Resuming after pause, connection is %s", not self.conn and "missing" or self.conn:dirty() and "dirty" or "clean");
if self.conn and self.conn:dirty() then
self:onreadable();
end
end);
end
+function interface:setlimit(Bps)
+ if Bps > 0 then
+ self._limit = 1/Bps;
+ else
+ self._limit = nil;
+ end
+end
+
+function interface:pause_writes()
+ if self._write_lock then
+ return
+ end
+ self:noise("Pause writes");
+ self._write_lock = true;
+ self:setwritetimeout(false);
+ self:set(nil, false);
+end
+
+function interface:resume_writes()
+ if not self._write_lock then
+ return
+ end
+ self:noise("Resume writes");
+ self._write_lock = nil;
+ if self.writebuffer[1] then
+ self:setwritetimeout();
+ self:set(nil, true);
+ end
+end
+
-- Connected!
function interface:onconnect()
- if self.conn and not self.peername and self.conn.getpeername then
- self.peername, self.peerport = self.conn:getpeername();
- end
+ self._connected = true;
+ self:updatenames();
+ self:debug("Connected (%s)", self);
self.onconnect = noop;
self:on("connect");
end
-local function addserver(addr, port, listeners, read_size, tls_ctx)
- local conn, err = socket.bind(addr, port, cfg.tcp_backlog);
- if not conn then return conn, err; end
- conn:settimeout(0);
+local function wrapserver(conn, addr, port, listeners, config)
local server = setmetatable({
conn = conn;
- created = gettime();
+ created = realtime();
listeners = listeners;
- read_size = read_size;
+ read_size = config and config.read_size;
onreadable = interface.onacceptable;
- tls_ctx = tls_ctx;
- tls_direct = tls_ctx and true or false;
+ tls_ctx = config and config.tls_ctx;
+ tls_direct = config and config.tls_direct;
+ hosts = config and config.sni_hosts;
sockname = addr;
sockport = port;
+ log = logger.init(("serv%s"):format(new_id()));
}, interface_mt);
+ server:debug("Server %s created", server);
server:add(true, false);
return server;
end
+local function listen(addr, port, listeners, config)
+ local conn, err = socket.bind(addr, port, cfg.tcp_backlog);
+ if not conn then return conn, err; end
+ conn:settimeout(0);
+ return wrapserver(conn, addr, port, listeners, config);
+end
+
+-- COMPAT
+local function addserver(addr, port, listeners, read_size, tls_ctx)
+ return listen(addr, port, listeners, {
+ read_size = read_size;
+ tls_ctx = tls_ctx;
+ tls_direct = tls_ctx and true or false;
+ });
+end
+
-- COMPAT
local function wrapclient(conn, addr, port, listeners, read_size, tls_ctx, extra)
local client = wrapsocket(conn, nil, read_size, listeners, tls_ctx, extra);
@@ -675,13 +834,19 @@ local function addclient(addr, port, listeners, read_size, tls_ctx, typ, extra)
return nil, "invalid socket type";
end
local conn, err = create();
+ if not conn then return conn, err; end
local ok, err = conn:settimeout(0);
if not ok then return ok, err; end
local ok, err = conn:setpeername(addr, port);
if not ok and err ~= "timeout" then return ok, err; end
local client = wrapsocket(conn, nil, read_size, listeners, tls_ctx, extra)
local ok, err = client:init();
+ if not client.peername then
+ -- otherwise not set until connected
+ client.peername, client.peerport = addr, port;
+ end
if not ok then return ok, err; end
+ client:debug("Client %s created", client);
if tls_ctx then
client:starttls(tls_ctx);
end
@@ -703,23 +868,23 @@ local function watchfd(fd, onreadable, onwritable)
end;
-- Otherwise it'll need to be something LuaSocket-compatible
end
+ conn.id = new_id();
+ conn.log = logger.init(("fdwatch%s"):format(conn.id));
conn:add(onreadable, onwritable);
return conn;
end;
-- Dump all data from one connection into another
-local function link(from, to)
- from.listeners = setmetatable({
- onincoming = function (_, data)
- from:pause();
- to:write(data);
- end,
- }, {__index=from.listeners});
- to.listeners = setmetatable({
- ondrain = function ()
- from:resume();
- end,
- }, {__index=to.listeners});
+local function link(from, to, read_size)
+ from:debug("Linking to %s", to.id);
+ function from:onincoming(data)
+ self:pause();
+ to:write(data);
+ end
+ function to:ondrain() -- luacheck: ignore 212/self
+ from:resume();
+ end
+ from:set_mode(read_size);
from:set(true, nil);
to:set(nil, true);
end
@@ -778,11 +943,21 @@ return {
addserver = addserver;
addclient = addclient;
add_task = addtimer;
- at = at;
+ timer = {
+ -- API-compatible with util.timer
+ add_task = addtimer;
+ stop = closetimer;
+ reschedule = reschedule;
+ to_absolute_time = function (t)
+ return t-monotonic()+realtime();
+ end;
+ };
+ listen = listen;
loop = loop;
closeall = closeall;
setquitting = setquitting;
wrapclient = wrapclient;
+ wrapserver = wrapserver;
watchfd = watchfd;
link = link;
set_config = function (newconfig)
@@ -792,6 +967,7 @@ return {
-- libevent emulation
event = { EV_READ = "r", EV_WRITE = "w", EV_READWRITE = "rw", EV_LEAVE = -1 };
addevent = function (fd, mode, callback)
+ log("warn", "Using deprecated libevent emulation, please update code to use watchfd API instead");
local function onevent(self)
local ret = self:callback();
if ret == -1 then
@@ -811,6 +987,8 @@ return {
fds[fd] = nil;
end;
}, interface_mt);
+ conn.id = conn:getfd();
+ conn.log = logger.init(("fdwatch%d"):format(conn.id));
local ok, err = conn:add(mode == "r" or mode == "rw", mode == "w" or mode == "rw");
if not ok then return ok, err; end
return conn;