aboutsummaryrefslogtreecommitdiffstats
path: root/plugins
diff options
context:
space:
mode:
authorKim Alvefur <zash@zash.se>2024-11-09 16:47:14 +0100
committerKim Alvefur <zash@zash.se>2024-11-09 16:47:14 +0100
commit1b68565d701fa28a2b373646a018fd4d8451deb3 (patch)
tree5d5cffdd8d86155aee2900a9f5f154bc8ade815d /plugins
parent52178d74302b310d5ee986399e7f24d1a02da58d (diff)
downloadprosody-1b68565d701fa28a2b373646a018fd4d8451deb3.tar.gz
prosody-1b68565d701fa28a2b373646a018fd4d8451deb3.zip
mod_s2s: Limit size of outgoing stanza queue
This queue is used to buffer stanzas while waiting for an outgoing s2s connection to be established. Limit it to prevent excessive memory usage. Default chosen to approximate how many average stanzas fits in the server_epoll default max_send_buffer_size of 32 MiB Returns a custom error instead of the default core.stanza_router "Communication with remote domains is not enabled" from is sent back, which does not describe what is happening here. Closes #1106
Diffstat (limited to 'plugins')
-rw-r--r--plugins/mod_s2s.lua24
1 files changed, 14 insertions, 10 deletions
diff --git a/plugins/mod_s2s.lua b/plugins/mod_s2s.lua
index 45684f9a..c420eae3 100644
--- a/plugins/mod_s2s.lua
+++ b/plugins/mod_s2s.lua
@@ -13,7 +13,6 @@ local hosts = prosody.hosts;
local core_process_stanza = prosody.core_process_stanza;
local tostring, type = tostring, type;
-local t_insert = table.insert;
local traceback = debug.traceback;
local add_task = require "prosody.util.timer".add_task;
@@ -33,6 +32,7 @@ local service = require "prosody.net.resolvers.service";
local resolver_chain = require "prosody.net.resolvers.chain";
local errors = require "prosody.util.error";
local set = require "prosody.util.set";
+local queue = require "prosody.util.queue";
local connect_timeout = module:get_option_period("s2s_timeout", 90);
local stream_close_timeout = module:get_option_period("s2s_close_timeout", 5);
@@ -42,6 +42,7 @@ local secure_domains, insecure_domains =
module:get_option_set("s2s_secure_domains", {})._items, module:get_option_set("s2s_insecure_domains", {})._items;
local require_encryption = module:get_option_boolean("s2s_require_encryption", true);
local stanza_size_limit = module:get_option_integer("s2s_stanza_size_limit", 1024*512, 10000);
+local sendq_size = module:get_option_integer("s2s_send_queue_size", 1024*32, 1);
local advertised_idle_timeout = 14*60; -- default in all net.server implementations
local network_settings = module:get_option("network_settings");
@@ -158,7 +159,7 @@ local function bounce_sendq(session, reason)
elseif type(reason) == "string" then
reason_text = reason;
end
- for i, stanza in ipairs(sendq) do
+ for stanza in sendq:consume() do
if not stanza.attr.xmlns and bouncy_stanzas[stanza.name] and stanza.attr.type ~= "error" and stanza.attr.type ~= "result" then
local reply = st.error_reply(
stanza,
@@ -170,7 +171,6 @@ local function bounce_sendq(session, reason)
else
(session.log or log)("debug", "Not eligible for bouncing, discarding %s", stanza:top_tag());
end
- sendq[i] = nil;
end
session.sendq = nil;
end
@@ -194,11 +194,14 @@ function route_to_existing_session(event)
(host.log or log)("debug", "trying to send over unauthed s2sout to "..to_host);
-- Queue stanza until we are able to send it
- if host.sendq then
- t_insert(host.sendq, st.clone(stanza));
- else
+ if not host.sendq then
-- luacheck: ignore 122
- host.sendq = { st.clone(stanza) };
+ host.sendq = queue.new(sendq_size);
+ end
+ if not host.sendq:push(st.clone(stanza)) then
+ host.log("warn", "stanza [%s] not queued ", stanza.name);
+ event.origin.send(st.error_reply(stanza, "wait", "resource-constraint", "Outgoing stanza queue full"));
+ return true;
end
host.log("debug", "stanza [%s] queued ", stanza.name);
return true;
@@ -223,7 +226,8 @@ function route_to_new_session(event)
-- Store in buffer
host_session.bounce_sendq = bounce_sendq;
- host_session.sendq = { st.clone(stanza) };
+ host_session.sendq = queue.new(sendq_size);
+ host_session.sendq:push(stanza);
log("debug", "stanza [%s] queued until connection complete", stanza.name);
-- FIXME Cleaner solution to passing extra data from resolvers to net.server
-- This mt-clone allows resolvers to add extra data, currently used for DANE TLSA records
@@ -364,9 +368,9 @@ function mark_connected(session)
if sendq then
session.log("debug", "sending %d queued stanzas across new outgoing connection to %s", #sendq, session.to_host);
local send = session.sends2s;
- for i, stanza in ipairs(sendq) do
+ for stanza in sendq:consume() do
+ -- TODO check send success
send(stanza);
- sendq[i] = nil;
end
session.sendq = nil;
end