aboutsummaryrefslogtreecommitdiffstats
path: root/plugins
diff options
context:
space:
mode:
authorKim Alvefur <zash@zash.se>2021-12-14 20:00:45 +0100
committerKim Alvefur <zash@zash.se>2021-12-14 20:00:45 +0100
commit6e0bbc2af5f1213df15e988ea176c7f398c9fdf3 (patch)
tree4ab2c6874868cf59650074de35b3ccbd87aa0c57 /plugins
parent93a9ed262065d9966643ad5ae50b58142a1c6823 (diff)
downloadprosody-6e0bbc2af5f1213df15e988ea176c7f398c9fdf3.tar.gz
prosody-6e0bbc2af5f1213df15e988ea176c7f398c9fdf3.zip
mod_smacks: Limit queue memory consumption using new util
This brings back the queue size limit that was once added, then removed because destroying the session when reaching the limit was not great. Instead, the queue wraps and overwrites the oldest unacked stanza on the assumption that it will probably be acked anyway and thus does not need to be delivered. If those discarded stanzas turn out to be needed on resumption then the resumption fails.
Diffstat (limited to 'plugins')
-rw-r--r--plugins/mod_admin_shell.lua2
-rw-r--r--plugins/mod_smacks.lua123
2 files changed, 73 insertions, 52 deletions
diff --git a/plugins/mod_admin_shell.lua b/plugins/mod_admin_shell.lua
index 4ee39289..71597ee9 100644
--- a/plugins/mod_admin_shell.lua
+++ b/plugins/mod_admin_shell.lua
@@ -815,7 +815,7 @@ local available_columns = {
width = 8;
align = "right";
mapper = function (queue)
- return queue and tostring(#queue);
+ return queue and tostring(queue:count_unacked());
end
};
csi = {
diff --git a/plugins/mod_smacks.lua b/plugins/mod_smacks.lua
index 57fd6f6d..960febac 100644
--- a/plugins/mod_smacks.lua
+++ b/plugins/mod_smacks.lua
@@ -13,13 +13,12 @@
local tonumber = tonumber;
local tostring = tostring;
-local math_min = math.min;
local os_time = os.time;
-local t_remove = table.remove;
local datetime = require "util.datetime";
local add_filter = require "util.filters".add_filter;
local jid = require "util.jid";
+local smqueue = require "util.smqueue";
local st = require "util.stanza";
local timer = require "util.timer";
local uuid_generate = require "util.uuid".generate;
@@ -37,6 +36,7 @@ local xmlns_sm3 = "urn:xmpp:sm:3";
local sm2_attr = { xmlns = xmlns_sm2 };
local sm3_attr = { xmlns = xmlns_sm3 };
+local queue_size = module:get_option_number("smacks_max_queue_size", 500);
local resume_timeout = module:get_option_number("smacks_hibernation_time", 600);
local s2s_smacks = module:get_option_boolean("smacks_enabled_s2s", true);
local s2s_resend = module:get_option_boolean("smacks_s2s_resend", false);
@@ -51,13 +51,22 @@ local function format_h(h) if h then return string.format("%d", h) end end
local old_session_registry = module:open_store("smacks_h", "map");
local session_registry = module:shared "/*/smacks/resumption-tokens"; -- > user@host/resumption-token --> resource
+local ack_errors = require"util.error".init("mod_smacks", xmlns_sm3, {
+ head = { condition = "undefined-condition"; text = "Client acknowledged more stanzas than sent by server" };
+ tail = { condition = "undefined-condition"; text = "Client acknowledged less stanzas than already acknowledged" };
+ pop = { condition = "internal-server-error"; text = "Something went wrong with Stream Management" };
+ overflow = { condition = "resource-constraint", text = "Too many unacked stanzas remaining, session can't be resumed" }
+});
+
+-- COMPAT note the use of compatibilty wrapper in events (queue:table())
+
local function ack_delayed(session, stanza)
-- fire event only if configured to do so and our session is not already hibernated or destroyed
if delayed_ack_timeout > 0 and session.awaiting_ack
and not session.hibernating and not session.destroyed then
session.log("debug", "Firing event 'smacks-ack-delayed', queue = %d",
- session.outgoing_stanza_queue and #session.outgoing_stanza_queue or 0);
- module:fire_event("smacks-ack-delayed", {origin = session, queue = session.outgoing_stanza_queue, stanza = stanza});
+ session.outgoing_stanza_queue and session.outgoing_stanza_queue:count_unacked() or 0);
+ module:fire_event("smacks-ack-delayed", {origin = session, queue = session.outgoing_stanza_queue:table(), stanza = stanza});
end
session.delayed_ack_timer = nil;
end
@@ -101,7 +110,7 @@ local function should_ack(session, force)
if session.awaiting_ack then return end -- already waiting
if force then return force end
local queue = session.outgoing_stanza_queue;
- local expected_h = session.last_acknowledged_stanza + #queue;
+ local expected_h = session.last_acknowledged_stanza + queue:count_unacked();
local max_unacked = max_unacked_stanzas;
if session.state == "inactive" then
max_unacked = max_inactive_unacked_stanzas;
@@ -109,18 +118,18 @@ local function should_ack(session, force)
-- this check of last_requested_h prevents ack-loops if missbehaving clients report wrong
-- stanza counts. it is set when an <r> is really sent (e.g. inside timer), preventing any
-- further requests until a higher h-value would be expected.
- return #queue > max_unacked and expected_h ~= session.last_requested_h;
+ return queue:count_unacked() > max_unacked and expected_h ~= session.last_requested_h;
end
local function request_ack(session, reason)
local queue = session.outgoing_stanza_queue;
- session.log("debug", "Sending <r> (inside timer, before send) from %s - #queue=%d", reason, #queue);
+ session.log("debug", "Sending <r> (inside timer, before send) from %s - #queue=%d", reason, queue:count_unacked());
(session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks }))
if session.destroyed then return end -- sending something can trigger destruction
session.awaiting_ack = true;
-- expected_h could be lower than this expression e.g. more stanzas added to the queue meanwhile)
- session.last_requested_h = session.last_acknowledged_stanza + #queue;
- session.log("debug", "Sending <r> (inside timer, after send) from %s - #queue=%d", reason, #queue);
+ session.last_requested_h = session.last_acknowledged_stanza + queue:count_unacked();
+ session.log("debug", "Sending <r> (inside timer, after send) from %s - #queue=%d", reason, queue:count_unacked());
if not session.delayed_ack_timer then
session.delayed_ack_timer = timer.add_task(delayed_ack_timeout, function()
ack_delayed(session, nil); -- we don't know if this is the only new stanza in the queue
@@ -150,7 +159,7 @@ local function request_ack_if_needed(session, force, reason, stanza)
if session.state == "inactive" then
max_unacked = max_inactive_unacked_stanzas;
end
- if #queue > max_unacked and session.awaiting_ack and session.delayed_ack_timer == nil then
+ if queue:count_unacked() > max_unacked and session.awaiting_ack and session.delayed_ack_timer == nil then
session.log("debug", "Calling ack_delayed directly (still waiting for ack)");
ack_delayed(session, stanza); -- this is the only new stanza in the queue --> provide it to other modules
end
@@ -178,10 +187,12 @@ local function outgoing_stanza_filter(stanza, session)
});
end
- queue[#queue+1] = cached_stanza;
+ queue:push(cached_stanza);
+
if session.hibernating then
session.log("debug", "hibernating since %s, stanza queued", datetime.datetime(session.hibernating));
- module:fire_event("smacks-hibernation-stanza-queued", {origin = session, queue = queue, stanza = cached_stanza});
+ -- FIXME queue implementation changed, anything depending on it being an array will break
+ module:fire_event("smacks-hibernation-stanza-queued", {origin = session, queue = queue:table(), stanza = cached_stanza});
return nil;
end
end
@@ -198,7 +209,7 @@ end
local function wrap_session_out(session, resume)
if not resume then
- session.outgoing_stanza_queue = {};
+ session.outgoing_stanza_queue = smqueue.new(queue_size);
session.last_acknowledged_stanza = 0;
end
@@ -324,31 +335,26 @@ function handle_a(origin, stanza)
origin.delayed_ack_timer = nil;
end
-- Remove handled stanzas from outgoing_stanza_queue
- -- origin.log("debug", "ACK: h=%s, last=%s", stanza.attr.h or "", origin.last_acknowledged_stanza or "");
local h = tonumber(stanza.attr.h);
if not h then
origin:close{ condition = "invalid-xml"; text = "Missing or invalid 'h' attribute"; };
return;
end
- local handled_stanza_count = h-origin.last_acknowledged_stanza;
local queue = origin.outgoing_stanza_queue;
- if handled_stanza_count > #queue then
- origin.log("warn", "The client says it handled %d new stanzas, but we only sent %d :)",
- handled_stanza_count, #queue);
- origin.log("debug", "Client h: %d, our h: %d", tonumber(stanza.attr.h), origin.last_acknowledged_stanza);
- for i=1,#queue do
- origin.log("debug", "Q item %d: %s", i, tostring(queue[i]));
+ local handled_stanza_count = h-queue:count_acked();
+ local acked, err = ack_errors.coerce(queue:ack(h)); -- luacheck: ignore 211/acked
+ if err then
+ origin.log("warn", "The client says it handled %d new stanzas, but we sent %d :)",
+ handled_stanza_count, queue:count_unacked());
+ origin.log("debug", "Client h: %d, our h: %d", tonumber(stanza.attr.h), queue:count_acked());
+ for i, item in queue._queue:items() do
+ origin.log("debug", "Q item %d: %s", i, item);
end
- origin:close{ condition = "undefined-condition"; text = "Client acknowledged more stanzas than sent by server"; };
+ origin:close(err);
return;
end
- for _=1,math_min(handled_stanza_count,#queue) do
- t_remove(origin.outgoing_stanza_queue, 1);
- end
-
- origin.log("debug", "#queue = %d", #queue);
- origin.last_acknowledged_stanza = origin.last_acknowledged_stanza + handled_stanza_count;
+ origin.log("debug", "#queue = %d", queue:count_unacked());
request_ack_now_if_needed(origin, false, "handle_a", nil)
return true;
end
@@ -357,12 +363,13 @@ module:hook_tag(xmlns_sm3, "a", handle_a);
local function handle_unacked_stanzas(session)
local queue = session.outgoing_stanza_queue;
- if #queue > 0 then
- session.outgoing_stanza_queue = {};
- for i=1,#queue do
- if not module:fire_event("delivery/failure", { session = session, stanza = queue[i] }) then
- if queue[i].attr.type ~= "error" and queue[i].attr.from ~= session.full_jid then
- local reply = st.error_reply(queue[i], "cancel", "recipient-unavailable");
+ if queue:count_unacked() > 0 then
+ session.smacks = false; -- Disable queueing
+ session.outgoing_stanza_queue = nil;
+ for stanza in queue._queue:consume() do
+ if not module:fire_event("delivery/failure", { session = session, stanza = stanza }) then
+ if stanza.attr.type ~= "error" and stanza.attr.to ~= session.full_jid then
+ local reply = st.error_reply(stanza, "cancel", "recipient-unavailable");
core_process_stanza(session, reply);
end
end
@@ -416,8 +423,8 @@ module:hook("pre-resource-unbind", function (event)
if not session.smacks then return end
if not session.resumption_token then
local queue = session.outgoing_stanza_queue;
- if #queue > 0 then
- session.log("debug", "Destroying session with %d unacked stanzas", #queue);
+ if queue:count_unacked() > 0 then
+ session.log("debug", "Destroying session with %d unacked stanzas", queue:count_unacked());
handle_unacked_stanzas(session);
end
return
@@ -440,18 +447,18 @@ module:hook("pre-resource-unbind", function (event)
if session.conn then
session.conn:close();
end
- module:fire_event("smacks-hibernation-start", { origin = session; queue = session.outgoing_stanza_queue });
+ module:fire_event("smacks-hibernation-start", { origin = session; queue = session.outgoing_stanza_queue:table() });
return true; -- Postpone destruction for now
end);
local function handle_s2s_destroyed(event)
local session = event.session;
local queue = session.outgoing_stanza_queue;
- if queue and #queue > 0 then
- session.log("warn", "Destroying session with %d unacked stanzas", #queue);
+ if queue and queue:count_unacked() > 0 then
+ session.log("warn", "Destroying session with %d unacked stanzas", queue:count_unacked());
if s2s_resend then
- for i = 1, #queue do
- module:send(queue[i]);
+ for stanza in queue:consume() do
+ module:send(stanza);
end
session.outgoing_stanza_queue = nil;
else
@@ -505,6 +512,7 @@ function handle_resume(session, stanza, xmlns_sm)
c2s_sessions[conn] = nil;
conn:close();
end
+
local migrated_session_log = session.log;
original_session.ip = session.ip;
original_session.conn = session.conn;
@@ -530,33 +538,46 @@ function handle_resume(session, stanza, xmlns_sm)
-- Similar for connlisteners
c2s_sessions[session.conn] = original_session;
+ local queue = original_session.outgoing_stanza_queue;
+ local h = tonumber(stanza.attr.h);
+
+ original_session.log("debug", "Pre-resumption #queue = %d", queue:count_unacked())
+ local acked, err = ack_errors.coerce(queue:ack(h)); -- luacheck: ignore 211/acked
+
+ if not err and not queue:resumable() then
+ err = ack_errors.new("overflow");
+ end
+
+ if err or not queue:resumable() then
+ original_session.send(st.stanza("failed",
+ { xmlns = xmlns_sm; h = format_h(original_session.handled_stanza_count); previd = id }));
+ original_session:close(err);
+ return false;
+ end
+
original_session.send(st.stanza("resumed", { xmlns = xmlns_sm,
h = format_h(original_session.handled_stanza_count), previd = id }));
- -- Fake an <a> with the h of the <resume/> from the client
- original_session:dispatch_stanza(st.stanza("a", { xmlns = xmlns_sm,
- h = stanza.attr.h }));
-
-- Ok, we need to re-send any stanzas that the client didn't see
-- ...they are what is now left in the outgoing stanza queue
-- We have to use the send of "session" because we don't want to add our resent stanzas
-- to the outgoing queue again
- local queue = original_session.outgoing_stanza_queue;
- session.log("debug", "resending all unacked stanzas that are still queued after resume, #queue = %d", #queue);
+
+ session.log("debug", "resending all unacked stanzas that are still queued after resume, #queue = %d", queue:count_unacked());
-- FIXME Which session is it that the queue filter sees?
session.resending_unacked = true;
original_session.resending_unacked = true;
- for i=1,#queue do
- session.send(queue[i]);
+ for _, queued_stanza in queue:resume() do
+ session.send(queued_stanza);
end
session.resending_unacked = nil;
original_session.resending_unacked = nil;
- session.log("debug", "all stanzas resent, now disabling send() in this migrated session, #queue = %d", #queue);
+ session.log("debug", "all stanzas resent, now disabling send() in this migrated session, #queue = %d", queue:count_unacked());
function session.send(stanza) -- luacheck: ignore 432
migrated_session_log("error", "Tried to send stanza on old session migrated by smacks resume (maybe there is a bug?): %s", tostring(stanza));
return false;
end
- module:fire_event("smacks-hibernation-end", {origin = session, resumed = original_session, queue = queue});
+ module:fire_event("smacks-hibernation-end", {origin = session, resumed = original_session, queue = queue:table()});
request_ack_if_needed(original_session, true, "handle_resume", nil);
end
return true;