diff options
Diffstat (limited to 'net/server_epoll.lua')
-rw-r--r-- | net/server_epoll.lua | 570 |
1 files changed, 422 insertions, 148 deletions
diff --git a/net/server_epoll.lua b/net/server_epoll.lua index c47e1a70..89b6ffe9 100644 --- a/net/server_epoll.lua +++ b/net/server_epoll.lua @@ -9,20 +9,25 @@ local t_insert = table.insert; local t_concat = table.concat; local setmetatable = setmetatable; -local tostring = tostring; local pcall = pcall; local type = type; local next = next; local pairs = pairs; -local log = require "util.logger".init("server_epoll"); +local ipairs = ipairs; +local traceback = debug.traceback; +local logger = require "util.logger"; +local log = logger.init("server_epoll"); local socket = require "socket"; local luasec = require "ssl"; -local gettime = require "util.time".now; +local realtime = require "util.time".now; +local monotonic = require "util.time".monotonic; local indexedbheap = require "util.indexedbheap"; local createtable = require "util.table".create; local inet = require "util.net"; local inet_pton = inet.pton; local _SOCKETINVALID = socket._SOCKETINVALID or -1; +local new_id = require "util.id".medium; +local xpcall = require "util.xpcall".xpcall; local poller = require "util.poll" local EEXIST = poller.EEXIST; @@ -38,7 +43,10 @@ local default_config = { __index = { read_timeout = 14 * 60; -- How long to wait for a socket to become writable after queuing data to send - send_timeout = 60; + send_timeout = 180; + + -- How long to wait for a socket to become writable after creation + connect_timeout = 20; -- Some number possibly influencing how many pending connections can be accepted tcp_backlog = 128; @@ -46,7 +54,7 @@ local default_config = { __index = { -- If accepting a new incoming connection fails, wait this long before trying again accept_retry_interval = 10; - -- If there is still more data to read from LuaSocktes buffer, wait this long and read again + -- If there is still more data to read from LuaSockets buffer, wait this long and read again read_retry_delay = 1e-06; -- Size of chunks to read from sockets @@ -57,7 +65,30 @@ local default_config = { __index = { -- Maximum and minimum amount of time to sleep waiting for events (adjusted for pending timers) max_wait = 86400; - min_wait = 1e-06; + min_wait = 0.001; + + -- Enable extra noisy debug logging + -- TODO disable once considered stable + verbose = true; + + -- EXPERIMENTAL + -- Whether to kill connections in case of callback errors. + fatal_errors = false; + + -- Or disable protection (like server_select) for potential performance gains + protect_listeners = true; + + -- Attempt writes instantly + opportunistic_writes = false; + + -- TCP Keepalives + tcp_keepalive = false; -- boolean | number + + -- Whether to let the Nagle algorithm stay enabled + nagle = true; + + -- Reuse write buffer tables + keep_buffers = true; --- How long to wait after getting the shutdown signal before forcefully tearing down every socket shutdown_deadline = 5; @@ -71,54 +102,62 @@ local fds = createtable(10, 0); -- FD -> conn local timers = indexedbheap.create(); local function noop() end -local function closetimer(t) - t[1] = 0; - t[2] = noop; - timers:remove(t.id); -end -local function reschedule(t, time) - t[1] = time; - timers:reprioritize(t.id, time); +-- Keep track of recently closed timers to avoid re-adding them +local closedtimers = {}; + +local function closetimer(id) + if timers:remove(id) then + closedtimers[id] = true; + end end --- Add absolute timer -local function at(time, f) - local timer = { time, f, close = closetimer, reschedule = reschedule, id = nil }; - timer.id = timers:insert(timer, time); - return timer; +local function reschedule(id, time) + time = monotonic() + time; + timers:reprioritize(id, time); end -- Add relative timer -local function addtimer(timeout, f) - return at(gettime() + timeout, f); +local function addtimer(timeout, f, param) + local time = monotonic() + timeout; + if param ~= nil then + local timer_callback = f + function f(current_time, timer_id) + local t = timer_callback(current_time, timer_id, param) + return t; + end + end + local id = timers:insert(f, time); + return id; end -- Run callbacks of expired timers -- Return time until next timeout local function runtimers(next_delay, min_wait) -- Any timers at all? - local now = gettime(); + local elapsed = monotonic(); + local now = realtime(); local peek = timers:peek(); local readd; while peek do - if peek > now then + if peek > elapsed then break; end local _, timer, id = timers:pop(); - local ok, ret = pcall(timer[2], now); - if ok and type(ret) == "number" then - local next_time = now+ret; - timer[1] = next_time; + local ok, ret = xpcall(timer, traceback, now, id); + if ok and type(ret) == "number" and not closedtimers[id] then + local next_time = elapsed+ret; -- Delay insertion of timers to be re-added -- so they don't get called again this tick if readd then - readd[id] = timer; + readd[id] = { timer, next_time }; else - readd = { [id] = timer }; + readd = { [id] = { timer, next_time } }; end + elseif not ok then + log("error", "Error in timer: %s", ret); end peek = timers:peek(); @@ -126,15 +165,19 @@ local function runtimers(next_delay, min_wait) if readd then for id, timer in pairs(readd) do - timers:insert(timer, timer[1], id); + timers:insert(timer[1], timer[2], id); end peek = timers:peek(); end + if next(closedtimers) ~= nil then + closedtimers = {}; + end + if peek == nil then return next_delay; else - next_delay = peek - now; + next_delay = peek - elapsed; end if next_delay < min_wait then @@ -157,6 +200,22 @@ function interface_mt:__tostring() return ("FD %d"):format(self:getfd()); end +interface.log = log; +function interface:debug(msg, ...) + self.log("debug", msg, ...); +end + +interface.noise = interface.debug; +function interface:noise(msg, ...) + if cfg.verbose then + return self:debug(msg, ...); + end +end + +function interface:error(msg, ...) + self.log("error", msg, ...); +end + -- Replace the listener and tell the old one function interface:setlistener(listeners, data) self:on("detach"); @@ -167,21 +226,36 @@ end -- Call a listener callback function interface:on(what, ...) if not self.listeners then - log("error", "%s has no listeners", self); + self:error("Interface is missing listener callbacks"); return; end local listener = self.listeners["on"..what]; if not listener then - -- log("debug", "Missing listener 'on%s'", what); -- uncomment for development and debugging + self:noise("Missing listener 'on%s'", what); -- uncomment for development and debugging return; end - local ok, err = pcall(listener, self, ...); + if not cfg.protect_listeners then + return listener(self, ...); + end + local onerror = self.listeners.onerror or traceback; + local ok, err = xpcall(listener, onerror, self, ...); if not ok then - log("error", "Error calling on%s: %s", what, err); + if cfg.fatal_errors then + self:error("Closing due to error calling on%s: %s", what, err); + self:destroy(); + else + self:error("Error calling on%s: %s", what, err); + end + return nil, err; end return err; end +-- Allow this one to be overridden +function interface:onincoming(...) + return self:on("incoming", ...); +end + -- Return the file descriptor number function interface:getfd() if self.conn then @@ -201,20 +275,24 @@ end -- Get a port number, doesn't matter which function interface:port() - return self.sockport or self.peerport; + return self.peerport or self.sockport; end --- Get local port number +-- Client-side port (usually a random high port) function interface:clientport() - return self.sockport; + if self._server then + return self.peerport; + else + return self.sockport; + end end --- Get remote port +-- Get port on the server function interface:serverport() - if self.sockport then + if self._server then return self.sockport; - elseif self._server then - self._server:port(); + else + return self.peerport; end end @@ -229,28 +307,36 @@ end function interface:setoption(k, v) -- LuaSec doesn't expose setoption :( - if self.conn.setoption then - self.conn:setoption(k, v); + local ok, ret, err = pcall(self.conn.setoption, self.conn, k, v); + if not ok then + self:noise("Setting option %q = %q failed: %s", k, v, ret); + return ok, ret; + elseif not ret then + self:noise("Setting option %q = %q failed: %s", k, v, err); + return ret, err; end + return ret; end -- Timeout for detecting dead or idle sockets function interface:setreadtimeout(t) if t == false then if self._readtimeout then - self._readtimeout:close(); + closetimer(self._readtimeout); self._readtimeout = nil; end return end t = t or cfg.read_timeout; if self._readtimeout then - self._readtimeout:reschedule(gettime() + t); + reschedule(self._readtimeout, t); else self._readtimeout = addtimer(t, function () if self:on("readtimeout") then + self:noise("Read timeout handled"); return cfg.read_timeout; else + self:debug("Read timeout not handled, disconnecting"); self:on("disconnect", "read timeout"); self:destroy(); end @@ -262,17 +348,18 @@ end function interface:setwritetimeout(t) if t == false then if self._writetimeout then - self._writetimeout:close(); + closetimer(self._writetimeout); self._writetimeout = nil; end return end t = t or cfg.send_timeout; if self._writetimeout then - self._writetimeout:reschedule(gettime() + t); + reschedule(self._writetimeout, t); else self._writetimeout = addtimer(t, function () - self:on("disconnect", "write timeout"); + self:noise("Write timeout"); + self:on("disconnect", self._connected and "write timeout" or "connection timeout"); self:destroy(); end); end @@ -288,15 +375,15 @@ function interface:add(r, w) local ok, err, errno = poll:add(fd, r, w); if not ok then if errno == EEXIST then - log("debug", "%s already registered!", self); + self:debug("FD already registered in poller! (EEXIST)"); return self:set(r, w); -- So try to change its flags end - log("error", "Could not register %s: %s(%d)", self, err, errno); + self:debug("Could not register in poller: %s(%d)", err, errno); return ok, err; end self._wantread, self._wantwrite = r, w; fds[fd] = self; - log("debug", "Watching %s", self); + self:noise("Registered in poller"); return true; end @@ -309,7 +396,7 @@ function interface:set(r, w) if w == nil then w = self._wantwrite; end local ok, err, errno = poll:set(fd, r, w); if not ok then - log("error", "Could not update poller state %s: %s(%d)", self, err, errno); + self:debug("Could not update poller state: %s(%d)", err, errno); return ok, err; end self._wantread, self._wantwrite = r, w; @@ -326,12 +413,12 @@ function interface:del() end local ok, err, errno = poll:del(fd); if not ok and errno ~= ENOENT then - log("error", "Could not unregister %s: %s(%d)", self, err, errno); + self:debug("Could not unregister: %s(%d)", err, errno); return ok, err; end self._wantread, self._wantwrite = nil, nil; fds[fd] = nil; - log("debug", "Unwatched %s", self); + self:noise("Unregistered from poller"); return true; end @@ -353,27 +440,44 @@ function interface:onreadable() local data, err, partial = self.conn:receive(self.read_size or cfg.read_size); if data then self:onconnect(); - self:on("incoming", data); + self:onincoming(data); else if err == "wantread" then self:set(true, nil); err = "timeout"; elseif err == "wantwrite" then self:set(nil, true); + self:setwritetimeout(); err = "timeout"; + elseif err == "timeout" and not self._connected then + err = "connection timeout"; end if partial and partial ~= "" then self:onconnect(); - self:on("incoming", partial, err); + self:onincoming(partial, err); end - if err ~= "timeout" then + if err == "closed" and self._connected then + self:debug("Connection closed by remote"); + self:close(err); + return; + elseif err ~= "timeout" then + self:debug("Read error, closing (%s)", err); self:on("disconnect", err); - self:destroy() + self:destroy(); return; end end if not self.conn then return; end - if self._wantread and self.conn:dirty() then + if self._limit and (data or partial) then + local cost = self._limit * #(data or partial); + if cost > cfg.min_wait then + self:setreadtimeout(false); + self:pausefor(cost); + return; + end + end + if not self._wantread then return end + if self.conn:dirty() then self:setreadtimeout(false); self:pausefor(cfg.read_retry_delay); else @@ -385,31 +489,55 @@ end function interface:onwritable() self:onconnect(); if not self.conn then return; end -- could have been closed in onconnect + self:on("predrain"); local buffer = self.writebuffer; - local data = t_concat(buffer); + local data = buffer or ""; + if type(buffer) == "table" then + if buffer[3] then + data = t_concat(data); + elseif buffer[2] then + data = buffer[1] .. buffer[2]; + else + data = buffer[1] or ""; + end + end local ok, err, partial = self.conn:send(data); + self._writable = ok; if ok then self:set(nil, false); - for i = #buffer, 1, -1 do - buffer[i] = nil; + if cfg.keep_buffers and type(buffer) == "table" then + for i = #buffer, 1, -1 do + buffer[i] = nil; + end + else + self.writebuffer = nil; end self:setwritetimeout(false); self:ondrain(); -- Be aware of writes in ondrain - return; + return ok; elseif partial then - buffer[1] = data:sub(partial+1); - for i = #buffer, 2, -1 do - buffer[i] = nil; + self:debug("Sent %d out of %d buffered bytes", partial, #data); + if cfg.keep_buffers and type(buffer) == "table" then + buffer[1] = data:sub(partial+1); + for i = #buffer, 2, -1 do + buffer[i] = nil; + end + else + self.writebuffer = data:sub(partial+1); end + self:set(nil, true); self:setwritetimeout(); end if err == "wantwrite" or err == "timeout" then self:set(nil, true); + self:setwritetimeout(); elseif err == "wantread" then self:set(true, nil); + self:setreadtimeout(); elseif err ~= "timeout" then self:on("disconnect", err); self:destroy(); + return ok, err; end end @@ -421,26 +549,39 @@ end -- Add data to write buffer and set flag for wanting to write function interface:write(data) local buffer = self.writebuffer; - if buffer then + if type(buffer) == "table" then t_insert(buffer, data); - else - self.writebuffer = { data }; + elseif type(buffer) == "string" then + self:noise("Allocating buffer!") + self.writebuffer = { buffer, data }; + elseif buffer == nil then + self.writebuffer = data; + end + if not self._write_lock then + if self._writable and cfg.opportunistic_writes and not self._opportunistic_write then + self._opportunistic_write = true; + local ret, err = self:onwritable(); + self._opportunistic_write = nil; + return ret, err; + end + self:setwritetimeout(); + self:set(nil, true); end - self:setwritetimeout(); - self:set(nil, true); return #data; end interface.send = interface.write; -- Close, possibly after writing is done function interface:close() - if self.writebuffer and self.writebuffer[1] then + if self._connected and self.writebuffer and (self.writebuffer[1] or type(self.writebuffer) == "string") then self:set(false, true); -- Flush final buffer contents + self:setreadtimeout(false); + self:setwritetimeout(); self.write, self.send = noop, noop; -- No more writing - log("debug", "Close %s after writing", self); + self:debug("Close after writing remaining buffered data"); self.ondrain = interface.close; else - log("debug", "Close %s now", self); + self:debug("Closing now"); self.write, self.send = noop, noop; self.close = noop; self:on("disconnect"); @@ -465,70 +606,108 @@ function interface:ssl() return self._tls; end +function interface:set_sslctx(sslctx) + self._sslctx = sslctx; +end + function interface:starttls(tls_ctx) if tls_ctx then self.tls_ctx = tls_ctx; end self.starttls = false; - if self.writebuffer and self.writebuffer[1] then - log("debug", "Start TLS on %s after write", self); + if self.writebuffer and (self.writebuffer[1] or type(self.writebuffer) == "string") then + self:debug("Start TLS after write"); self.ondrain = interface.starttls; self:set(nil, true); -- make sure wantwrite is set else if self.ondrain == interface.starttls then self.ondrain = nil; end - self.onwritable = interface.tlshandskake; - self.onreadable = interface.tlshandskake; + self.onwritable = interface.inittls; + self.onreadable = interface.inittls; self:set(true, true); - log("debug", "Prepare to start TLS on %s", self); + self:setreadtimeout(false); + self:setwritetimeout(cfg.ssl_handshake_timeout); + self:debug("Prepared to start TLS"); end end -function interface:tlshandskake() - self:setwritetimeout(false); - self:setreadtimeout(false); - if not self._tls then - self._tls = true; - log("debug", "Start TLS on %s now", self); - self:del(); - local ok, conn, err = pcall(luasec.wrap, self.conn, self.tls_ctx); - if not ok then - conn, err = ok, conn; - log("error", "Failed to initialize TLS: %s", err); - end - if not conn then - self:on("disconnect", err); - self:destroy(); - return conn, err; - end - conn:settimeout(0); - self.conn = conn; - if conn.sni and self.servername then +function interface:inittls(tls_ctx, now) + if self._tls then return end + if tls_ctx then self.tls_ctx = tls_ctx; end + self._tls = true; + self:debug("Starting TLS now"); + self:updatenames(); -- Can't getpeer/sockname after wrap() + local ok, conn, err = pcall(luasec.wrap, self.conn, self.tls_ctx); + if not ok then + conn, err = ok, conn; + self:debug("Failed to initialize TLS: %s", err); + end + if not conn then + self:on("disconnect", err); + self:destroy(); + return conn, err; + end + conn:settimeout(0); + self.conn = conn; + if conn.sni then + if self.servername then conn:sni(self.servername); + elseif self._server and type(self._server.hosts) == "table" and next(self._server.hosts) ~= nil then + conn:sni(self._server.hosts, true); end - self:on("starttls"); - self.ondrain = nil; - self.onwritable = interface.tlshandskake; - self.onreadable = interface.tlshandskake; - return self:init(); end + if self.extra and self.extra.tlsa and conn.settlsa then + -- TODO Error handling + if not conn:setdane(self.servername or self.extra.dane_hostname) then + self:debug("Could not enable DANE on connection"); + else + self:debug("Enabling DANE with %d TLSA records", #self.extra.tlsa); + self:noise("DANE hostname is %q", self.servername or self.extra.dane_hostname); + for _, tlsa in ipairs(self.extra.tlsa) do + self:noise("TLSA: %q", tlsa); + conn:settlsa(tlsa.use, tlsa.select, tlsa.match, tlsa.data); + end + end + end + self:on("starttls"); + self.ondrain = nil; + self.onwritable = interface.tlshandshake; + self.onreadable = interface.tlshandshake; + if now then + return self:tlshandshake() + end + self:setreadtimeout(false); + self:setwritetimeout(cfg.ssl_handshake_timeout); + self:set(true, true); +end + +function interface:tlshandshake() + self:setreadtimeout(false); + self:noise("Continuing TLS handshake"); local ok, err = self.conn:dohandshake(); if ok then - log("debug", "TLS handshake on %s complete", self); + local info = self.conn.info and self.conn:info(); + if type(info) == "table" then + self:debug("TLS handshake complete (%s with %s)", info.protocol, info.cipher); + else + self:debug("TLS handshake complete"); + end + self:setwritetimeout(false); self.onwritable = nil; self.onreadable = nil; self:on("status", "ssl-handshake-complete"); - self:setwritetimeout(); self:set(true, true); + self:onconnect(); + self:onreadable(); elseif err == "wantread" then - log("debug", "TLS handshake on %s to wait until readable", self); + self:noise("TLS handshake to wait until readable"); self:set(true, false); - self:setreadtimeout(cfg.ssl_handshake_timeout); + self:setwritetimeout(cfg.ssl_handshake_timeout); elseif err == "wantwrite" then - log("debug", "TLS handshake on %s to wait until writable", self); + self:noise("TLS handshake to wait until writable"); self:set(false, true); self:setwritetimeout(cfg.ssl_handshake_timeout); else - log("debug", "TLS handshake error on %s: %s", self, err); + self:debug("TLS handshake error: %s", err); self:on("disconnect", err); self:destroy(); end @@ -536,15 +715,18 @@ end local function wrapsocket(client, server, read_size, listeners, tls_ctx, extra) -- luasocket object -> interface object client:settimeout(0); + local conn_id = ("conn%s"):format(new_id()); local conn = setmetatable({ conn = client; _server = server; - created = gettime(); + created = realtime(); listeners = listeners; read_size = read_size or (server and server.read_size); - writebuffer = {}; + writebuffer = nil; tls_ctx = tls_ctx or (server and server.tls_ctx); tls_direct = server and server.tls_direct; + id = conn_id; + log = logger.init(conn_id); extra = extra; }, interface_mt); @@ -561,12 +743,12 @@ end function interface:updatenames() local conn = self.conn; local ok, peername, peerport = pcall(conn.getpeername, conn); - if ok then - self.peername, self.peerport = peername, peerport; + if ok and peername then + self.peername, self.peerport = peername, peerport or 0; end local ok, sockname, sockport = pcall(conn.getsockname, conn); - if ok then - self.sockname, self.sockport = sockname, sockport; + if ok and sockname then + self.sockname, self.sockport = sockname, sockport or 0; end end @@ -575,76 +757,149 @@ end function interface:onacceptable() local conn, err = self.conn:accept(); if not conn then - log("debug", "Error accepting new client: %s, server will be paused for %ds", err, cfg.accept_retry_interval); + self:debug("Error accepting new client: %s, server will be paused for %ds", err, cfg.accept_retry_interval); self:pausefor(cfg.accept_retry_interval); return; end local client = wrapsocket(conn, self, nil, self.listeners); - log("debug", "New connection %s", tostring(client)); - client:init(); + client:debug("New connection %s on server %s", client, self); + client:defaultoptions(); + client._writable = cfg.opportunistic_writes; if self.tls_direct then - client:starttls(self.tls_ctx); + client:add(true, true); + client:inittls(self.tls_ctx, true); + else + client:add(true, false); + client:onconnect(); + client:onreadable(); end end --- Initialization +-- Initialization for outgoing connections function interface:init() - self:setwritetimeout(); + self:setwritetimeout(cfg.connect_timeout); + self:defaultoptions(); return self:add(true, true); end +function interface:defaultoptions() + if cfg.nagle == false then + self:setoption("tcp-nodelay", true); + end + if cfg.tcp_keepalive then + self:setoption("keepalive", true); + if type(cfg.tcp_keepalive) == "number" then + self:setoption("tcp-keepidle", cfg.tcp_keepalive); + end + end +end + function interface:pause() + self:noise("Pause reading"); + self:setreadtimeout(false); return self:set(false); end function interface:resume() + self:noise("Resume reading"); + self:setreadtimeout(); return self:set(true); end -- Pause connection for some time function interface:pausefor(t) + self:noise("Pause for %fs", t); if self._pausefor then - self._pausefor:close(); + closetimer(self._pausefor); + self._pausefor = nil; end if t == false then return; end self:set(false); self._pausefor = addtimer(t, function () self._pausefor = nil; self:set(true); + self:noise("Resuming after pause"); if self.conn and self.conn:dirty() then + self:noise("Have buffered incoming data to process"); self:onreadable(); end end); end +function interface:setlimit(Bps) + if Bps > 0 then + self._limit = 1/Bps; + else + self._limit = nil; + end +end + +function interface:pause_writes() + if self._write_lock then + return + end + self:noise("Pause writes"); + self._write_lock = true; + self:setwritetimeout(false); + self:set(nil, false); +end + +function interface:resume_writes() + if not self._write_lock then + return + end + self:noise("Resume writes"); + self._write_lock = nil; + if self.writebuffer and (self.writebuffer[1] or type(self.writebuffer) == "string") then + self:setwritetimeout(); + self:set(nil, true); + end +end + -- Connected! function interface:onconnect() - if self.conn and not self.peername and self.conn.getpeername then - self.peername, self.peerport = self.conn:getpeername(); - end + self._connected = true; + self:updatenames(); + self:debug("Connected (%s)", self); self.onconnect = noop; self:on("connect"); end -local function addserver(addr, port, listeners, read_size, tls_ctx) - local conn, err = socket.bind(addr, port, cfg.tcp_backlog); - if not conn then return conn, err; end - conn:settimeout(0); +local function wrapserver(conn, addr, port, listeners, config) local server = setmetatable({ conn = conn; - created = gettime(); + created = realtime(); listeners = listeners; - read_size = read_size; + read_size = config and config.read_size; onreadable = interface.onacceptable; - tls_ctx = tls_ctx; - tls_direct = tls_ctx and true or false; + tls_ctx = config and config.tls_ctx; + tls_direct = config and config.tls_direct; + hosts = config and config.sni_hosts; sockname = addr; sockport = port; + log = logger.init(("serv%s"):format(new_id())); }, interface_mt); + server:debug("Server %s created", server); server:add(true, false); return server; end +local function listen(addr, port, listeners, config) + local conn, err = socket.bind(addr, port, cfg.tcp_backlog); + if not conn then return conn, err; end + conn:settimeout(0); + return wrapserver(conn, addr, port, listeners, config); +end + +-- COMPAT +local function addserver(addr, port, listeners, read_size, tls_ctx) + return listen(addr, port, listeners, { + read_size = read_size; + tls_ctx = tls_ctx; + tls_direct = tls_ctx and true or false; + }); +end + -- COMPAT local function wrapclient(conn, addr, port, listeners, read_size, tls_ctx, extra) local client = wrapsocket(conn, nil, read_size, listeners, tls_ctx, extra); @@ -678,13 +933,19 @@ local function addclient(addr, port, listeners, read_size, tls_ctx, typ, extra) return nil, "invalid socket type"; end local conn, err = create(); + if not conn then return conn, err; end local ok, err = conn:settimeout(0); if not ok then return ok, err; end local ok, err = conn:setpeername(addr, port); if not ok and err ~= "timeout" then return ok, err; end local client = wrapsocket(conn, nil, read_size, listeners, tls_ctx, extra) local ok, err = client:init(); + if not client.peername then + -- otherwise not set until connected + client.peername, client.peerport = addr, port; + end if not ok then return ok, err; end + client:debug("Client %s created", client); if tls_ctx then client:starttls(tls_ctx); end @@ -706,23 +967,23 @@ local function watchfd(fd, onreadable, onwritable) end; -- Otherwise it'll need to be something LuaSocket-compatible end + conn.id = new_id(); + conn.log = logger.init(("fdwatch%s"):format(conn.id)); conn:add(onreadable, onwritable); return conn; end; -- Dump all data from one connection into another -local function link(from, to) - from.listeners = setmetatable({ - onincoming = function (_, data) - from:pause(); - to:write(data); - end, - }, {__index=from.listeners}); - to.listeners = setmetatable({ - ondrain = function () - from:resume(); - end, - }, {__index=to.listeners}); +local function link(from, to, read_size) + from:debug("Linking to %s", to.id); + function from:onincoming(data) + self:pause(); + to:write(data); + end + function to:ondrain() -- luacheck: ignore 212/self + from:resume(); + end + from:set_mode(read_size); from:set(true, nil); to:set(nil, true); end @@ -796,11 +1057,21 @@ return { addserver = addserver; addclient = addclient; add_task = addtimer; - at = at; + timer = { + -- API-compatible with util.timer + add_task = addtimer; + stop = closetimer; + reschedule = reschedule; + to_absolute_time = function (t) + return t-monotonic()+realtime(); + end; + }; + listen = listen; loop = loop; closeall = closeall; setquitting = setquitting; wrapclient = wrapclient; + wrapserver = wrapserver; watchfd = watchfd; link = link; set_config = function (newconfig) @@ -810,6 +1081,7 @@ return { -- libevent emulation event = { EV_READ = "r", EV_WRITE = "w", EV_READWRITE = "rw", EV_LEAVE = -1 }; addevent = function (fd, mode, callback) + log("warn", "Using deprecated libevent emulation, please update code to use watchfd API instead"); local function onevent(self) local ret = self:callback(); if ret == -1 then @@ -829,6 +1101,8 @@ return { fds[fd] = nil; end; }, interface_mt); + conn.id = conn:getfd(); + conn.log = logger.init(("fdwatch%d"):format(conn.id)); local ok, err = conn:add(mode == "r" or mode == "rw", mode == "w" or mode == "rw"); if not ok then return ok, err; end return conn; |