aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorKim Alvefur <zash@zash.se>2024-11-09 00:37:15 +0100
committerKim Alvefur <zash@zash.se>2024-11-09 00:37:15 +0100
commit693079c619ef4d33c90a20979ac5d97b0c2712ec (patch)
treeedc821da1fb38afc1021d526ceb633b4b5ece13f
parent3b079188006e47fdea16e267492033c56bdc6391 (diff)
downloadprosody-693079c619ef4d33c90a20979ac5d97b0c2712ec.tar.gz
prosody-693079c619ef4d33c90a20979ac5d97b0c2712ec.zip
net.server_epoll: Improve efficiency of sending much buffered data
Problem: The string slice operations performed when a lot of data gets buffered end up being expensive and memory-consuming. We have util.dbuffer for precisely this kind of thing. I want to keep the behavior of writebuffer being upgraded from nil to a string to a full buffer, since the last step involves three table allocations, where the previous buffer method only used one. Avoiding those allocations for simple writes like whitespace keep-alives feels like it would keep memory churn down. This work was started in 2020.
-rw-r--r--net/server_epoll.lua102
1 files changed, 68 insertions, 34 deletions
diff --git a/net/server_epoll.lua b/net/server_epoll.lua
index 6c110241..89b36233 100644
--- a/net/server_epoll.lua
+++ b/net/server_epoll.lua
@@ -6,8 +6,6 @@
--
-local t_insert = table.insert;
-local t_concat = table.concat;
local setmetatable = setmetatable;
local pcall = pcall;
local type = type;
@@ -22,6 +20,7 @@ local realtime = require "prosody.util.time".now;
local monotonic = require "prosody.util.time".monotonic;
local indexedbheap = require "prosody.util.indexedbheap";
local createtable = require "prosody.util.table".create;
+local dbuffer = require "prosody.util.dbuffer";
local inet = require "prosody.util.net";
local inet_pton = inet.pton;
local _SOCKETINVALID = socket._SOCKETINVALID or -1;
@@ -94,6 +93,15 @@ local default_config = { __index = {
-- Size of chunks to read from sockets
read_size = 8192;
+ -- Maximum size of send buffer, after which additional data is rejected
+ max_send_buffer_size = 32*1024*1024;
+
+ -- How many chunks (immutable strings) to keep in the send buffer
+ send_buffer_chunks = nil;
+
+ -- Maximum amount of data to send at once (to the TCP buffers), default based on /proc/sys/net/ipv4/tcp_wmem
+ max_send_chunk = 4*1024*1024;
+
-- Timeout used during between steps in TLS handshakes
ssl_handshake_timeout = 60;
@@ -533,26 +541,21 @@ function interface:onwritable()
self:onconnect();
if not self.conn then return nil, "no-conn"; end -- could have been closed in onconnect
self:on("predrain");
- local buffer = self.writebuffer;
- local data = buffer or "";
- if type(buffer) == "table" then
- if buffer[3] then
- data = t_concat(data);
- elseif buffer[2] then
- data = buffer[1] .. buffer[2];
- else
- data = buffer[1] or "";
- end
- end
+ local buffer = self.writebuffer or "";
+ -- Naming things ... s/data/slice/ ?
+ local data = buffer:sub(1, cfg.max_send_chunk);
local ok, err, partial = self.conn:send(data);
self._writable = ok;
- if ok then
+ if ok and #data < #buffer then
+ -- Sent the whole 'data' but there's more in the buffer
+ ok, err, partial = nil, "timeout", ok;
+ end
+ self:debug("Sent %d out of %d buffered bytes", ok and #data or partial or 0, #buffer);
+ if ok then -- all the data we had was sent successfully
self:set(nil, false);
if cfg.keep_buffers and type(buffer) == "table" then
- for i = #buffer, 1, -1 do
- buffer[i] = nil;
- end
- else
+ buffer:discard(ok);
+ else -- string or don't keep buffers
self.writebuffer = nil;
end
self._writing = nil;
@@ -560,14 +563,10 @@ function interface:onwritable()
self:ondrain(); -- Be aware of writes in ondrain
return ok;
elseif partial then
- self:debug("Sent %d out of %d buffered bytes", partial, #data);
- if cfg.keep_buffers and type(buffer) == "table" then
- buffer[1] = data:sub(partial+1);
- for i = #buffer, 2, -1 do
- buffer[i] = nil;
- end
+ if type(buffer) == "table" then
+ buffer:discard(partial);
else
- self.writebuffer = data:sub(partial+1);
+ self.writebuffer = data:sub(partial + 1);
end
self:set(nil, true);
self:setwritetimeout();
@@ -595,13 +594,45 @@ end
-- Add data to write buffer and set flag for wanting to write
function interface:write(data)
local buffer = self.writebuffer;
- if type(buffer) == "table" then
- t_insert(buffer, data);
- elseif type(buffer) == "string" then
- self:noise("Allocating buffer!")
- self.writebuffer = { buffer, data };
- elseif buffer == nil then
+ -- (nil) -> save string
+ -- (string) -> convert to buffer (3 tables!)
+ -- (buffer) -> write to buffer
+ if not buffer then
self.writebuffer = data;
+ elseif type(buffer) == "string" then
+ local prev_buffer = buffer;
+ buffer = dbuffer.new(cfg.max_send_buffer_size, cfg.send_buffer_chunks);
+ self.writebuffer = buffer;
+ if prev_buffer then
+ -- TODO refactor, there's 3 copies of these lines
+ if not buffer:write(prev_buffer) then
+ if self._write_lock then
+ return false;
+ end
+ -- Try to flush buffer to make room
+ self:onwritable();
+ if not buffer:write(prev_buffer) then
+ return false;
+ end
+ end
+ end
+ if not buffer:write(data) then
+ if self._write_lock then
+ return false;
+ end
+ self:onwritable();
+ if not buffer:write(data) then
+ return false;
+ end
+ end
+ elseif not buffer:write(data) then
+ if self._write_lock then
+ return false;
+ end
+ self:onwritable();
+ if not buffer:write(data) then
+ return false;
+ end
end
if not self._write_lock and not self._writing then
if self._writable and cfg.opportunistic_writes and not self._opportunistic_write then
@@ -619,7 +650,7 @@ interface.send = interface.write;
-- Close, possibly after writing is done
function interface:close()
- if self._connected and self.writebuffer and (self.writebuffer[1] or type(self.writebuffer) == "string") then
+ if self.writebuffer and #self.writebuffer ~= 0 then
self._connected = false;
self:set(false, true); -- Flush final buffer contents
self:setreadtimeout(false);
@@ -701,7 +732,7 @@ end
function interface:starttls(tls_ctx)
if tls_ctx then self.tls_ctx = tls_ctx; end
self.starttls = false;
- if self.writebuffer and (self.writebuffer[1] or type(self.writebuffer) == "string") then
+ if self.writebuffer and #self.writebuffer ~= 0 then
self:debug("Start TLS after write");
self.ondrain = interface.starttls;
self:set(nil, true); -- make sure wantwrite is set
@@ -935,7 +966,10 @@ function interface:resume_writes()
end
self:noise("Resume writes");
self._write_lock = nil;
- if self.writebuffer and (self.writebuffer[1] or type(self.writebuffer) == "string") then
+ if self.writebuffer and #self.writebuffer ~= 0 then
+ if cfg.opportunistic_writes then
+ return self:onwritable();
+ end
self:setwritetimeout();
self:set(nil, true);
end