aboutsummaryrefslogtreecommitdiffstats
path: root/net
diff options
context:
space:
mode:
authorMatthew Wild <mwild1@gmail.com>2010-05-05 15:33:58 +0100
committerMatthew Wild <mwild1@gmail.com>2010-05-05 15:33:58 +0100
commit1dce4a3033300f40a4b74b286162bab34337f9bb (patch)
tree1c819b3d1b29b20b4b275bdb6c884a1c1bd4af83 /net
parent697b71895ecc7968c2cb56058abded5017ecb090 (diff)
parentec60bf9e1b50e97162097027e852ce5d0db4bdf8 (diff)
downloadprosody-1dce4a3033300f40a4b74b286162bab34337f9bb.tar.gz
prosody-1dce4a3033300f40a4b74b286162bab34337f9bb.zip
Merge 0.7->trunk
Diffstat (limited to 'net')
-rw-r--r--net/server_event.lua48
-rw-r--r--net/server_select.lua39
2 files changed, 83 insertions, 4 deletions
diff --git a/net/server_event.lua b/net/server_event.lua
index dde13e61..43e70a0f 100644
--- a/net/server_event.lua
+++ b/net/server_event.lua
@@ -282,8 +282,21 @@ do
return nointerface, noreading, nowriting
end
+ --TODO: Deprecate
function interface_mt:lock_read(switch)
- return self:_lock(self.nointerface, switch, self.nowriting);
+ if switch then
+ return self:pause();
+ else
+ return self:resume();
+ end
+ end
+
+ function interface_mt:pause()
+ return self:_lock(self.nointerface, true, self.nowriting);
+ end
+
+ function interface_mt:resume()
+ return self:_lock(self.nointerface, false, self.nowriting);
end
function interface_mt:counter(c)
@@ -389,6 +402,13 @@ do
self.starttls = false; -- prevent starttls()
end
end
+
+ function interface_mt:set_mode(pattern)
+ if pattern then
+ self._pattern = pattern;
+ end
+ return self._pattern;
+ end
function interface_mt:set_send(new_send)
-- No-op, we always use the underlying connection's send
@@ -445,6 +465,8 @@ do
end
function interface_mt:ontimeout()
end
+ function interface_mt:ondrain()
+ end
function interface_mt:onstatus()
debug("server.lua: Dummy onstatus()")
end
@@ -525,6 +547,7 @@ do
if succ then -- writing succesful
interface.writebuffer = ""
interface.writebufferlen = 0
+ interface:ondrain();
if interface.fatalerror then
debug "closing client after writing"
interface:_close() -- close interface if needed
@@ -586,7 +609,7 @@ do
interface.eventreadtimeout = nil
end
end
- local buffer, err, part = interface.conn:receive( pattern ) -- receive buffer with "pattern"
+ local buffer, err, part = interface.conn:receive( interface._pattern ) -- receive buffer with "pattern"
--vdebug( "read data:", tostring(buffer), "error:", tostring(err), "part:", tostring(part) )
buffer = buffer or part or ""
local len = string_len( buffer )
@@ -822,11 +845,32 @@ function hook_signal(signal_num, handler)
return signal_events[signal_num];
end
+local function link(sender, receiver, buffersize)
+ sender:set_mode(buffersize);
+ local sender_locked;
+
+ function receiver:ondrain()
+ if sender_locked then
+ sender:resume();
+ sender_locked = nil;
+ end
+ end
+
+ function sender:onincoming(data)
+ receiver:write(data);
+ if receiver.writebufferlen >= buffersize then
+ sender_locked = true;
+ sender:pause();
+ end
+ end
+end
+
return {
cfg = cfg,
base = base,
loop = loop,
+ link = link,
event = event,
event_base = base,
addevent = newevent,
diff --git a/net/server_select.lua b/net/server_select.lua
index 94461f45..8a4d917d 100644
--- a/net/server_select.lua
+++ b/net/server_select.lua
@@ -252,6 +252,7 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport
local dispatch = listeners.onincoming
local status = listeners.onstatus
local disconnect = listeners.ondisconnect
+ local drain = listeners.ondrain
local bufferqueue = { } -- buffer array
local bufferqueuelen = 0 -- end of buffer array
@@ -284,6 +285,7 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport
dispatch = listeners.onincoming
disconnect = listeners.ondisconnect
status = listeners.onstatus
+ drain = listeners.ondrain
end
handler.getstats = function( )
return readtraffic, sendtraffic
@@ -379,7 +381,7 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport
handler.socket = function( self )
return socket
end
- handler.pattern = function( self, new )
+ handler.set_mode = function( self, new )
pattern = new or pattern
return pattern
end
@@ -392,6 +394,7 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport
maxreadlen = readlen or maxreadlen
return bufferlen, maxreadlen, maxsendlen
end
+ --TODO: Deprecate
handler.lock_read = function (self, switch)
if switch == true then
local tmp = _readlistlen
@@ -409,6 +412,12 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport
end
return noread
end
+ handler.pause = function (self)
+ return self:lock_read(true);
+ end
+ handler.resume = function (self)
+ return self:lock_read(false);
+ end
handler.lock = function( self, switch )
handler.lock_read (switch)
if switch == true then
@@ -430,7 +439,7 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport
end
local _readbuffer = function( ) -- this function reads data
local buffer, err, part = receive( socket, pattern ) -- receive buffer with "pattern"
- if not err or (err == "wantread" or err == "timeout") or (part and string_len(part) > 0) then -- received something
+ if not err or (err == "wantread" or err == "timeout") then -- received something
local buffer = buffer or part or ""
local len = string_len( buffer )
if len > maxreadlen then
@@ -472,6 +481,9 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport
_sendlistlen = removesocket( _sendlist, socket, _sendlistlen ) -- delete socket from writelist
_ = needtls and handler:starttls(nil, true)
_writetimes[ handler ] = nil
+ if drain then
+ drain(handler)
+ end
_ = toclose and handler:close( )
return true
elseif byte and ( err == "timeout" or err == "wantwrite" ) then -- want write
@@ -666,6 +678,28 @@ closesocket = function( socket )
--mem_free( )
end
+local function link(sender, receiver, buffersize)
+ sender:set_mode(buffersize);
+ local sender_locked;
+ local _sendbuffer = receiver.sendbuffer;
+ function receiver.sendbuffer()
+ _sendbuffer();
+ if sender_locked and receiver.bufferlen() < buffersize then
+ sender:lock_read(false); -- Unlock now
+ sender_locked = nil;
+ end
+ end
+
+ local _readbuffer = sender.readbuffer;
+ function sender.readbuffer()
+ _readbuffer();
+ if not sender_locked and receiver.bufferlen() >= buffersize then
+ sender_locked = true;
+ sender:lock_read(true);
+ end
+ end
+end
+
----------------------------------// PUBLIC //--
addserver = function( addr, port, listeners, pattern, sslctx ) -- this function provides a way for other scripts to reg a server
@@ -889,6 +923,7 @@ return {
wrapclient = wrapclient,
loop = loop,
+ link = link,
stats = stats,
closeall = closeall,
addtimer = addtimer,