From 1b68565d701fa28a2b373646a018fd4d8451deb3 Mon Sep 17 00:00:00 2001 From: Kim Alvefur Date: Sat, 9 Nov 2024 16:47:14 +0100 Subject: mod_s2s: Limit size of outgoing stanza queue This queue is used to buffer stanzas while waiting for an outgoing s2s connection to be established. Limit it to prevent excessive memory usage. Default chosen to approximate how many average stanzas fits in the server_epoll default max_send_buffer_size of 32 MiB Returns a custom error instead of the default core.stanza_router "Communication with remote domains is not enabled" from is sent back, which does not describe what is happening here. Closes #1106 --- plugins/mod_s2s.lua | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/plugins/mod_s2s.lua b/plugins/mod_s2s.lua index 45684f9a..c420eae3 100644 --- a/plugins/mod_s2s.lua +++ b/plugins/mod_s2s.lua @@ -13,7 +13,6 @@ local hosts = prosody.hosts; local core_process_stanza = prosody.core_process_stanza; local tostring, type = tostring, type; -local t_insert = table.insert; local traceback = debug.traceback; local add_task = require "prosody.util.timer".add_task; @@ -33,6 +32,7 @@ local service = require "prosody.net.resolvers.service"; local resolver_chain = require "prosody.net.resolvers.chain"; local errors = require "prosody.util.error"; local set = require "prosody.util.set"; +local queue = require "prosody.util.queue"; local connect_timeout = module:get_option_period("s2s_timeout", 90); local stream_close_timeout = module:get_option_period("s2s_close_timeout", 5); @@ -42,6 +42,7 @@ local secure_domains, insecure_domains = module:get_option_set("s2s_secure_domains", {})._items, module:get_option_set("s2s_insecure_domains", {})._items; local require_encryption = module:get_option_boolean("s2s_require_encryption", true); local stanza_size_limit = module:get_option_integer("s2s_stanza_size_limit", 1024*512, 10000); +local sendq_size = module:get_option_integer("s2s_send_queue_size", 1024*32, 1); local advertised_idle_timeout = 14*60; -- default in all net.server implementations local network_settings = module:get_option("network_settings"); @@ -158,7 +159,7 @@ local function bounce_sendq(session, reason) elseif type(reason) == "string" then reason_text = reason; end - for i, stanza in ipairs(sendq) do + for stanza in sendq:consume() do if not stanza.attr.xmlns and bouncy_stanzas[stanza.name] and stanza.attr.type ~= "error" and stanza.attr.type ~= "result" then local reply = st.error_reply( stanza, @@ -170,7 +171,6 @@ local function bounce_sendq(session, reason) else (session.log or log)("debug", "Not eligible for bouncing, discarding %s", stanza:top_tag()); end - sendq[i] = nil; end session.sendq = nil; end @@ -194,11 +194,14 @@ function route_to_existing_session(event) (host.log or log)("debug", "trying to send over unauthed s2sout to "..to_host); -- Queue stanza until we are able to send it - if host.sendq then - t_insert(host.sendq, st.clone(stanza)); - else + if not host.sendq then -- luacheck: ignore 122 - host.sendq = { st.clone(stanza) }; + host.sendq = queue.new(sendq_size); + end + if not host.sendq:push(st.clone(stanza)) then + host.log("warn", "stanza [%s] not queued ", stanza.name); + event.origin.send(st.error_reply(stanza, "wait", "resource-constraint", "Outgoing stanza queue full")); + return true; end host.log("debug", "stanza [%s] queued ", stanza.name); return true; @@ -223,7 +226,8 @@ function route_to_new_session(event) -- Store in buffer host_session.bounce_sendq = bounce_sendq; - host_session.sendq = { st.clone(stanza) }; + host_session.sendq = queue.new(sendq_size); + host_session.sendq:push(stanza); log("debug", "stanza [%s] queued until connection complete", stanza.name); -- FIXME Cleaner solution to passing extra data from resolvers to net.server -- This mt-clone allows resolvers to add extra data, currently used for DANE TLSA records @@ -364,9 +368,9 @@ function mark_connected(session) if sendq then session.log("debug", "sending %d queued stanzas across new outgoing connection to %s", #sendq, session.to_host); local send = session.sends2s; - for i, stanza in ipairs(sendq) do + for stanza in sendq:consume() do + -- TODO check send success send(stanza); - sendq[i] = nil; end session.sendq = nil; end -- cgit v1.2.3