aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthew Wild <mwild1@gmail.com>2022-08-26 17:04:15 +0100
committerMatthew Wild <mwild1@gmail.com>2022-08-26 17:04:15 +0100
commit54fcc029c8fafd6e9fd9a38e279e5557177c1370 (patch)
treed80b23dbee76a881b01f051deb9690e5672bb602
parent95bba786f1fb0e15b98fa2b475d0ad585ad359a3 (diff)
downloadprosody-54fcc029c8fafd6e9fd9a38e279e5557177c1370.tar.gz
prosody-54fcc029c8fafd6e9fd9a38e279e5557177c1370.zip
mod_smacks: Long overdue cleanup of resumption code, fixes some old TODOs
-rw-r--r--core/sessionmanager.lua46
-rw-r--r--plugins/mod_c2s.lua8
-rw-r--r--plugins/mod_smacks.lua64
3 files changed, 66 insertions, 52 deletions
diff --git a/core/sessionmanager.lua b/core/sessionmanager.lua
index dec21674..cdfd040f 100644
--- a/core/sessionmanager.lua
+++ b/core/sessionmanager.lua
@@ -10,7 +10,7 @@
local tostring, setmetatable = tostring, setmetatable;
local pairs, next= pairs, next;
-local hosts = prosody.hosts;
+local prosody, hosts = prosody, prosody.hosts;
local full_sessions = prosody.full_sessions;
local bare_sessions = prosody.bare_sessions;
@@ -92,6 +92,49 @@ local function retire_session(session)
return setmetatable(session, resting_session);
end
+-- Update a session with a new one (transplanting connection, filters, etc.)
+-- new_session should be discarded after this call returns
+local function update_session(to_session, from_session)
+ to_session.log("debug", "Updating with parameters from session %s", from_session.id);
+ from_session.log("debug", "Session absorbed into %s", to_session.id);
+
+ local replaced_conn = to_session.conn;
+ if replaced_conn then
+ to_session.log("debug", "closing a replaced connection for this session");
+ replaced_conn:close();
+ end
+
+ to_session.ip = from_session.ip;
+ to_session.conn = from_session.conn;
+ to_session.rawsend = from_session.rawsend;
+ to_session.rawsend.session = to_session;
+ to_session.rawsend.conn = to_session.conn;
+ to_session.send = from_session.send;
+ to_session.send.session = to_session;
+ to_session.close = from_session.close;
+ to_session.filter = from_session.filter;
+ to_session.filter.session = to_session;
+ to_session.filters = from_session.filters;
+ to_session.send.filter = to_session.filter;
+ to_session.stream = from_session.stream;
+ to_session.secure = from_session.secure;
+ to_session.hibernating = nil;
+ to_session.resumption_counter = (to_session.resumption_counter or 0) + 1;
+ from_session.log = to_session.log;
+ from_session.type = to_session.type;
+ -- Inform xmppstream of the new session (passed to its callbacks)
+ to_session.stream:set_session(to_session);
+
+ -- Retire the session we've pulled from, to avoid two sessions on the same connection
+ retire_session(from_session);
+
+ prosody.events.fire_event("c2s-session-updated", {
+ session = to_session;
+ from_session = from_session;
+ replaced_conn = replaced_conn;
+ });
+end
+
local function destroy_session(session, err)
(session.log or log)("debug", "Destroying session for %s (%s@%s)%s",
session.full_jid or "(unknown)", session.username or "(unknown)",
@@ -267,6 +310,7 @@ end
return {
new_session = new_session;
retire_session = retire_session;
+ update_session = update_session;
destroy_session = destroy_session;
make_authenticated = make_authenticated;
bind_resource = bind_resource;
diff --git a/plugins/mod_c2s.lua b/plugins/mod_c2s.lua
index e8241687..2d4186d0 100644
--- a/plugins/mod_c2s.lua
+++ b/plugins/mod_c2s.lua
@@ -262,6 +262,14 @@ module:hook_global("user-password-changed", disconnect_user_sessions({ condition
module:hook_global("user-role-changed", disconnect_user_sessions({ condition = "reset", text = "Role changed" }), 200);
module:hook_global("user-deleted", disconnect_user_sessions({ condition = "not-authorized", text = "Account deleted" }), 200);
+module:hook_global("c2s-session-updated", function (event)
+ sessions[event.session.conn] = event.session;
+ local replaced_conn = event.replaced_conn;
+ if replaced_conn then
+ sessions[replaced_conn] = nil;
+ end
+end);
+
function runner_callbacks:ready()
if self.data.conn then
self.data.conn:resume();
diff --git a/plugins/mod_smacks.lua b/plugins/mod_smacks.lua
index e2bbff9c..a1435829 100644
--- a/plugins/mod_smacks.lua
+++ b/plugins/mod_smacks.lua
@@ -196,7 +196,6 @@ local function outgoing_stanza_filter(stanza, session)
-- supposed to be nil.
-- However, when using mod_smacks with mod_websocket, then mod_websocket's
-- stanzas/out filter can get called before this one and adds the xmlns.
- if session.resending_unacked then return stanza end
if not session.smacks then return stanza end
local is_stanza = st.is_stanza(stanza) and
(not stanza.attr.xmlns or stanza.attr.xmlns == 'jabber:client')
@@ -496,7 +495,6 @@ module:hook("pre-resource-unbind", function (event)
session.log("debug", "Destroying session for hibernating too long");
save_old_session(session);
session.resumption_token = nil;
- session.resending_unacked = true; -- stop outgoing_stanza_filter from re-queueing anything anymore
sessionmanager.destroy_session(session, "Hibernating too long");
sessions_expired(1);
end);
@@ -529,10 +527,6 @@ end
module:hook("s2sout-destroyed", handle_s2s_destroyed);
module:hook("s2sin-destroyed", handle_s2s_destroyed);
-local function get_session_id(session)
- return session.id or (tostring(session):match("[a-f0-9]+$"));
-end
-
function handle_resume(session, stanza, xmlns_sm)
if session.full_jid then
session.log("warn", "Tried to resume after resource binding");
@@ -573,40 +567,11 @@ function handle_resume(session, stanza, xmlns_sm)
local now = os_time();
age = now - original_session.hibernating;
end
- session.log("debug", "mod_smacks resuming existing session %s...", get_session_id(original_session));
- original_session.log("debug", "mod_smacks session resumed from %s...", get_session_id(session));
- -- TODO: All this should move to sessionmanager (e.g. session:replace(new_session))
- if original_session.conn then
- original_session.log("debug", "mod_smacks closing an old connection for this session");
- local conn = original_session.conn;
- c2s_sessions[conn] = nil;
- conn:close();
- end
- local migrated_session_log = session.log;
- original_session.ip = session.ip;
- original_session.conn = session.conn;
- original_session.rawsend = session.rawsend;
- original_session.rawsend.session = original_session;
- original_session.rawsend.conn = original_session.conn;
- original_session.send = session.send;
- original_session.send.session = original_session;
- original_session.close = session.close;
- original_session.filter = session.filter;
- original_session.filter.session = original_session;
- original_session.filters = session.filters;
- original_session.send.filter = original_session.filter;
- original_session.stream = session.stream;
- original_session.secure = session.secure;
- original_session.hibernating = nil;
- original_session.resumption_counter = (original_session.resumption_counter or 0) + 1;
- session.log = original_session.log;
- session.type = original_session.type;
- wrap_session(original_session, true);
- -- Inform xmppstream of the new session (passed to its callbacks)
- original_session.stream:set_session(original_session);
- -- Similar for connlisteners
- c2s_sessions[session.conn] = original_session;
+ session.log("debug", "mod_smacks resuming existing session %s...", original_session.id);
+
+ -- Update original_session with the parameters (connection, etc.) from the new session
+ sessionmanager.update_session(original_session, session);
local queue = original_session.outgoing_stanza_queue;
local h = tonumber(stanza.attr.h);
@@ -633,20 +598,16 @@ function handle_resume(session, stanza, xmlns_sm)
-- We have to use the send of "session" because we don't want to add our resent stanzas
-- to the outgoing queue again
- session.log("debug", "resending all unacked stanzas that are still queued after resume, #queue = %d", queue:count_unacked());
- -- FIXME Which session is it that the queue filter sees?
- session.resending_unacked = true;
- original_session.resending_unacked = true;
+ original_session.log("debug", "resending all unacked stanzas that are still queued after resume, #queue = %d", queue:count_unacked());
for _, queued_stanza in queue:resume() do
- session.send(queued_stanza);
- end
- session.resending_unacked = nil;
- original_session.resending_unacked = nil;
- session.log("debug", "all stanzas resent, now disabling send() in this migrated session, #queue = %d", queue:count_unacked());
- function session.send(stanza) -- luacheck: ignore 432
- migrated_session_log("error", "Tried to send stanza on old session migrated by smacks resume (maybe there is a bug?): %s", tostring(stanza));
- return false;
+ original_session.send(queued_stanza);
end
+ session.log("debug", "all stanzas resent, enabling stream management on resumed stream, #queue = %d", queue:count_unacked());
+
+ -- Add our own handlers to the resumed session (filters have been reset in the update)
+ wrap_session(original_session, true);
+
+ -- Let everyone know that we are no longer hibernating
module:fire_event("smacks-hibernation-end", {origin = session, resumed = original_session, queue = queue:table()});
original_session.awaiting_ack = nil; -- Don't wait for acks from before the resumption
request_ack_now_if_needed(original_session, true, "handle_resume", nil);
@@ -654,6 +615,7 @@ function handle_resume(session, stanza, xmlns_sm)
end
return true;
end
+
module:hook_tag(xmlns_sm2, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm2); end);
module:hook_tag(xmlns_sm3, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm3); end);