aboutsummaryrefslogtreecommitdiffstats
path: root/net/server_epoll.lua
diff options
context:
space:
mode:
Diffstat (limited to 'net/server_epoll.lua')
-rw-r--r--net/server_epoll.lua148
1 files changed, 114 insertions, 34 deletions
diff --git a/net/server_epoll.lua b/net/server_epoll.lua
index c946a751..24678842 100644
--- a/net/server_epoll.lua
+++ b/net/server_epoll.lua
@@ -6,8 +6,6 @@
--
-local t_insert = table.insert;
-local t_concat = table.concat;
local setmetatable = setmetatable;
local pcall = pcall;
local type = type;
@@ -22,6 +20,7 @@ local realtime = require "prosody.util.time".now;
local monotonic = require "prosody.util.time".monotonic;
local indexedbheap = require "prosody.util.indexedbheap";
local createtable = require "prosody.util.table".create;
+local dbuffer = require "prosody.util.dbuffer";
local inet = require "prosody.util.net";
local inet_pton = inet.pton;
local _SOCKETINVALID = socket._SOCKETINVALID or -1;
@@ -35,6 +34,38 @@ local poller = require "prosody.util.poll"
local EEXIST = poller.EEXIST;
local ENOENT = poller.ENOENT;
+-- systemd socket activation
+local SD_LISTEN_FDS_START = 3;
+local SD_LISTEN_FDS = tonumber(os.getenv("LISTEN_FDS")) or 0;
+
+local inherited_sockets = setmetatable({}, {
+ __index = function(t, k)
+ local serv_mt = debug.getregistry()["tcp{server}"];
+ for i = 1, SD_LISTEN_FDS do
+ local serv = socket.tcp();
+ if serv:getfd() ~= _SOCKETINVALID then
+ -- If LuaSocket allocated a FD for then we can't really close it and it would leak.
+ log("error", "LuaSocket not compatible with socket activation. Upgrade LuaSocket or disable socket activation.");
+ setmetatable(t, nil);
+ break
+ end
+ serv:setfd(SD_LISTEN_FDS_START + i - 1);
+ debug.setmetatable(serv, serv_mt);
+ serv:settimeout(0);
+ local ip, port = serv:getsockname();
+ t[ip .. ":" .. port] = serv;
+ if ip == "0.0.0.0" then
+ -- LuaSocket treats '*' as an alias for '0.0.0.0'
+ t["*:" .. port] = serv;
+ end
+ end
+
+ -- Disable lazy-loading mechanism once performed
+ setmetatable(t, nil);
+ return t[k];
+ end;
+});
+
local poll = assert(poller.new());
local _ENV = nil;
@@ -62,6 +93,15 @@ local default_config = { __index = {
-- Size of chunks to read from sockets
read_size = 8192;
+ -- Maximum size of send buffer, after which additional data is rejected
+ max_send_buffer_size = 32*1024*1024;
+
+ -- How many chunks (immutable strings) to keep in the send buffer
+ send_buffer_chunks = nil;
+
+ -- Maximum amount of data to send at once (to the TCP buffers), default based on /proc/sys/net/ipv4/tcp_wmem
+ max_send_chunk = 4*1024*1024;
+
-- Timeout used during between steps in TLS handshakes
ssl_handshake_timeout = 60;
@@ -501,26 +541,21 @@ function interface:onwritable()
self:onconnect();
if not self.conn then return nil, "no-conn"; end -- could have been closed in onconnect
self:on("predrain");
- local buffer = self.writebuffer;
- local data = buffer or "";
- if type(buffer) == "table" then
- if buffer[3] then
- data = t_concat(data);
- elseif buffer[2] then
- data = buffer[1] .. buffer[2];
- else
- data = buffer[1] or "";
- end
- end
+ local buffer = self.writebuffer or "";
+ -- Naming things ... s/data/slice/ ?
+ local data = buffer:sub(1, cfg.max_send_chunk);
local ok, err, partial = self.conn:send(data);
self._writable = ok;
- if ok then
+ if ok and #data < #buffer then
+ -- Sent the whole 'data' but there's more in the buffer
+ ok, err, partial = nil, "timeout", ok;
+ end
+ self:debug("Sent %d out of %d buffered bytes", ok and #data or partial or 0, #buffer);
+ if ok then -- all the data we had was sent successfully
self:set(nil, false);
if cfg.keep_buffers and type(buffer) == "table" then
- for i = #buffer, 1, -1 do
- buffer[i] = nil;
- end
- else
+ buffer:discard(ok);
+ else -- string or don't keep buffers
self.writebuffer = nil;
end
self._writing = nil;
@@ -528,14 +563,10 @@ function interface:onwritable()
self:ondrain(); -- Be aware of writes in ondrain
return ok;
elseif partial then
- self:debug("Sent %d out of %d buffered bytes", partial, #data);
- if cfg.keep_buffers and type(buffer) == "table" then
- buffer[1] = data:sub(partial+1);
- for i = #buffer, 2, -1 do
- buffer[i] = nil;
- end
+ if type(buffer) == "table" then
+ buffer:discard(partial);
else
- self.writebuffer = data:sub(partial+1);
+ self.writebuffer = data:sub(partial + 1);
end
self:set(nil, true);
self:setwritetimeout();
@@ -563,13 +594,51 @@ end
-- Add data to write buffer and set flag for wanting to write
function interface:write(data)
local buffer = self.writebuffer;
- if type(buffer) == "table" then
- t_insert(buffer, data);
- elseif type(buffer) == "string" then
- self:noise("Allocating buffer!")
- self.writebuffer = { buffer, data };
- elseif buffer == nil then
+ -- (nil) -> save string
+ -- (string) -> convert to buffer (3 tables!)
+ -- (buffer) -> write to buffer
+ if not buffer then
self.writebuffer = data;
+ elseif type(buffer) == "string" then
+ local prev_buffer = buffer;
+ buffer = dbuffer.new(cfg.max_send_buffer_size, cfg.send_buffer_chunks);
+ self.writebuffer = buffer;
+ if prev_buffer then
+ -- TODO refactor, there's 3 copies of these lines
+ if not buffer:write(prev_buffer) then
+ if self._write_lock then
+ return false;
+ end
+ -- Try to flush buffer to make room
+ self:onwritable();
+ if not buffer:write(prev_buffer) then
+ self:on("disconnect", "no space left in buffer");
+ self:destroy();
+ return false;
+ end
+ end
+ end
+ if not buffer:write(data) then
+ if self._write_lock then
+ return false;
+ end
+ self:onwritable();
+ if not buffer:write(data) then
+ self:on("disconnect", "no space left in buffer");
+ self:destroy();
+ return false;
+ end
+ end
+ elseif not buffer:write(data) then
+ if self._write_lock then
+ return false;
+ end
+ self:onwritable();
+ if not buffer:write(data) then
+ self:on("disconnect", "no space left in buffer");
+ self:destroy();
+ return false;
+ end
end
if not self._write_lock and not self._writing then
if self._writable and cfg.opportunistic_writes and not self._opportunistic_write then
@@ -587,7 +656,7 @@ interface.send = interface.write;
-- Close, possibly after writing is done
function interface:close()
- if self._connected and self.writebuffer and (self.writebuffer[1] or type(self.writebuffer) == "string") then
+ if self.writebuffer and #self.writebuffer ~= 0 then
self._connected = false;
self:set(false, true); -- Flush final buffer contents
self:setreadtimeout(false);
@@ -669,7 +738,7 @@ end
function interface:starttls(tls_ctx)
if tls_ctx then self.tls_ctx = tls_ctx; end
self.starttls = false;
- if self.writebuffer and (self.writebuffer[1] or type(self.writebuffer) == "string") then
+ if self.writebuffer and #self.writebuffer ~= 0 then
self:debug("Start TLS after write");
self.ondrain = interface.starttls;
self:set(nil, true); -- make sure wantwrite is set
@@ -903,7 +972,10 @@ function interface:resume_writes()
end
self:noise("Resume writes");
self._write_lock = nil;
- if self.writebuffer and (self.writebuffer[1] or type(self.writebuffer) == "string") then
+ if self.writebuffer and #self.writebuffer ~= 0 then
+ if cfg.opportunistic_writes then
+ return self:onwritable();
+ end
self:setwritetimeout();
self:set(nil, true);
end
@@ -944,6 +1016,14 @@ local function wrapserver(conn, addr, port, listeners, config)
end
local function listen(addr, port, listeners, config)
+ local inherited = inherited_sockets[addr .. ":" .. port];
+ if inherited then
+ local conn = wrapserver(inherited, addr, port, listeners, config);
+ -- sockets created by systemd must not be :close() since we may not have
+ -- privileges to create them
+ conn.destroy = interface.del;
+ return conn;
+ end
local conn, err = socket.bind(addr, port, cfg.tcp_backlog);
if not conn then return conn, err; end
conn:settimeout(0);