From 6926340d75370f106ad8694041d9b3f4e0078bdb Mon Sep 17 00:00:00 2001 From: Matthew Wild Date: Fri, 26 Aug 2022 19:07:36 +0100 Subject: mod_smacks: Split resumption into multiple stages, to simplify ISR integration This will allow us to return the success/failed as part of the SASL2 response, and *then* perform the stanza sync as a second step. --- plugins/mod_smacks.lua | 145 ++++++++++++++++++++++++++++--------------------- 1 file changed, 82 insertions(+), 63 deletions(-) (limited to 'plugins/mod_smacks.lua') diff --git a/plugins/mod_smacks.lua b/plugins/mod_smacks.lua index 42a903f6..7aa8687a 100644 --- a/plugins/mod_smacks.lua +++ b/plugins/mod_smacks.lua @@ -107,6 +107,12 @@ local ack_errors = require"util.error".init("mod_smacks", xmlns_sm3, { overflow = { condition = "resource-constraint", text = "Too many unacked stanzas remaining, session can't be resumed" } }); +local resume_errors = require "util.error".init("mod_smacks", xmlns_sm3, { + expired = { condition = "item-not-found", text = "Session expired, and cannot be resumed" }; + already_bound = { condition = "unexpected-request", text = "Cannot resume another session after a resource is bound" }; + unknown_session = { condition = "item-not-found", text = "Unknown session" }; +}); + -- COMPAT note the use of compatibility wrapper in events (queue:table()) local function ack_delayed(session, stanza) @@ -527,13 +533,10 @@ end module:hook("s2sout-destroyed", handle_s2s_destroyed); module:hook("s2sin-destroyed", handle_s2s_destroyed); -function handle_resume(session, stanza, xmlns_sm) +function do_resume(session, stanza) if session.full_jid then session.log("warn", "Tried to resume after resource binding"); - session.send(st.stanza("failed", { xmlns = xmlns_sm }) - :tag("unexpected-request", { xmlns = xmlns_errors }) - ); - return true; + return nil, resume_errors.new("already_bound"); end local id = stanza.attr.previd; @@ -542,78 +545,94 @@ function handle_resume(session, stanza, xmlns_sm) local old_session = old_session_registry:get(session.username, id); if old_session then session.log("debug", "Tried to resume old expired session with id %s", id); - session.send(st.stanza("failed", { xmlns = xmlns_sm, h = format_h(old_session.h) }) - :tag("item-not-found", { xmlns = xmlns_errors }) - ); clear_old_session(session, id); resumption_expired(1); - else - session.log("debug", "Tried to resume non-existent session with id %s", id); - session.send(st.stanza("failed", { xmlns = xmlns_sm }) - :tag("item-not-found", { xmlns = xmlns_errors }) - ); - end; - else - if original_session.hibernating_watchdog then - original_session.log("debug", "Letting the watchdog go"); - original_session.hibernating_watchdog:cancel(); - original_session.hibernating_watchdog = nil; - elseif session.hibernating then - original_session.log("error", "Hibernating session has no watchdog!") - end - -- zero age = was not hibernating yet - local age = 0; - if original_session.hibernating then - local now = os_time(); - age = now - original_session.hibernating; + return nil, resume_errors.new("expired", { h = old_session.h }); end + session.log("debug", "Tried to resume non-existent session with id %s", id); + return nil, resume_errors.new("unknown_session"); + end - session.log("debug", "mod_smacks resuming existing session %s...", original_session.id); - - local queue = original_session.outgoing_stanza_queue; - local h = tonumber(stanza.attr.h); + if original_session.hibernating_watchdog then + original_session.log("debug", "Letting the watchdog go"); + original_session.hibernating_watchdog:cancel(); + original_session.hibernating_watchdog = nil; + elseif session.hibernating then + original_session.log("error", "Hibernating session has no watchdog!") + end + -- zero age = was not hibernating yet + local age = 0; + if original_session.hibernating then + local now = os_time(); + age = now - original_session.hibernating; + end - original_session.log("debug", "Pre-resumption #queue = %d", queue:count_unacked()) - local acked, err = ack_errors.coerce(queue:ack(h)); -- luacheck: ignore 211/acked + session.log("debug", "mod_smacks resuming existing session %s...", original_session.id); - if not err and not queue:resumable() then - err = ack_errors.new("overflow"); - end + local queue = original_session.outgoing_stanza_queue; + local h = tonumber(stanza.attr.h); - if err then - session.send(st.stanza("failed", - { xmlns = xmlns_sm; h = format_h(original_session.handled_stanza_count); previd = id })); - session.log("debug", "Resumption failed: %s", err); - return true; - end + original_session.log("debug", "Pre-resumption #queue = %d", queue:count_unacked()) + local acked, err = ack_errors.coerce(queue:ack(h)); -- luacheck: ignore 211/acked - -- Update original_session with the parameters (connection, etc.) from the new session - sessionmanager.update_session(original_session, session); + if not err and not queue:resumable() then + err = ack_errors.new("overflow"); + end - -- Inform client of successful resumption - original_session.send(st.stanza("resumed", { xmlns = xmlns_sm, - h = format_h(original_session.handled_stanza_count), previd = id })); + if err then + session.log("debug", "Resumption failed: %s", err); + return nil, err; + end - -- Ok, we need to re-send any stanzas that the client didn't see - -- ...they are what is now left in the outgoing stanza queue - -- We have to use the send of "session" because we don't want to add our resent stanzas - -- to the outgoing queue again + -- Update original_session with the parameters (connection, etc.) from the new session + sessionmanager.update_session(original_session, session); + + return { + session = original_session; + id = id; + -- Return function to complete the resumption and resync unacked stanzas + -- This is two steps so we can support SASL2/ISR + finish = function () + -- Ok, we need to re-send any stanzas that the client didn't see + -- ...they are what is now left in the outgoing stanza queue + -- We have to use the send of "session" because we don't want to add our resent stanzas + -- to the outgoing queue again + + original_session.log("debug", "resending all unacked stanzas that are still queued after resume, #queue = %d", queue:count_unacked()); + for _, queued_stanza in queue:resume() do + original_session.send(queued_stanza); + end + original_session.log("debug", "all stanzas resent, enabling stream management on resumed stream, #queue = %d", queue:count_unacked()); - original_session.log("debug", "resending all unacked stanzas that are still queued after resume, #queue = %d", queue:count_unacked()); - for _, queued_stanza in queue:resume() do - original_session.send(queued_stanza); - end - session.log("debug", "all stanzas resent, enabling stream management on resumed stream, #queue = %d", queue:count_unacked()); + -- Add our own handlers to the resumed session (filters have been reset in the update) + wrap_session(original_session, true); - -- Add our own handlers to the resumed session (filters have been reset in the update) - wrap_session(original_session, true); + -- Let everyone know that we are no longer hibernating + module:fire_event("smacks-hibernation-end", {origin = session, resumed = original_session, queue = queue:table()}); + original_session.awaiting_ack = nil; -- Don't wait for acks from before the resumption + request_ack_now_if_needed(original_session, true, "handle_resume", nil); + resumption_age:sample(age); + end; + }; +end - -- Let everyone know that we are no longer hibernating - module:fire_event("smacks-hibernation-end", {origin = session, resumed = original_session, queue = queue:table()}); - original_session.awaiting_ack = nil; -- Don't wait for acks from before the resumption - request_ack_now_if_needed(original_session, true, "handle_resume", nil); - resumption_age:sample(age); +function handle_resume(session, stanza, xmlns_sm) + local resumed, err = do_resume(session, stanza); + if not resumed then + session.send(st.stanza("failed", { xmlns = xmlns_sm, h = format_h(err.context.h) }) + :tag(err.condition, { xmlns = xmlns_errors })); + return true; end + + session = resumed.session; + + -- Inform client of successful resumption + session.send(st.stanza("resumed", { xmlns = xmlns_sm, + h = format_h(session.handled_stanza_count), previd = resumed.id })); + + -- Complete resume (sync stanzas, etc.) + resumed.finish(); + return true; end -- cgit v1.2.3