From bdce99b7dd4411a8717798874a320bd2fc191b57 Mon Sep 17 00:00:00 2001 From: Kim Alvefur Date: Thu, 2 Dec 2021 02:46:26 +0100 Subject: mod_smacks: Set a watchdog to watch sleeping sessions Extending the timeout by poking the watchdog, and letting it go on resumption, should be much better than the previous method. --- plugins/mod_smacks.lua | 86 ++++++++++++++++---------------------------------- 1 file changed, 27 insertions(+), 59 deletions(-) diff --git a/plugins/mod_smacks.lua b/plugins/mod_smacks.lua index 4ca2b52d..57fd6f6d 100644 --- a/plugins/mod_smacks.lua +++ b/plugins/mod_smacks.lua @@ -13,7 +13,6 @@ local tonumber = tonumber; local tostring = tostring; -local math_max = math.max; local math_min = math.min; local os_time = os.time; local t_remove = table.remove; @@ -24,6 +23,7 @@ local jid = require "util.jid"; local st = require "util.stanza"; local timer = require "util.timer"; local uuid_generate = require "util.uuid".generate; +local watchdog = require "util.watchdog"; local sessionmanager = require "core.sessionmanager"; local core_process_stanza = prosody.core_process_stanza; @@ -413,64 +413,35 @@ end); module:hook("pre-resource-unbind", function (event) local session = event.session; - if session.smacks then + if not session.smacks then return end if not session.resumption_token then local queue = session.outgoing_stanza_queue; if #queue > 0 then session.log("debug", "Destroying session with %d unacked stanzas", #queue); handle_unacked_stanzas(session); end - else - session.log("debug", "mod_smacks hibernating session for up to %d seconds", resume_timeout); - local hibernate_time = os_time(); -- Track the time we went into hibernation - session.hibernating = hibernate_time; - local resumption_token = session.resumption_token; - module:fire_event("smacks-hibernation-start", {origin = session, queue = session.outgoing_stanza_queue}); - timer.add_task(resume_timeout, function () - session.log("debug", "mod_smacks hibernation timeout reached..."); - -- We need to check the current resumption token for this resource - -- matches the smacks session this timer is for in case it changed - -- (for example, the client may have bound a new resource and - -- started a new smacks session, or not be using smacks) - local curr_session = prosody.full_sessions[session.full_jid]; - if session.destroyed then - session.log("debug", "The session has already been destroyed"); - elseif curr_session and curr_session.resumption_token == resumption_token - -- Check the hibernate time still matches what we think it is, - -- otherwise the session resumed and re-hibernated. - and session.hibernating == hibernate_time then - -- wait longer if the timeout isn't reached because push was enabled for this session - -- session.first_hibernated_push is the starting point for hibernation timeouts of those push enabled clients - -- wait for an additional resume_timeout seconds if no push occurred since hibernation at all - local current_time = os_time(); - local timeout_start = math_max(session.hibernating, session.first_hibernated_push or session.hibernating); - if session.push_identifier ~= nil and not session.first_hibernated_push then - session.log("debug", "No push happened since hibernation started, hibernating session for up to %d extra seconds", resume_timeout); - return resume_timeout; - end - if session.push_identifier ~= nil and current_time-timeout_start < resume_timeout then - session.log("debug", "A push happened since hibernation started, hibernating session for up to %d extra seconds", - resume_timeout - (current_time - timeout_start)); - return resume_timeout-(current_time-timeout_start); -- time left to wait - end - session.log("debug", "Destroying session for hibernating too long"); - session_registry[jid.join(session.username, session.host, session.resumption_token)] = nil; - -- save only actual h value and username/host (for security) - old_session_registry:set(session.username, session.resumption_token, { - h = session.handled_stanza_count, - }); - session.resumption_token = nil; - sessionmanager.destroy_session(session, "Hibernating too long"); - else - session.log("debug", "Session resumed before hibernation timeout, all is well") - end - end); - if session.conn then - session.conn:close(); - end - return true; -- Postpone destruction for now + return + end + + session.hibernating = os_time(); + session.hibernating_watchdog = watchdog.new(resume_timeout, function() + session.log("debug", "mod_smacks hibernation timeout reached..."); + if session.destroyed then + session.log("debug", "The session has already been destroyed"); + return end + + session.log("debug", "Destroying session for hibernating too long"); + session_registry[jid.join(session.username, session.host, session.resumption_token)] = nil; + old_session_registry:set(session.username, session.resumption_token, { h = session.handled_stanza_count }); + session.resumption_token = nil; + sessionmanager.destroy_session(session, "Hibernating too long"); + end); + if session.conn then + session.conn:close(); end + module:fire_event("smacks-hibernation-start", { origin = session; queue = session.outgoing_stanza_queue }); + return true; -- Postpone destruction for now end); local function handle_s2s_destroyed(event) @@ -520,8 +491,11 @@ function handle_resume(session, stanza, xmlns_sm) :tag("item-not-found", { xmlns = xmlns_errors }) ); end; - elseif session.username == original_session.username - and session.host == original_session.host then + else + if original_session.hibernating_watchdog then + original_session.hibernating_watchdog:cancel(); + original_session.hibernating_watchdog = nil; + end session.log("debug", "mod_smacks resuming existing session %s...", get_session_id(original_session)); original_session.log("debug", "mod_smacks session resumed from %s...", get_session_id(session)); -- TODO: All this should move to sessionmanager (e.g. session:replace(new_session)) @@ -584,12 +558,6 @@ function handle_resume(session, stanza, xmlns_sm) end module:fire_event("smacks-hibernation-end", {origin = session, resumed = original_session, queue = queue}); request_ack_if_needed(original_session, true, "handle_resume", nil); - else - module:log("warn", "Client %s@%s[%s] tried to resume stream for %s@%s[%s]", - session.username or "?", session.host or "?", session.type, - original_session.username or "?", original_session.host or "?", original_session.type); - session.send(st.stanza("failed", { xmlns = xmlns_sm }) - :tag("not-authorized", { xmlns = xmlns_errors })); end return true; end -- cgit v1.2.3