diff options
Diffstat (limited to 'plugins/mod_s2s.lua')
-rw-r--r-- | plugins/mod_s2s.lua | 30 |
1 files changed, 17 insertions, 13 deletions
diff --git a/plugins/mod_s2s.lua b/plugins/mod_s2s.lua index 04fd5bc3..638ace3d 100644 --- a/plugins/mod_s2s.lua +++ b/plugins/mod_s2s.lua @@ -13,7 +13,6 @@ local hosts = prosody.hosts; local core_process_stanza = prosody.core_process_stanza; local tostring, type = tostring, type; -local t_insert = table.insert; local traceback = debug.traceback; local add_task = require "prosody.util.timer".add_task; @@ -33,6 +32,7 @@ local service = require "prosody.net.resolvers.service"; local resolver_chain = require "prosody.net.resolvers.chain"; local errors = require "prosody.util.error"; local set = require "prosody.util.set"; +local queue = require "prosody.util.queue"; local connect_timeout = module:get_option_period("s2s_timeout", 90); local stream_close_timeout = module:get_option_period("s2s_close_timeout", 5); @@ -42,6 +42,7 @@ local secure_domains, insecure_domains = module:get_option_set("s2s_secure_domains", {})._items, module:get_option_set("s2s_insecure_domains", {})._items; local require_encryption = module:get_option_boolean("s2s_require_encryption", true); local stanza_size_limit = module:get_option_integer("s2s_stanza_size_limit", 1024*512, 10000); +local sendq_size = module:get_option_integer("s2s_send_queue_size", 1024*32, 1); local advertised_idle_timeout = 14*60; -- default in all net.server implementations local network_settings = module:get_option("network_settings"); @@ -134,7 +135,7 @@ local bouncy_stanzas = { message = true, presence = true, iq = true }; local function bounce_sendq(session, reason) local sendq = session.sendq; if not sendq then return; end - session.log("info", "Sending error replies for %d queued stanzas because of failed outgoing connection to %s", #sendq, session.to_host); + session.log("info", "Sending error replies for %d queued stanzas because of failed outgoing connection to %s", sendq.count(), session.to_host); local dummy = { type = "s2sin"; send = function () @@ -153,12 +154,12 @@ local function bounce_sendq(session, reason) if session.had_stream then -- set when a stream is opened by the remote error_type, condition = "wait", "remote-server-timeout"; end - if errors.is_err(reason) then + if errors.is_error(reason) then error_type, condition, reason_text = reason.type, reason.condition, reason.text; elseif type(reason) == "string" then reason_text = reason; end - for i, stanza in ipairs(sendq) do + for stanza in sendq:consume() do if not stanza.attr.xmlns and bouncy_stanzas[stanza.name] and stanza.attr.type ~= "error" and stanza.attr.type ~= "result" then local reply = st.error_reply( stanza, @@ -170,7 +171,6 @@ local function bounce_sendq(session, reason) else (session.log or log)("debug", "Not eligible for bouncing, discarding %s", stanza:top_tag()); end - sendq[i] = nil; end session.sendq = nil; end @@ -194,11 +194,14 @@ function route_to_existing_session(event) (host.log or log)("debug", "trying to send over unauthed s2sout to "..to_host); -- Queue stanza until we are able to send it - if host.sendq then - t_insert(host.sendq, st.clone(stanza)); - else + if not host.sendq then -- luacheck: ignore 122 - host.sendq = { st.clone(stanza) }; + host.sendq = queue.new(sendq_size); + end + if not host.sendq:push(st.clone(stanza)) then + host.log("warn", "stanza [%s] not queued ", stanza.name); + event.origin.send(st.error_reply(stanza, "wait", "resource-constraint", "Outgoing stanza queue full")); + return true; end host.log("debug", "stanza [%s] queued ", stanza.name); return true; @@ -223,7 +226,8 @@ function route_to_new_session(event) -- Store in buffer host_session.bounce_sendq = bounce_sendq; - host_session.sendq = { st.clone(stanza) }; + host_session.sendq = queue.new(sendq_size); + host_session.sendq:push(st.clone(stanza)); log("debug", "stanza [%s] queued until connection complete", stanza.name); -- FIXME Cleaner solution to passing extra data from resolvers to net.server -- This mt-clone allows resolvers to add extra data, currently used for DANE TLSA records @@ -362,11 +366,11 @@ function mark_connected(session) if session.direction == "outgoing" then if sendq then - session.log("debug", "sending %d queued stanzas across new outgoing connection to %s", #sendq, session.to_host); + session.log("debug", "sending %d queued stanzas across new outgoing connection to %s", sendq.count(), session.to_host); local send = session.sends2s; - for i, stanza in ipairs(sendq) do + for stanza in sendq:consume() do + -- TODO check send success send(stanza); - sendq[i] = nil; end session.sendq = nil; end |