aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorKim Alvefur <zash@zash.se>2018-10-25 15:12:59 +0200
committerKim Alvefur <zash@zash.se>2018-10-25 15:12:59 +0200
commit3899c7ac4b50242ccfc78edc6d5e3d6c3b954008 (patch)
treef10f829a3a201917528af2117ebe78b410d1fab8
parent5834d45f487f3875987620843914c47a3824feb7 (diff)
downloadprosody-3899c7ac4b50242ccfc78edc6d5e3d6c3b954008.tar.gz
prosody-3899c7ac4b50242ccfc78edc6d5e3d6c3b954008.zip
net.server: Add an API for holding writes of outgoing data
-rw-r--r--net/server_epoll.lua20
-rw-r--r--net/server_event.lua13
-rw-r--r--net/server_select.lua31
3 files changed, 50 insertions, 14 deletions
diff --git a/net/server_epoll.lua b/net/server_epoll.lua
index 4b40c7d5..cdf3e8fe 100644
--- a/net/server_epoll.lua
+++ b/net/server_epoll.lua
@@ -409,8 +409,10 @@ function interface:write(data)
else
self.writebuffer = { data };
end
- self:setwritetimeout();
- self:set(nil, true);
+ if not self._write_lock then
+ self:setwritetimeout();
+ self:set(nil, true);
+ end
return #data;
end
interface.send = interface.write;
@@ -590,6 +592,20 @@ function interface:pausefor(t)
end);
end
+function interface:pause_writes()
+ self._write_lock = true;
+ self:setwritetimeout(false);
+ self:set(nil, false);
+end
+
+function interface:resume_writes()
+ self._write_lock = nil;
+ if self.writebuffer[1] then
+ self:setwritetimeout();
+ self:set(nil, true);
+ end
+end
+
-- Connected!
function interface:onconnect()
if self.conn and not self.peername and self.conn.getpeername then
diff --git a/net/server_event.lua b/net/server_event.lua
index ca80c3f2..70757e03 100644
--- a/net/server_event.lua
+++ b/net/server_event.lua
@@ -273,6 +273,19 @@ function interface_mt:resume()
end
end
+function interface_mt:pause_writes()
+ return self:_lock(self.nointerface, self.noreading, true);
+end
+
+function interface_mt:resume_writes()
+ self:_lock(self.nointerface, self.noreading, false);
+ if self.writecallback and not self.eventwrite then
+ self.eventwrite = addevent( base, self.conn, EV_WRITE, self.writecallback, cfg.WRITE_TIMEOUT ); -- register callback
+ return true;
+ end
+end
+
+
function interface_mt:counter(c)
if c then
self._connections = self._connections + c
diff --git a/net/server_select.lua b/net/server_select.lua
index 745e1f49..693cee5e 100644
--- a/net/server_select.lua
+++ b/net/server_select.lua
@@ -485,20 +485,27 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport
out_error( "server.lua, lock() is deprecated" )
handler.lock_read (self, switch)
if switch == true then
- local tmp = _sendlistlen
- _sendlistlen = removesocket( _sendlist, socket, _sendlistlen )
- _writetimes[ handler ] = nil
- if _sendlistlen ~= tmp then
- nosend = true
- end
+ handler.pause_writes (self)
elseif switch == false then
- if nosend then
- nosend = false
- write( "" )
- end
+ handler.resume_writes (self)
end
return noread, nosend
end
+ handler.pause_writes = function (self)
+ local tmp = _sendlistlen
+ _sendlistlen = removesocket( _sendlist, socket, _sendlistlen )
+ _writetimes[ handler ] = nil
+ if _sendlistlen ~= tmp then
+ nosend = true
+ end
+ end
+ handler.resume_writes = function (self)
+ if nosend then
+ nosend = false
+ write( "" )
+ end
+ end
+
local _readbuffer = function( ) -- this function reads data
local buffer, err, part = receive( socket, pattern ) -- receive buffer with "pattern"
if not err or (err == "wantread" or err == "timeout") then -- received something
@@ -716,7 +723,7 @@ local function link(sender, receiver, buffersize)
function receiver.sendbuffer()
_sendbuffer();
if sender_locked and receiver.bufferlen() < buffersize then
- sender:resume(); -- Unlock now
+ sender:lock_read(false); -- Unlock now
sender_locked = nil;
end
end
@@ -726,7 +733,7 @@ local function link(sender, receiver, buffersize)
_readbuffer();
if not sender_locked and receiver.bufferlen() >= buffersize then
sender_locked = true;
- sender:pause();
+ sender:lock_read(true);
end
end
sender:set_mode("*a");