From c51d0beea7af41fd615cb11640a2a6bb0fcf1aab Mon Sep 17 00:00:00 2001 From: Matthew Wild Date: Wed, 5 May 2010 15:21:17 +0100 Subject: net.server_select, net.server_event: Support for :pause() and :resume() on connections --- net/server_event.lua | 15 ++++++++++++++- net/server_select.lua | 7 +++++++ 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/net/server_event.lua b/net/server_event.lua index dde13e61..c28c5ce2 100644 --- a/net/server_event.lua +++ b/net/server_event.lua @@ -282,8 +282,21 @@ do return nointerface, noreading, nowriting end + --TODO: Deprecate function interface_mt:lock_read(switch) - return self:_lock(self.nointerface, switch, self.nowriting); + if switch then + return self:pause(); + else + return self:resume(); + end + end + + function interface_mt:pause() + return self:_lock(self.nointerface, true, self.nowriting); + end + + function interface_mt:resume() + return self:_lock(self.nointerface, false, self.nowriting); end function interface_mt:counter(c) diff --git a/net/server_select.lua b/net/server_select.lua index 48262ccc..6d6f7fbc 100644 --- a/net/server_select.lua +++ b/net/server_select.lua @@ -392,6 +392,7 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport maxreadlen = readlen or maxreadlen return bufferlen, maxreadlen, maxsendlen end + --TODO: Deprecate handler.lock_read = function (self, switch) if switch == true then local tmp = _readlistlen @@ -409,6 +410,12 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport end return noread end + handler.pause = function (self) + return self:lock_read(true); + end + handler.resume = function (self) + return self:lock_read(false); + end handler.lock = function( self, switch ) handler.lock_read (switch) if switch == true then -- cgit v1.2.3 From 242d67b43e37e5fa4a831011f70a2429957f7b6b Mon Sep 17 00:00:00 2001 From: Matthew Wild Date: Wed, 5 May 2010 15:23:17 +0100 Subject: net.server_select, net.server_event: Rename :pattern() method to :set_mode() for select backend, and add :set_mode() to event backend --- net/server_event.lua | 9 ++++++++- net/server_select.lua | 2 +- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/net/server_event.lua b/net/server_event.lua index c28c5ce2..bbc9a527 100644 --- a/net/server_event.lua +++ b/net/server_event.lua @@ -402,6 +402,13 @@ do self.starttls = false; -- prevent starttls() end end + + function interface_mt:set_mode(pattern) + if pattern then + self._pattern = pattern; + end + return self._pattern; + end function interface_mt:set_send(new_send) -- No-op, we always use the underlying connection's send @@ -599,7 +606,7 @@ do interface.eventreadtimeout = nil end end - local buffer, err, part = interface.conn:receive( pattern ) -- receive buffer with "pattern" + local buffer, err, part = interface.conn:receive( interface._pattern ) -- receive buffer with "pattern" --vdebug( "read data:", tostring(buffer), "error:", tostring(err), "part:", tostring(part) ) buffer = buffer or part or "" local len = string_len( buffer ) diff --git a/net/server_select.lua b/net/server_select.lua index 6d6f7fbc..475e8aca 100644 --- a/net/server_select.lua +++ b/net/server_select.lua @@ -379,7 +379,7 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport handler.socket = function( self ) return socket end - handler.pattern = function( self, new ) + handler.set_mode = function( self, new ) pattern = new or pattern return pattern end -- cgit v1.2.3 From 3c7eb58b08e7f416d73f86061706674d37f83c4f Mon Sep 17 00:00:00 2001 From: Matthew Wild Date: Wed, 5 May 2010 15:24:24 +0100 Subject: net.server_select, net.server_event: Support for ondrain listener callback for when send buffer is empty --- net/server_event.lua | 3 +++ net/server_select.lua | 7 ++++++- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/net/server_event.lua b/net/server_event.lua index bbc9a527..8eaa96da 100644 --- a/net/server_event.lua +++ b/net/server_event.lua @@ -465,6 +465,8 @@ do end function interface_mt:ontimeout() end + function interface_mt:ondrain() + end function interface_mt:onstatus() debug("server.lua: Dummy onstatus()") end @@ -545,6 +547,7 @@ do if succ then -- writing succesful interface.writebuffer = "" interface.writebufferlen = 0 + interface:ondrain(); if interface.fatalerror then debug "closing client after writing" interface:_close() -- close interface if needed diff --git a/net/server_select.lua b/net/server_select.lua index 475e8aca..9bc112bb 100644 --- a/net/server_select.lua +++ b/net/server_select.lua @@ -252,6 +252,7 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport local dispatch = listeners.onincoming local status = listeners.onstatus local disconnect = listeners.ondisconnect + local drain = listeners.ondrain local bufferqueue = { } -- buffer array local bufferqueuelen = 0 -- end of buffer array @@ -284,6 +285,7 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport dispatch = listeners.onincoming disconnect = listeners.ondisconnect status = listeners.onstatus + drain = listeners.ondrain end handler.getstats = function( ) return readtraffic, sendtraffic @@ -437,7 +439,7 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport end local _readbuffer = function( ) -- this function reads data local buffer, err, part = receive( socket, pattern ) -- receive buffer with "pattern" - if not err or (err == "wantread" or err == "timeout") or (part and string_len(part) > 0) then -- received something + if not err or (err == "wantread" or err == "timeout") then -- received something local buffer = buffer or part or "" local len = string_len( buffer ) if len > maxreadlen then @@ -479,6 +481,9 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport _sendlistlen = removesocket( _sendlist, socket, _sendlistlen ) -- delete socket from writelist _ = needtls and handler:starttls(nil, true) _writetimes[ handler ] = nil + if drain then + drain(handler) + end _ = toclose and handler:close( ) return true elseif byte and ( err == "timeout" or err == "wantwrite" ) then -- want write -- cgit v1.2.3 From a22e2228658189c68d1125f1fb6623088f8bc3cb Mon Sep 17 00:00:00 2001 From: Matthew Wild Date: Wed, 5 May 2010 15:25:48 +0100 Subject: net.server_select, net.server_event: Add server.link() to link 2 connections with an intermediate buffer of the specified size --- net/server_event.lua | 21 +++++++++++++++++++++ net/server_select.lua | 23 +++++++++++++++++++++++ 2 files changed, 44 insertions(+) diff --git a/net/server_event.lua b/net/server_event.lua index 8eaa96da..43e70a0f 100644 --- a/net/server_event.lua +++ b/net/server_event.lua @@ -845,11 +845,32 @@ function hook_signal(signal_num, handler) return signal_events[signal_num]; end +local function link(sender, receiver, buffersize) + sender:set_mode(buffersize); + local sender_locked; + + function receiver:ondrain() + if sender_locked then + sender:resume(); + sender_locked = nil; + end + end + + function sender:onincoming(data) + receiver:write(data); + if receiver.writebufferlen >= buffersize then + sender_locked = true; + sender:pause(); + end + end +end + return { cfg = cfg, base = base, loop = loop, + link = link, event = event, event_base = base, addevent = newevent, diff --git a/net/server_select.lua b/net/server_select.lua index 9bc112bb..68ac7091 100644 --- a/net/server_select.lua +++ b/net/server_select.lua @@ -675,6 +675,28 @@ closesocket = function( socket ) --mem_free( ) end +local function link(sender, receiver, buffersize) + sender:set_mode(buffersize); + local sender_locked; + local _sendbuffer = receiver.sendbuffer; + function receiver.sendbuffer() + _sendbuffer(); + if sender_locked and receiver.bufferlen() < buffersize then + sender:lock_read(false); -- Unlock now + sender_locked = nil; + end + end + + local _readbuffer = sender.readbuffer; + function sender.readbuffer() + _readbuffer(); + if not sender_locked and receiver.bufferlen() >= buffersize then + sender_locked = true; + sender:lock_read(true); + end + end +end + ----------------------------------// PUBLIC //-- addserver = function( addr, port, listeners, pattern, sslctx ) -- this function provides a way for other scripts to reg a server @@ -898,6 +920,7 @@ return { wrapclient = wrapclient, loop = loop, + link = link, stats = stats, closeall = closeall, addtimer = addtimer, -- cgit v1.2.3 From f0ff564c3670137f4973abef7efe2d848e0d8a4c Mon Sep 17 00:00:00 2001 From: Matthew Wild Date: Wed, 5 May 2010 15:31:18 +0100 Subject: mod_proxy65: Use new server.link to link proxied connections, now works with either connection backend --- plugins/mod_proxy65.lua | 28 ++++------------------------ 1 file changed, 4 insertions(+), 24 deletions(-) diff --git a/plugins/mod_proxy65.lua b/plugins/mod_proxy65.lua index 5c9ae329..ed279ed3 100644 --- a/plugins/mod_proxy65.lua +++ b/plugins/mod_proxy65.lua @@ -20,6 +20,7 @@ local componentmanager = require "core.componentmanager"; local config_get = require "core.configmanager".get; local connlisteners = require "net.connlisteners"; local sha1 = require "util.hashes".sha1; +local server = require "net.server"; local host, name = module:get_host(), "SOCKS5 Bytestreams Service"; local sessions, transfers, component, replies_cache = {}, {}, nil, {}; @@ -28,6 +29,7 @@ local proxy_port = config_get(host, "core", "proxy65_port") or 5000; local proxy_interface = config_get(host, "core", "proxy65_interface") or "*"; local proxy_address = config_get(host, "core", "proxy65_address") or (proxy_interface ~= "*" and proxy_interface) or host; local proxy_acl = config_get(host, "core", "proxy65_acl"); +local max_buffer_size = 4096; local connlistener = { default_port = proxy_port, default_interface = proxy_interface, default_mode = "*a" }; @@ -84,8 +86,8 @@ function connlistener.onincoming(conn, data) transfers[sha].initiator = conn; session.sha = sha; module:log("debug", "initiator connected ... "); - throttle_sending(conn, transfers[sha].target); - throttle_sending(transfers[sha].target, conn); + server.link(conn, transfers[sha].target, max_buffer_size); + server.link(transfers[sha].target, conn, max_buffer_size); end conn:write(string.char(5, 0, 0, 3, sha:len()) .. sha .. string.char(0, 0)); -- VER, REP, RSV, ATYP, BND.ADDR (sha), BND.PORT (2 Byte) conn:lock_read(true) @@ -262,25 +264,3 @@ end connlisteners.start(module.host .. ':proxy65'); component = componentmanager.register_component(host, handle_to_domain); -local sender_lock_threshold = 4096; -function throttle_sending(sender, receiver) - sender:pattern(sender_lock_threshold); - local sender_locked; - local _sendbuffer = receiver.sendbuffer; - function receiver.sendbuffer() - _sendbuffer(); - if sender_locked and receiver.bufferlen() < sender_lock_threshold then - sender:lock_read(false); -- Unlock now - sender_locked = nil; - end - end - - local _readbuffer = sender.readbuffer; - function sender.readbuffer() - _readbuffer(); - if not sender_locked and receiver.bufferlen() >= sender_lock_threshold then - sender_locked = true; - sender:lock_read(true); - end - end -end -- cgit v1.2.3 From 5492f0aebdf5cb5657d380d60207a5d2bcae506d Mon Sep 17 00:00:00 2001 From: Matthew Wild Date: Wed, 5 May 2010 15:32:16 +0100 Subject: mod_proxy65: Return service-unavailable error when receiving an iq we didn't understand --- plugins/mod_proxy65.lua | 3 +++ 1 file changed, 3 insertions(+) diff --git a/plugins/mod_proxy65.lua b/plugins/mod_proxy65.lua index ed279ed3..3a543d8f 100644 --- a/plugins/mod_proxy65.lua +++ b/plugins/mod_proxy65.lua @@ -236,6 +236,9 @@ function handle_to_domain(origin, stanza) elseif xmlns == "http://jabber.org/protocol/bytestreams" then origin.send(get_stream_host(origin, stanza)); return true; + else + origin.send(st.error_reply(stanza, "cancel", "service-unavailable")); + return true; end elseif stanza.name == "iq" and type == "set" then local reply, from, to, sid = set_activation(stanza); -- cgit v1.2.3 From ec60bf9e1b50e97162097027e852ce5d0db4bdf8 Mon Sep 17 00:00:00 2001 From: Matthew Wild Date: Wed, 5 May 2010 15:33:29 +0100 Subject: mod_proxy65: Send error reply to activation stanza if one or both parties were not connected to the proxy --- plugins/mod_proxy65.lua | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/plugins/mod_proxy65.lua b/plugins/mod_proxy65.lua index 3a543d8f..190d30be 100644 --- a/plugins/mod_proxy65.lua +++ b/plugins/mod_proxy65.lua @@ -241,6 +241,7 @@ function handle_to_domain(origin, stanza) return true; end elseif stanza.name == "iq" and type == "set" then + module:log("debug", "Received activation request from %s", stanza.attr.from); local reply, from, to, sid = set_activation(stanza); if reply ~= nil and from ~= nil and to ~= nil and sid ~= nil then local sha = sha1(sid .. from .. to, true); @@ -251,6 +252,15 @@ function handle_to_domain(origin, stanza) transfers[sha].activated = true; transfers[sha].target:lock_read(false); transfers[sha].initiator:lock_read(false); + else + module:log("debug", "Both parties were not yet connected"); + local message = "Neither party is connected to the proxy"; + if transfers[sha].initiator then + message = "The recipient is not connected to the proxy"; + elseif transfers[sha].target then + message = "The sender (you) is not connected to the proxy"; + end + origin.send(st.error_reply(stanza, "cancel", "not-allowed", message)); end else module:log("error", "activation failed: sid: %s, initiator: %s, target: %s", tostring(sid), tostring(from), tostring(to)); -- cgit v1.2.3