aboutsummaryrefslogtreecommitdiffstats
path: root/net/server_epoll.lua
diff options
context:
space:
mode:
Diffstat (limited to 'net/server_epoll.lua')
-rw-r--r--net/server_epoll.lua568
1 files changed, 421 insertions, 147 deletions
diff --git a/net/server_epoll.lua b/net/server_epoll.lua
index c47e1a70..90d3929f 100644
--- a/net/server_epoll.lua
+++ b/net/server_epoll.lua
@@ -9,20 +9,25 @@
local t_insert = table.insert;
local t_concat = table.concat;
local setmetatable = setmetatable;
-local tostring = tostring;
local pcall = pcall;
local type = type;
local next = next;
local pairs = pairs;
-local log = require "util.logger".init("server_epoll");
+local ipairs = ipairs;
+local traceback = debug.traceback;
+local logger = require "util.logger";
+local log = logger.init("server_epoll");
local socket = require "socket";
local luasec = require "ssl";
-local gettime = require "util.time".now;
+local realtime = require "util.time".now;
+local monotonic = require "util.time".monotonic;
local indexedbheap = require "util.indexedbheap";
local createtable = require "util.table".create;
local inet = require "util.net";
local inet_pton = inet.pton;
local _SOCKETINVALID = socket._SOCKETINVALID or -1;
+local new_id = require "util.id".medium;
+local xpcall = require "util.xpcall".xpcall;
local poller = require "util.poll"
local EEXIST = poller.EEXIST;
@@ -38,7 +43,10 @@ local default_config = { __index = {
read_timeout = 14 * 60;
-- How long to wait for a socket to become writable after queuing data to send
- send_timeout = 60;
+ send_timeout = 180;
+
+ -- How long to wait for a socket to become writable after creation
+ connect_timeout = 20;
-- Some number possibly influencing how many pending connections can be accepted
tcp_backlog = 128;
@@ -46,7 +54,7 @@ local default_config = { __index = {
-- If accepting a new incoming connection fails, wait this long before trying again
accept_retry_interval = 10;
- -- If there is still more data to read from LuaSocktes buffer, wait this long and read again
+ -- If there is still more data to read from LuaSockets buffer, wait this long and read again
read_retry_delay = 1e-06;
-- Size of chunks to read from sockets
@@ -57,7 +65,30 @@ local default_config = { __index = {
-- Maximum and minimum amount of time to sleep waiting for events (adjusted for pending timers)
max_wait = 86400;
- min_wait = 1e-06;
+ min_wait = 0.001;
+
+ -- Enable extra noisy debug logging
+ -- TODO disable once considered stable
+ verbose = true;
+
+ -- EXPERIMENTAL
+ -- Whether to kill connections in case of callback errors.
+ fatal_errors = false;
+
+ -- Or disable protection (like server_select) for potential performance gains
+ protect_listeners = true;
+
+ -- Attempt writes instantly
+ opportunistic_writes = false;
+
+ -- TCP Keepalives
+ tcp_keepalive = false; -- boolean | number
+
+ -- Whether to let the Nagle algorithm stay enabled
+ nagle = true;
+
+ -- Reuse write buffer tables
+ keep_buffers = true;
--- How long to wait after getting the shutdown signal before forcefully tearing down every socket
shutdown_deadline = 5;
@@ -71,54 +102,62 @@ local fds = createtable(10, 0); -- FD -> conn
local timers = indexedbheap.create();
local function noop() end
-local function closetimer(t)
- t[1] = 0;
- t[2] = noop;
- timers:remove(t.id);
-end
-local function reschedule(t, time)
- t[1] = time;
- timers:reprioritize(t.id, time);
+-- Keep track of recently closed timers to avoid re-adding them
+local closedtimers = {};
+
+local function closetimer(id)
+ if timers:remove(id) then
+ closedtimers[id] = true;
+ end
end
--- Add absolute timer
-local function at(time, f)
- local timer = { time, f, close = closetimer, reschedule = reschedule, id = nil };
- timer.id = timers:insert(timer, time);
- return timer;
+local function reschedule(id, time)
+ time = monotonic() + time;
+ timers:reprioritize(id, time);
end
-- Add relative timer
-local function addtimer(timeout, f)
- return at(gettime() + timeout, f);
+local function addtimer(timeout, f, param)
+ local time = monotonic() + timeout;
+ if param ~= nil then
+ local timer_callback = f
+ function f(current_time, timer_id)
+ local t = timer_callback(current_time, timer_id, param)
+ return t;
+ end
+ end
+ local id = timers:insert(f, time);
+ return id;
end
-- Run callbacks of expired timers
-- Return time until next timeout
local function runtimers(next_delay, min_wait)
-- Any timers at all?
- local now = gettime();
+ local elapsed = monotonic();
+ local now = realtime();
local peek = timers:peek();
local readd;
while peek do
- if peek > now then
+ if peek > elapsed then
break;
end
local _, timer, id = timers:pop();
- local ok, ret = pcall(timer[2], now);
- if ok and type(ret) == "number" then
- local next_time = now+ret;
- timer[1] = next_time;
+ local ok, ret = xpcall(timer, traceback, now, id);
+ if ok and type(ret) == "number" and not closedtimers[id] then
+ local next_time = elapsed+ret;
-- Delay insertion of timers to be re-added
-- so they don't get called again this tick
if readd then
- readd[id] = timer;
+ readd[id] = { timer, next_time };
else
- readd = { [id] = timer };
+ readd = { [id] = { timer, next_time } };
end
+ elseif not ok then
+ log("error", "Error in timer: %s", ret);
end
peek = timers:peek();
@@ -126,15 +165,19 @@ local function runtimers(next_delay, min_wait)
if readd then
for id, timer in pairs(readd) do
- timers:insert(timer, timer[1], id);
+ timers:insert(timer[1], timer[2], id);
end
peek = timers:peek();
end
+ if next(closedtimers) ~= nil then
+ closedtimers = {};
+ end
+
if peek == nil then
return next_delay;
else
- next_delay = peek - now;
+ next_delay = peek - elapsed;
end
if next_delay < min_wait then
@@ -157,6 +200,22 @@ function interface_mt:__tostring()
return ("FD %d"):format(self:getfd());
end
+interface.log = log;
+function interface:debug(msg, ...)
+ self.log("debug", msg, ...);
+end
+
+interface.noise = interface.debug;
+function interface:noise(msg, ...)
+ if cfg.verbose then
+ return self:debug(msg, ...);
+ end
+end
+
+function interface:error(msg, ...)
+ self.log("error", msg, ...);
+end
+
-- Replace the listener and tell the old one
function interface:setlistener(listeners, data)
self:on("detach");
@@ -167,21 +226,36 @@ end
-- Call a listener callback
function interface:on(what, ...)
if not self.listeners then
- log("error", "%s has no listeners", self);
+ self:error("Interface is missing listener callbacks");
return;
end
local listener = self.listeners["on"..what];
if not listener then
- -- log("debug", "Missing listener 'on%s'", what); -- uncomment for development and debugging
+ self:noise("Missing listener 'on%s'", what); -- uncomment for development and debugging
return;
end
- local ok, err = pcall(listener, self, ...);
+ if not cfg.protect_listeners then
+ return listener(self, ...);
+ end
+ local onerror = self.listeners.onerror or traceback;
+ local ok, err = xpcall(listener, onerror, self, ...);
if not ok then
- log("error", "Error calling on%s: %s", what, err);
+ if cfg.fatal_errors then
+ self:error("Closing due to error calling on%s: %s", what, err);
+ self:destroy();
+ else
+ self:error("Error calling on%s: %s", what, err);
+ end
+ return nil, err;
end
return err;
end
+-- Allow this one to be overridden
+function interface:onincoming(...)
+ return self:on("incoming", ...);
+end
+
-- Return the file descriptor number
function interface:getfd()
if self.conn then
@@ -201,20 +275,24 @@ end
-- Get a port number, doesn't matter which
function interface:port()
- return self.sockport or self.peerport;
+ return self.peerport or self.sockport;
end
--- Get local port number
+-- Client-side port (usually a random high port)
function interface:clientport()
- return self.sockport;
+ if self._server then
+ return self.peerport;
+ else
+ return self.sockport;
+ end
end
--- Get remote port
+-- Get port on the server
function interface:serverport()
- if self.sockport then
+ if self._server then
return self.sockport;
- elseif self._server then
- self._server:port();
+ else
+ return self.peerport;
end
end
@@ -229,28 +307,36 @@ end
function interface:setoption(k, v)
-- LuaSec doesn't expose setoption :(
- if self.conn.setoption then
- self.conn:setoption(k, v);
+ local ok, ret, err = pcall(self.conn.setoption, self.conn, k, v);
+ if not ok then
+ self:noise("Setting option %q = %q failed: %s", k, v, ret);
+ return ok, ret;
+ elseif not ret then
+ self:noise("Setting option %q = %q failed: %s", k, v, err);
+ return ret, err;
end
+ return ret;
end
-- Timeout for detecting dead or idle sockets
function interface:setreadtimeout(t)
if t == false then
if self._readtimeout then
- self._readtimeout:close();
+ closetimer(self._readtimeout);
self._readtimeout = nil;
end
return
end
t = t or cfg.read_timeout;
if self._readtimeout then
- self._readtimeout:reschedule(gettime() + t);
+ reschedule(self._readtimeout, t);
else
self._readtimeout = addtimer(t, function ()
if self:on("readtimeout") then
+ self:noise("Read timeout handled");
return cfg.read_timeout;
else
+ self:debug("Read timeout not handled, disconnecting");
self:on("disconnect", "read timeout");
self:destroy();
end
@@ -262,17 +348,18 @@ end
function interface:setwritetimeout(t)
if t == false then
if self._writetimeout then
- self._writetimeout:close();
+ closetimer(self._writetimeout);
self._writetimeout = nil;
end
return
end
t = t or cfg.send_timeout;
if self._writetimeout then
- self._writetimeout:reschedule(gettime() + t);
+ reschedule(self._writetimeout, t);
else
self._writetimeout = addtimer(t, function ()
- self:on("disconnect", "write timeout");
+ self:noise("Write timeout");
+ self:on("disconnect", self._connected and "write timeout" or "connection timeout");
self:destroy();
end);
end
@@ -288,15 +375,15 @@ function interface:add(r, w)
local ok, err, errno = poll:add(fd, r, w);
if not ok then
if errno == EEXIST then
- log("debug", "%s already registered!", self);
+ self:debug("FD already registered in poller! (EEXIST)");
return self:set(r, w); -- So try to change its flags
end
- log("error", "Could not register %s: %s(%d)", self, err, errno);
+ self:debug("Could not register in poller: %s(%d)", err, errno);
return ok, err;
end
self._wantread, self._wantwrite = r, w;
fds[fd] = self;
- log("debug", "Watching %s", self);
+ self:noise("Registered in poller");
return true;
end
@@ -309,7 +396,7 @@ function interface:set(r, w)
if w == nil then w = self._wantwrite; end
local ok, err, errno = poll:set(fd, r, w);
if not ok then
- log("error", "Could not update poller state %s: %s(%d)", self, err, errno);
+ self:debug("Could not update poller state: %s(%d)", err, errno);
return ok, err;
end
self._wantread, self._wantwrite = r, w;
@@ -326,12 +413,12 @@ function interface:del()
end
local ok, err, errno = poll:del(fd);
if not ok and errno ~= ENOENT then
- log("error", "Could not unregister %s: %s(%d)", self, err, errno);
+ self:debug("Could not unregister: %s(%d)", err, errno);
return ok, err;
end
self._wantread, self._wantwrite = nil, nil;
fds[fd] = nil;
- log("debug", "Unwatched %s", self);
+ self:noise("Unregistered from poller");
return true;
end
@@ -353,27 +440,44 @@ function interface:onreadable()
local data, err, partial = self.conn:receive(self.read_size or cfg.read_size);
if data then
self:onconnect();
- self:on("incoming", data);
+ self:onincoming(data);
else
if err == "wantread" then
self:set(true, nil);
err = "timeout";
elseif err == "wantwrite" then
self:set(nil, true);
+ self:setwritetimeout();
err = "timeout";
+ elseif err == "timeout" and not self._connected then
+ err = "connection timeout";
end
if partial and partial ~= "" then
self:onconnect();
- self:on("incoming", partial, err);
+ self:onincoming(partial, err);
end
if err ~= "timeout" then
+ if err == "closed" then
+ self:debug("Connection closed by remote");
+ else
+ self:debug("Read error, closing (%s)", err);
+ end
self:on("disconnect", err);
- self:destroy()
+ self:close();
return;
end
end
if not self.conn then return; end
- if self._wantread and self.conn:dirty() then
+ if self._limit and (data or partial) then
+ local cost = self._limit * #(data or partial);
+ if cost > cfg.min_wait then
+ self:setreadtimeout(false);
+ self:pausefor(cost);
+ return;
+ end
+ end
+ if not self._wantread then return end
+ if self.conn:dirty() then
self:setreadtimeout(false);
self:pausefor(cfg.read_retry_delay);
else
@@ -385,31 +489,55 @@ end
function interface:onwritable()
self:onconnect();
if not self.conn then return; end -- could have been closed in onconnect
+ self:on("predrain");
local buffer = self.writebuffer;
- local data = t_concat(buffer);
+ local data = buffer or "";
+ if type(buffer) == "table" then
+ if buffer[3] then
+ data = t_concat(data);
+ elseif buffer[2] then
+ data = buffer[1] .. buffer[2];
+ else
+ data = buffer[1] or "";
+ end
+ end
local ok, err, partial = self.conn:send(data);
+ self._writable = ok;
if ok then
self:set(nil, false);
- for i = #buffer, 1, -1 do
- buffer[i] = nil;
+ if cfg.keep_buffers and type(buffer) == "table" then
+ for i = #buffer, 1, -1 do
+ buffer[i] = nil;
+ end
+ else
+ self.writebuffer = nil;
end
self:setwritetimeout(false);
self:ondrain(); -- Be aware of writes in ondrain
- return;
+ return ok;
elseif partial then
- buffer[1] = data:sub(partial+1);
- for i = #buffer, 2, -1 do
- buffer[i] = nil;
+ self:debug("Sent %d out of %d buffered bytes", partial, #data);
+ if cfg.keep_buffers and type(buffer) == "table" then
+ buffer[1] = data:sub(partial+1);
+ for i = #buffer, 2, -1 do
+ buffer[i] = nil;
+ end
+ else
+ self.writebuffer = data:sub(partial+1);
end
+ self:set(nil, true);
self:setwritetimeout();
end
if err == "wantwrite" or err == "timeout" then
self:set(nil, true);
+ self:setwritetimeout();
elseif err == "wantread" then
self:set(true, nil);
+ self:setreadtimeout();
elseif err ~= "timeout" then
self:on("disconnect", err);
self:destroy();
+ return ok, err;
end
end
@@ -421,26 +549,39 @@ end
-- Add data to write buffer and set flag for wanting to write
function interface:write(data)
local buffer = self.writebuffer;
- if buffer then
+ if type(buffer) == "table" then
t_insert(buffer, data);
- else
- self.writebuffer = { data };
+ elseif type(buffer) == "string" then
+ self:noise("Allocating buffer!")
+ self.writebuffer = { buffer, data };
+ elseif buffer == nil then
+ self.writebuffer = data;
+ end
+ if not self._write_lock then
+ if self._writable and cfg.opportunistic_writes and not self._opportunistic_write then
+ self._opportunistic_write = true;
+ local ret, err = self:onwritable();
+ self._opportunistic_write = nil;
+ return ret, err;
+ end
+ self:setwritetimeout();
+ self:set(nil, true);
end
- self:setwritetimeout();
- self:set(nil, true);
return #data;
end
interface.send = interface.write;
-- Close, possibly after writing is done
function interface:close()
- if self.writebuffer and self.writebuffer[1] then
+ if self.writebuffer and (self.writebuffer[1] or type(self.writebuffer) == "string") then
self:set(false, true); -- Flush final buffer contents
+ self:setreadtimeout(false);
+ self:setwritetimeout();
self.write, self.send = noop, noop; -- No more writing
- log("debug", "Close %s after writing", self);
+ self:debug("Close after writing remaining buffered data");
self.ondrain = interface.close;
else
- log("debug", "Close %s now", self);
+ self:debug("Closing now");
self.write, self.send = noop, noop;
self.close = noop;
self:on("disconnect");
@@ -465,70 +606,108 @@ function interface:ssl()
return self._tls;
end
+function interface:set_sslctx(sslctx)
+ self._sslctx = sslctx;
+end
+
function interface:starttls(tls_ctx)
if tls_ctx then self.tls_ctx = tls_ctx; end
self.starttls = false;
- if self.writebuffer and self.writebuffer[1] then
- log("debug", "Start TLS on %s after write", self);
+ if self.writebuffer and (self.writebuffer[1] or type(self.writebuffer) == "string") then
+ self:debug("Start TLS after write");
self.ondrain = interface.starttls;
self:set(nil, true); -- make sure wantwrite is set
else
if self.ondrain == interface.starttls then
self.ondrain = nil;
end
- self.onwritable = interface.tlshandskake;
- self.onreadable = interface.tlshandskake;
+ self.onwritable = interface.inittls;
+ self.onreadable = interface.inittls;
self:set(true, true);
- log("debug", "Prepare to start TLS on %s", self);
+ self:setreadtimeout(false);
+ self:setwritetimeout(cfg.ssl_handshake_timeout);
+ self:debug("Prepared to start TLS");
end
end
-function interface:tlshandskake()
- self:setwritetimeout(false);
- self:setreadtimeout(false);
- if not self._tls then
- self._tls = true;
- log("debug", "Start TLS on %s now", self);
- self:del();
- local ok, conn, err = pcall(luasec.wrap, self.conn, self.tls_ctx);
- if not ok then
- conn, err = ok, conn;
- log("error", "Failed to initialize TLS: %s", err);
- end
- if not conn then
- self:on("disconnect", err);
- self:destroy();
- return conn, err;
- end
- conn:settimeout(0);
- self.conn = conn;
- if conn.sni and self.servername then
+function interface:inittls(tls_ctx, now)
+ if self._tls then return end
+ if tls_ctx then self.tls_ctx = tls_ctx; end
+ self._tls = true;
+ self:debug("Starting TLS now");
+ self:updatenames(); -- Can't getpeer/sockname after wrap()
+ local ok, conn, err = pcall(luasec.wrap, self.conn, self.tls_ctx);
+ if not ok then
+ conn, err = ok, conn;
+ self:debug("Failed to initialize TLS: %s", err);
+ end
+ if not conn then
+ self:on("disconnect", err);
+ self:destroy();
+ return conn, err;
+ end
+ conn:settimeout(0);
+ self.conn = conn;
+ if conn.sni then
+ if self.servername then
conn:sni(self.servername);
+ elseif self._server and type(self._server.hosts) == "table" and next(self._server.hosts) ~= nil then
+ conn:sni(self._server.hosts, true);
end
- self:on("starttls");
- self.ondrain = nil;
- self.onwritable = interface.tlshandskake;
- self.onreadable = interface.tlshandskake;
- return self:init();
end
+ if self.extra and self.extra.tlsa and conn.settlsa then
+ -- TODO Error handling
+ if not conn:setdane(self.servername or self.extra.dane_hostname) then
+ self:debug("Could not enable DANE on connection");
+ else
+ self:debug("Enabling DANE with %d TLSA records", #self.extra.tlsa);
+ self:noise("DANE hostname is %q", self.servername or self.extra.dane_hostname);
+ for _, tlsa in ipairs(self.extra.tlsa) do
+ self:noise("TLSA: %q", tlsa);
+ conn:settlsa(tlsa.use, tlsa.select, tlsa.match, tlsa.data);
+ end
+ end
+ end
+ self:on("starttls");
+ self.ondrain = nil;
+ self.onwritable = interface.tlshandshake;
+ self.onreadable = interface.tlshandshake;
+ if now then
+ return self:tlshandshake()
+ end
+ self:setreadtimeout(false);
+ self:setwritetimeout(cfg.ssl_handshake_timeout);
+ self:set(true, true);
+end
+
+function interface:tlshandshake()
+ self:setreadtimeout(false);
+ self:noise("Continuing TLS handshake");
local ok, err = self.conn:dohandshake();
if ok then
- log("debug", "TLS handshake on %s complete", self);
+ local info = self.conn.info and self.conn:info();
+ if type(info) == "table" then
+ self:debug("TLS handshake complete (%s with %s)", info.protocol, info.cipher);
+ else
+ self:debug("TLS handshake complete");
+ end
+ self:setwritetimeout(false);
self.onwritable = nil;
self.onreadable = nil;
self:on("status", "ssl-handshake-complete");
- self:setwritetimeout();
self:set(true, true);
+ self:onconnect();
+ self:onreadable();
elseif err == "wantread" then
- log("debug", "TLS handshake on %s to wait until readable", self);
+ self:noise("TLS handshake to wait until readable");
self:set(true, false);
- self:setreadtimeout(cfg.ssl_handshake_timeout);
+ self:setwritetimeout(cfg.ssl_handshake_timeout);
elseif err == "wantwrite" then
- log("debug", "TLS handshake on %s to wait until writable", self);
+ self:noise("TLS handshake to wait until writable");
self:set(false, true);
self:setwritetimeout(cfg.ssl_handshake_timeout);
else
- log("debug", "TLS handshake error on %s: %s", self, err);
+ self:debug("TLS handshake error: %s", err);
self:on("disconnect", err);
self:destroy();
end
@@ -536,15 +715,18 @@ end
local function wrapsocket(client, server, read_size, listeners, tls_ctx, extra) -- luasocket object -> interface object
client:settimeout(0);
+ local conn_id = ("conn%s"):format(new_id());
local conn = setmetatable({
conn = client;
_server = server;
- created = gettime();
+ created = realtime();
listeners = listeners;
read_size = read_size or (server and server.read_size);
- writebuffer = {};
+ writebuffer = nil;
tls_ctx = tls_ctx or (server and server.tls_ctx);
tls_direct = server and server.tls_direct;
+ id = conn_id;
+ log = logger.init(conn_id);
extra = extra;
}, interface_mt);
@@ -561,12 +743,12 @@ end
function interface:updatenames()
local conn = self.conn;
local ok, peername, peerport = pcall(conn.getpeername, conn);
- if ok then
- self.peername, self.peerport = peername, peerport;
+ if ok and peername then
+ self.peername, self.peerport = peername, peerport or 0;
end
local ok, sockname, sockport = pcall(conn.getsockname, conn);
- if ok then
- self.sockname, self.sockport = sockname, sockport;
+ if ok and sockname then
+ self.sockname, self.sockport = sockname, sockport or 0;
end
end
@@ -575,76 +757,149 @@ end
function interface:onacceptable()
local conn, err = self.conn:accept();
if not conn then
- log("debug", "Error accepting new client: %s, server will be paused for %ds", err, cfg.accept_retry_interval);
+ self:debug("Error accepting new client: %s, server will be paused for %ds", err, cfg.accept_retry_interval);
self:pausefor(cfg.accept_retry_interval);
return;
end
local client = wrapsocket(conn, self, nil, self.listeners);
- log("debug", "New connection %s", tostring(client));
- client:init();
+ client:debug("New connection %s on server %s", client, self);
+ client:defaultoptions();
+ client._writable = cfg.opportunistic_writes;
if self.tls_direct then
- client:starttls(self.tls_ctx);
+ client:add(true, true);
+ client:inittls(self.tls_ctx, true);
+ else
+ client:add(true, false);
+ client:onconnect();
+ client:onreadable();
end
end
--- Initialization
+-- Initialization for outgoing connections
function interface:init()
- self:setwritetimeout();
+ self:setwritetimeout(cfg.connect_timeout);
+ self:defaultoptions();
return self:add(true, true);
end
+function interface:defaultoptions()
+ if cfg.nagle == false then
+ self:setoption("tcp-nodelay", true);
+ end
+ if cfg.tcp_keepalive then
+ self:setoption("keepalive", true);
+ if type(cfg.tcp_keepalive) == "number" then
+ self:setoption("tcp-keepidle", cfg.tcp_keepalive);
+ end
+ end
+end
+
function interface:pause()
+ self:noise("Pause reading");
+ self:setreadtimeout(false);
return self:set(false);
end
function interface:resume()
+ self:noise("Resume reading");
+ self:setreadtimeout();
return self:set(true);
end
-- Pause connection for some time
function interface:pausefor(t)
+ self:noise("Pause for %fs", t);
if self._pausefor then
- self._pausefor:close();
+ closetimer(self._pausefor);
+ self._pausefor = nil;
end
if t == false then return; end
self:set(false);
self._pausefor = addtimer(t, function ()
self._pausefor = nil;
self:set(true);
+ self:noise("Resuming after pause");
if self.conn and self.conn:dirty() then
+ self:noise("Have buffered incoming data to process");
self:onreadable();
end
end);
end
+function interface:setlimit(Bps)
+ if Bps > 0 then
+ self._limit = 1/Bps;
+ else
+ self._limit = nil;
+ end
+end
+
+function interface:pause_writes()
+ if self._write_lock then
+ return
+ end
+ self:noise("Pause writes");
+ self._write_lock = true;
+ self:setwritetimeout(false);
+ self:set(nil, false);
+end
+
+function interface:resume_writes()
+ if not self._write_lock then
+ return
+ end
+ self:noise("Resume writes");
+ self._write_lock = nil;
+ if self.writebuffer and (self.writebuffer[1] or type(self.writebuffer) == "string") then
+ self:setwritetimeout();
+ self:set(nil, true);
+ end
+end
+
-- Connected!
function interface:onconnect()
- if self.conn and not self.peername and self.conn.getpeername then
- self.peername, self.peerport = self.conn:getpeername();
- end
+ self._connected = true;
+ self:updatenames();
+ self:debug("Connected (%s)", self);
self.onconnect = noop;
self:on("connect");
end
-local function addserver(addr, port, listeners, read_size, tls_ctx)
- local conn, err = socket.bind(addr, port, cfg.tcp_backlog);
- if not conn then return conn, err; end
- conn:settimeout(0);
+local function wrapserver(conn, addr, port, listeners, config)
local server = setmetatable({
conn = conn;
- created = gettime();
+ created = realtime();
listeners = listeners;
- read_size = read_size;
+ read_size = config and config.read_size;
onreadable = interface.onacceptable;
- tls_ctx = tls_ctx;
- tls_direct = tls_ctx and true or false;
+ tls_ctx = config and config.tls_ctx;
+ tls_direct = config and config.tls_direct;
+ hosts = config and config.sni_hosts;
sockname = addr;
sockport = port;
+ log = logger.init(("serv%s"):format(new_id()));
}, interface_mt);
+ server:debug("Server %s created", server);
server:add(true, false);
return server;
end
+local function listen(addr, port, listeners, config)
+ local conn, err = socket.bind(addr, port, cfg.tcp_backlog);
+ if not conn then return conn, err; end
+ conn:settimeout(0);
+ return wrapserver(conn, addr, port, listeners, config);
+end
+
+-- COMPAT
+local function addserver(addr, port, listeners, read_size, tls_ctx)
+ return listen(addr, port, listeners, {
+ read_size = read_size;
+ tls_ctx = tls_ctx;
+ tls_direct = tls_ctx and true or false;
+ });
+end
+
-- COMPAT
local function wrapclient(conn, addr, port, listeners, read_size, tls_ctx, extra)
local client = wrapsocket(conn, nil, read_size, listeners, tls_ctx, extra);
@@ -678,13 +933,19 @@ local function addclient(addr, port, listeners, read_size, tls_ctx, typ, extra)
return nil, "invalid socket type";
end
local conn, err = create();
+ if not conn then return conn, err; end
local ok, err = conn:settimeout(0);
if not ok then return ok, err; end
local ok, err = conn:setpeername(addr, port);
if not ok and err ~= "timeout" then return ok, err; end
local client = wrapsocket(conn, nil, read_size, listeners, tls_ctx, extra)
local ok, err = client:init();
+ if not client.peername then
+ -- otherwise not set until connected
+ client.peername, client.peerport = addr, port;
+ end
if not ok then return ok, err; end
+ client:debug("Client %s created", client);
if tls_ctx then
client:starttls(tls_ctx);
end
@@ -706,23 +967,23 @@ local function watchfd(fd, onreadable, onwritable)
end;
-- Otherwise it'll need to be something LuaSocket-compatible
end
+ conn.id = new_id();
+ conn.log = logger.init(("fdwatch%s"):format(conn.id));
conn:add(onreadable, onwritable);
return conn;
end;
-- Dump all data from one connection into another
-local function link(from, to)
- from.listeners = setmetatable({
- onincoming = function (_, data)
- from:pause();
- to:write(data);
- end,
- }, {__index=from.listeners});
- to.listeners = setmetatable({
- ondrain = function ()
- from:resume();
- end,
- }, {__index=to.listeners});
+local function link(from, to, read_size)
+ from:debug("Linking to %s", to.id);
+ function from:onincoming(data)
+ self:pause();
+ to:write(data);
+ end
+ function to:ondrain() -- luacheck: ignore 212/self
+ from:resume();
+ end
+ from:set_mode(read_size);
from:set(true, nil);
to:set(nil, true);
end
@@ -796,11 +1057,21 @@ return {
addserver = addserver;
addclient = addclient;
add_task = addtimer;
- at = at;
+ timer = {
+ -- API-compatible with util.timer
+ add_task = addtimer;
+ stop = closetimer;
+ reschedule = reschedule;
+ to_absolute_time = function (t)
+ return t-monotonic()+realtime();
+ end;
+ };
+ listen = listen;
loop = loop;
closeall = closeall;
setquitting = setquitting;
wrapclient = wrapclient;
+ wrapserver = wrapserver;
watchfd = watchfd;
link = link;
set_config = function (newconfig)
@@ -810,6 +1081,7 @@ return {
-- libevent emulation
event = { EV_READ = "r", EV_WRITE = "w", EV_READWRITE = "rw", EV_LEAVE = -1 };
addevent = function (fd, mode, callback)
+ log("warn", "Using deprecated libevent emulation, please update code to use watchfd API instead");
local function onevent(self)
local ret = self:callback();
if ret == -1 then
@@ -829,6 +1101,8 @@ return {
fds[fd] = nil;
end;
}, interface_mt);
+ conn.id = conn:getfd();
+ conn.log = logger.init(("fdwatch%d"):format(conn.id));
local ok, err = conn:add(mode == "r" or mode == "rw", mode == "w" or mode == "rw");
if not ok then return ok, err; end
return conn;