diff options
Diffstat (limited to 'net/server_epoll.lua')
-rw-r--r-- | net/server_epoll.lua | 102 |
1 files changed, 68 insertions, 34 deletions
diff --git a/net/server_epoll.lua b/net/server_epoll.lua index 6c110241..89b36233 100644 --- a/net/server_epoll.lua +++ b/net/server_epoll.lua @@ -6,8 +6,6 @@ -- -local t_insert = table.insert; -local t_concat = table.concat; local setmetatable = setmetatable; local pcall = pcall; local type = type; @@ -22,6 +20,7 @@ local realtime = require "prosody.util.time".now; local monotonic = require "prosody.util.time".monotonic; local indexedbheap = require "prosody.util.indexedbheap"; local createtable = require "prosody.util.table".create; +local dbuffer = require "prosody.util.dbuffer"; local inet = require "prosody.util.net"; local inet_pton = inet.pton; local _SOCKETINVALID = socket._SOCKETINVALID or -1; @@ -94,6 +93,15 @@ local default_config = { __index = { -- Size of chunks to read from sockets read_size = 8192; + -- Maximum size of send buffer, after which additional data is rejected + max_send_buffer_size = 32*1024*1024; + + -- How many chunks (immutable strings) to keep in the send buffer + send_buffer_chunks = nil; + + -- Maximum amount of data to send at once (to the TCP buffers), default based on /proc/sys/net/ipv4/tcp_wmem + max_send_chunk = 4*1024*1024; + -- Timeout used during between steps in TLS handshakes ssl_handshake_timeout = 60; @@ -533,26 +541,21 @@ function interface:onwritable() self:onconnect(); if not self.conn then return nil, "no-conn"; end -- could have been closed in onconnect self:on("predrain"); - local buffer = self.writebuffer; - local data = buffer or ""; - if type(buffer) == "table" then - if buffer[3] then - data = t_concat(data); - elseif buffer[2] then - data = buffer[1] .. buffer[2]; - else - data = buffer[1] or ""; - end - end + local buffer = self.writebuffer or ""; + -- Naming things ... s/data/slice/ ? + local data = buffer:sub(1, cfg.max_send_chunk); local ok, err, partial = self.conn:send(data); self._writable = ok; - if ok then + if ok and #data < #buffer then + -- Sent the whole 'data' but there's more in the buffer + ok, err, partial = nil, "timeout", ok; + end + self:debug("Sent %d out of %d buffered bytes", ok and #data or partial or 0, #buffer); + if ok then -- all the data we had was sent successfully self:set(nil, false); if cfg.keep_buffers and type(buffer) == "table" then - for i = #buffer, 1, -1 do - buffer[i] = nil; - end - else + buffer:discard(ok); + else -- string or don't keep buffers self.writebuffer = nil; end self._writing = nil; @@ -560,14 +563,10 @@ function interface:onwritable() self:ondrain(); -- Be aware of writes in ondrain return ok; elseif partial then - self:debug("Sent %d out of %d buffered bytes", partial, #data); - if cfg.keep_buffers and type(buffer) == "table" then - buffer[1] = data:sub(partial+1); - for i = #buffer, 2, -1 do - buffer[i] = nil; - end + if type(buffer) == "table" then + buffer:discard(partial); else - self.writebuffer = data:sub(partial+1); + self.writebuffer = data:sub(partial + 1); end self:set(nil, true); self:setwritetimeout(); @@ -595,13 +594,45 @@ end -- Add data to write buffer and set flag for wanting to write function interface:write(data) local buffer = self.writebuffer; - if type(buffer) == "table" then - t_insert(buffer, data); - elseif type(buffer) == "string" then - self:noise("Allocating buffer!") - self.writebuffer = { buffer, data }; - elseif buffer == nil then + -- (nil) -> save string + -- (string) -> convert to buffer (3 tables!) + -- (buffer) -> write to buffer + if not buffer then self.writebuffer = data; + elseif type(buffer) == "string" then + local prev_buffer = buffer; + buffer = dbuffer.new(cfg.max_send_buffer_size, cfg.send_buffer_chunks); + self.writebuffer = buffer; + if prev_buffer then + -- TODO refactor, there's 3 copies of these lines + if not buffer:write(prev_buffer) then + if self._write_lock then + return false; + end + -- Try to flush buffer to make room + self:onwritable(); + if not buffer:write(prev_buffer) then + return false; + end + end + end + if not buffer:write(data) then + if self._write_lock then + return false; + end + self:onwritable(); + if not buffer:write(data) then + return false; + end + end + elseif not buffer:write(data) then + if self._write_lock then + return false; + end + self:onwritable(); + if not buffer:write(data) then + return false; + end end if not self._write_lock and not self._writing then if self._writable and cfg.opportunistic_writes and not self._opportunistic_write then @@ -619,7 +650,7 @@ interface.send = interface.write; -- Close, possibly after writing is done function interface:close() - if self._connected and self.writebuffer and (self.writebuffer[1] or type(self.writebuffer) == "string") then + if self.writebuffer and #self.writebuffer ~= 0 then self._connected = false; self:set(false, true); -- Flush final buffer contents self:setreadtimeout(false); @@ -701,7 +732,7 @@ end function interface:starttls(tls_ctx) if tls_ctx then self.tls_ctx = tls_ctx; end self.starttls = false; - if self.writebuffer and (self.writebuffer[1] or type(self.writebuffer) == "string") then + if self.writebuffer and #self.writebuffer ~= 0 then self:debug("Start TLS after write"); self.ondrain = interface.starttls; self:set(nil, true); -- make sure wantwrite is set @@ -935,7 +966,10 @@ function interface:resume_writes() end self:noise("Resume writes"); self._write_lock = nil; - if self.writebuffer and (self.writebuffer[1] or type(self.writebuffer) == "string") then + if self.writebuffer and #self.writebuffer ~= 0 then + if cfg.opportunistic_writes then + return self:onwritable(); + end self:setwritetimeout(); self:set(nil, true); end |