aboutsummaryrefslogtreecommitdiffstats
path: root/plugins/mod_s2s.lua
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/mod_s2s.lua')
-rw-r--r--plugins/mod_s2s.lua30
1 files changed, 17 insertions, 13 deletions
diff --git a/plugins/mod_s2s.lua b/plugins/mod_s2s.lua
index 04fd5bc3..638ace3d 100644
--- a/plugins/mod_s2s.lua
+++ b/plugins/mod_s2s.lua
@@ -13,7 +13,6 @@ local hosts = prosody.hosts;
local core_process_stanza = prosody.core_process_stanza;
local tostring, type = tostring, type;
-local t_insert = table.insert;
local traceback = debug.traceback;
local add_task = require "prosody.util.timer".add_task;
@@ -33,6 +32,7 @@ local service = require "prosody.net.resolvers.service";
local resolver_chain = require "prosody.net.resolvers.chain";
local errors = require "prosody.util.error";
local set = require "prosody.util.set";
+local queue = require "prosody.util.queue";
local connect_timeout = module:get_option_period("s2s_timeout", 90);
local stream_close_timeout = module:get_option_period("s2s_close_timeout", 5);
@@ -42,6 +42,7 @@ local secure_domains, insecure_domains =
module:get_option_set("s2s_secure_domains", {})._items, module:get_option_set("s2s_insecure_domains", {})._items;
local require_encryption = module:get_option_boolean("s2s_require_encryption", true);
local stanza_size_limit = module:get_option_integer("s2s_stanza_size_limit", 1024*512, 10000);
+local sendq_size = module:get_option_integer("s2s_send_queue_size", 1024*32, 1);
local advertised_idle_timeout = 14*60; -- default in all net.server implementations
local network_settings = module:get_option("network_settings");
@@ -134,7 +135,7 @@ local bouncy_stanzas = { message = true, presence = true, iq = true };
local function bounce_sendq(session, reason)
local sendq = session.sendq;
if not sendq then return; end
- session.log("info", "Sending error replies for %d queued stanzas because of failed outgoing connection to %s", #sendq, session.to_host);
+ session.log("info", "Sending error replies for %d queued stanzas because of failed outgoing connection to %s", sendq.count(), session.to_host);
local dummy = {
type = "s2sin";
send = function ()
@@ -153,12 +154,12 @@ local function bounce_sendq(session, reason)
if session.had_stream then -- set when a stream is opened by the remote
error_type, condition = "wait", "remote-server-timeout";
end
- if errors.is_err(reason) then
+ if errors.is_error(reason) then
error_type, condition, reason_text = reason.type, reason.condition, reason.text;
elseif type(reason) == "string" then
reason_text = reason;
end
- for i, stanza in ipairs(sendq) do
+ for stanza in sendq:consume() do
if not stanza.attr.xmlns and bouncy_stanzas[stanza.name] and stanza.attr.type ~= "error" and stanza.attr.type ~= "result" then
local reply = st.error_reply(
stanza,
@@ -170,7 +171,6 @@ local function bounce_sendq(session, reason)
else
(session.log or log)("debug", "Not eligible for bouncing, discarding %s", stanza:top_tag());
end
- sendq[i] = nil;
end
session.sendq = nil;
end
@@ -194,11 +194,14 @@ function route_to_existing_session(event)
(host.log or log)("debug", "trying to send over unauthed s2sout to "..to_host);
-- Queue stanza until we are able to send it
- if host.sendq then
- t_insert(host.sendq, st.clone(stanza));
- else
+ if not host.sendq then
-- luacheck: ignore 122
- host.sendq = { st.clone(stanza) };
+ host.sendq = queue.new(sendq_size);
+ end
+ if not host.sendq:push(st.clone(stanza)) then
+ host.log("warn", "stanza [%s] not queued ", stanza.name);
+ event.origin.send(st.error_reply(stanza, "wait", "resource-constraint", "Outgoing stanza queue full"));
+ return true;
end
host.log("debug", "stanza [%s] queued ", stanza.name);
return true;
@@ -223,7 +226,8 @@ function route_to_new_session(event)
-- Store in buffer
host_session.bounce_sendq = bounce_sendq;
- host_session.sendq = { st.clone(stanza) };
+ host_session.sendq = queue.new(sendq_size);
+ host_session.sendq:push(st.clone(stanza));
log("debug", "stanza [%s] queued until connection complete", stanza.name);
-- FIXME Cleaner solution to passing extra data from resolvers to net.server
-- This mt-clone allows resolvers to add extra data, currently used for DANE TLSA records
@@ -362,11 +366,11 @@ function mark_connected(session)
if session.direction == "outgoing" then
if sendq then
- session.log("debug", "sending %d queued stanzas across new outgoing connection to %s", #sendq, session.to_host);
+ session.log("debug", "sending %d queued stanzas across new outgoing connection to %s", sendq.count(), session.to_host);
local send = session.sends2s;
- for i, stanza in ipairs(sendq) do
+ for stanza in sendq:consume() do
+ -- TODO check send success
send(stanza);
- sendq[i] = nil;
end
session.sendq = nil;
end