From 54fcc029c8fafd6e9fd9a38e279e5557177c1370 Mon Sep 17 00:00:00 2001 From: Matthew Wild Date: Fri, 26 Aug 2022 17:04:15 +0100 Subject: mod_smacks: Long overdue cleanup of resumption code, fixes some old TODOs --- core/sessionmanager.lua | 46 ++++++++++++++++++++++++++++++++++- plugins/mod_c2s.lua | 8 +++++++ plugins/mod_smacks.lua | 64 ++++++++++--------------------------------------- 3 files changed, 66 insertions(+), 52 deletions(-) diff --git a/core/sessionmanager.lua b/core/sessionmanager.lua index dec21674..cdfd040f 100644 --- a/core/sessionmanager.lua +++ b/core/sessionmanager.lua @@ -10,7 +10,7 @@ local tostring, setmetatable = tostring, setmetatable; local pairs, next= pairs, next; -local hosts = prosody.hosts; +local prosody, hosts = prosody, prosody.hosts; local full_sessions = prosody.full_sessions; local bare_sessions = prosody.bare_sessions; @@ -92,6 +92,49 @@ local function retire_session(session) return setmetatable(session, resting_session); end +-- Update a session with a new one (transplanting connection, filters, etc.) +-- new_session should be discarded after this call returns +local function update_session(to_session, from_session) + to_session.log("debug", "Updating with parameters from session %s", from_session.id); + from_session.log("debug", "Session absorbed into %s", to_session.id); + + local replaced_conn = to_session.conn; + if replaced_conn then + to_session.log("debug", "closing a replaced connection for this session"); + replaced_conn:close(); + end + + to_session.ip = from_session.ip; + to_session.conn = from_session.conn; + to_session.rawsend = from_session.rawsend; + to_session.rawsend.session = to_session; + to_session.rawsend.conn = to_session.conn; + to_session.send = from_session.send; + to_session.send.session = to_session; + to_session.close = from_session.close; + to_session.filter = from_session.filter; + to_session.filter.session = to_session; + to_session.filters = from_session.filters; + to_session.send.filter = to_session.filter; + to_session.stream = from_session.stream; + to_session.secure = from_session.secure; + to_session.hibernating = nil; + to_session.resumption_counter = (to_session.resumption_counter or 0) + 1; + from_session.log = to_session.log; + from_session.type = to_session.type; + -- Inform xmppstream of the new session (passed to its callbacks) + to_session.stream:set_session(to_session); + + -- Retire the session we've pulled from, to avoid two sessions on the same connection + retire_session(from_session); + + prosody.events.fire_event("c2s-session-updated", { + session = to_session; + from_session = from_session; + replaced_conn = replaced_conn; + }); +end + local function destroy_session(session, err) (session.log or log)("debug", "Destroying session for %s (%s@%s)%s", session.full_jid or "(unknown)", session.username or "(unknown)", @@ -267,6 +310,7 @@ end return { new_session = new_session; retire_session = retire_session; + update_session = update_session; destroy_session = destroy_session; make_authenticated = make_authenticated; bind_resource = bind_resource; diff --git a/plugins/mod_c2s.lua b/plugins/mod_c2s.lua index e8241687..2d4186d0 100644 --- a/plugins/mod_c2s.lua +++ b/plugins/mod_c2s.lua @@ -262,6 +262,14 @@ module:hook_global("user-password-changed", disconnect_user_sessions({ condition module:hook_global("user-role-changed", disconnect_user_sessions({ condition = "reset", text = "Role changed" }), 200); module:hook_global("user-deleted", disconnect_user_sessions({ condition = "not-authorized", text = "Account deleted" }), 200); +module:hook_global("c2s-session-updated", function (event) + sessions[event.session.conn] = event.session; + local replaced_conn = event.replaced_conn; + if replaced_conn then + sessions[replaced_conn] = nil; + end +end); + function runner_callbacks:ready() if self.data.conn then self.data.conn:resume(); diff --git a/plugins/mod_smacks.lua b/plugins/mod_smacks.lua index e2bbff9c..a1435829 100644 --- a/plugins/mod_smacks.lua +++ b/plugins/mod_smacks.lua @@ -196,7 +196,6 @@ local function outgoing_stanza_filter(stanza, session) -- supposed to be nil. -- However, when using mod_smacks with mod_websocket, then mod_websocket's -- stanzas/out filter can get called before this one and adds the xmlns. - if session.resending_unacked then return stanza end if not session.smacks then return stanza end local is_stanza = st.is_stanza(stanza) and (not stanza.attr.xmlns or stanza.attr.xmlns == 'jabber:client') @@ -496,7 +495,6 @@ module:hook("pre-resource-unbind", function (event) session.log("debug", "Destroying session for hibernating too long"); save_old_session(session); session.resumption_token = nil; - session.resending_unacked = true; -- stop outgoing_stanza_filter from re-queueing anything anymore sessionmanager.destroy_session(session, "Hibernating too long"); sessions_expired(1); end); @@ -529,10 +527,6 @@ end module:hook("s2sout-destroyed", handle_s2s_destroyed); module:hook("s2sin-destroyed", handle_s2s_destroyed); -local function get_session_id(session) - return session.id or (tostring(session):match("[a-f0-9]+$")); -end - function handle_resume(session, stanza, xmlns_sm) if session.full_jid then session.log("warn", "Tried to resume after resource binding"); @@ -573,40 +567,11 @@ function handle_resume(session, stanza, xmlns_sm) local now = os_time(); age = now - original_session.hibernating; end - session.log("debug", "mod_smacks resuming existing session %s...", get_session_id(original_session)); - original_session.log("debug", "mod_smacks session resumed from %s...", get_session_id(session)); - -- TODO: All this should move to sessionmanager (e.g. session:replace(new_session)) - if original_session.conn then - original_session.log("debug", "mod_smacks closing an old connection for this session"); - local conn = original_session.conn; - c2s_sessions[conn] = nil; - conn:close(); - end - local migrated_session_log = session.log; - original_session.ip = session.ip; - original_session.conn = session.conn; - original_session.rawsend = session.rawsend; - original_session.rawsend.session = original_session; - original_session.rawsend.conn = original_session.conn; - original_session.send = session.send; - original_session.send.session = original_session; - original_session.close = session.close; - original_session.filter = session.filter; - original_session.filter.session = original_session; - original_session.filters = session.filters; - original_session.send.filter = original_session.filter; - original_session.stream = session.stream; - original_session.secure = session.secure; - original_session.hibernating = nil; - original_session.resumption_counter = (original_session.resumption_counter or 0) + 1; - session.log = original_session.log; - session.type = original_session.type; - wrap_session(original_session, true); - -- Inform xmppstream of the new session (passed to its callbacks) - original_session.stream:set_session(original_session); - -- Similar for connlisteners - c2s_sessions[session.conn] = original_session; + session.log("debug", "mod_smacks resuming existing session %s...", original_session.id); + + -- Update original_session with the parameters (connection, etc.) from the new session + sessionmanager.update_session(original_session, session); local queue = original_session.outgoing_stanza_queue; local h = tonumber(stanza.attr.h); @@ -633,20 +598,16 @@ function handle_resume(session, stanza, xmlns_sm) -- We have to use the send of "session" because we don't want to add our resent stanzas -- to the outgoing queue again - session.log("debug", "resending all unacked stanzas that are still queued after resume, #queue = %d", queue:count_unacked()); - -- FIXME Which session is it that the queue filter sees? - session.resending_unacked = true; - original_session.resending_unacked = true; + original_session.log("debug", "resending all unacked stanzas that are still queued after resume, #queue = %d", queue:count_unacked()); for _, queued_stanza in queue:resume() do - session.send(queued_stanza); - end - session.resending_unacked = nil; - original_session.resending_unacked = nil; - session.log("debug", "all stanzas resent, now disabling send() in this migrated session, #queue = %d", queue:count_unacked()); - function session.send(stanza) -- luacheck: ignore 432 - migrated_session_log("error", "Tried to send stanza on old session migrated by smacks resume (maybe there is a bug?): %s", tostring(stanza)); - return false; + original_session.send(queued_stanza); end + session.log("debug", "all stanzas resent, enabling stream management on resumed stream, #queue = %d", queue:count_unacked()); + + -- Add our own handlers to the resumed session (filters have been reset in the update) + wrap_session(original_session, true); + + -- Let everyone know that we are no longer hibernating module:fire_event("smacks-hibernation-end", {origin = session, resumed = original_session, queue = queue:table()}); original_session.awaiting_ack = nil; -- Don't wait for acks from before the resumption request_ack_now_if_needed(original_session, true, "handle_resume", nil); @@ -654,6 +615,7 @@ function handle_resume(session, stanza, xmlns_sm) end return true; end + module:hook_tag(xmlns_sm2, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm2); end); module:hook_tag(xmlns_sm3, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm3); end); -- cgit v1.2.3