diff options
author | Kim Alvefur <zash@zash.se> | 2021-11-24 21:27:49 +0100 |
---|---|---|
committer | Kim Alvefur <zash@zash.se> | 2021-11-24 21:27:49 +0100 |
commit | faff262ac8756de29154132b0e32097bd9f38e5b (patch) | |
tree | f3186a6d81d335b4c265447cfc9454b8864de87e /plugins | |
parent | d4398ef21d325aa8679f7d6b299ae6314d90da09 (diff) | |
download | prosody-faff262ac8756de29154132b0e32097bd9f38e5b.tar.gz prosody-faff262ac8756de29154132b0e32097bd9f38e5b.zip |
mod_smacks: Refactor ack requesting to avoid some timer roundtrips
The function was too large to comprehend! Breaking it up helps
readability and reuse.
The timer round rip is only to avoid ordering weirdness when sending
from inside a stanza filter. No need when handling <r> and <a>
CSI interactions both boiled down to sending an <r> immediately.
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/mod_smacks.lua | 118 |
1 files changed, 62 insertions, 56 deletions
diff --git a/plugins/mod_smacks.lua b/plugins/mod_smacks.lua index 2cb4e020..54da11b6 100644 --- a/plugins/mod_smacks.lua +++ b/plugins/mod_smacks.lua @@ -145,45 +145,63 @@ module:hook("s2s-stream-features", end end); -local function request_ack_if_needed(session, force, reason, stanza) +local function should_ack(session, force) + if not session then return end -- shouldn't be possible + if session.destroyed then return end -- gone + if not session.smacks then return end -- not using + if session.hibernating then return end -- can't ack when asleep + if session.awaiting_ack then return end -- already waiting + if force then return force end local queue = session.outgoing_stanza_queue; local expected_h = session.last_acknowledged_stanza + #queue; local max_unacked = max_unacked_stanzas; if session.state == "inactive" then max_unacked = max_inactive_unacked_stanzas; end - if session.awaiting_ack == nil and not session.hibernating then - -- this check of last_requested_h prevents ack-loops if missbehaving clients report wrong - -- stanza counts. it is set when an <r> is really sent (e.g. inside timer), preventing any - -- further requests until a higher h-value would be expected. - if (#queue > max_unacked and expected_h ~= session.last_requested_h) or force then - session.log("debug", "Queuing <r> (in a moment) from %s - #queue=%d", reason, #queue); - session.awaiting_ack = false; - session.awaiting_ack_timer = timer.add_task(1e-06, function () - -- session.log("debug", "*** SMACKS(3) ***: awaiting_ack=%s, hibernating=%s", tostring(session.awaiting_ack), tostring(session.hibernating)); - -- only request ack if needed and our session is not already hibernated or destroyed - if not session.awaiting_ack and not session.hibernating and not session.destroyed then - session.log("debug", "Sending <r> (inside timer, before send) from %s - #queue=%d", reason, #queue); - (session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks })) - if session.destroyed then return end -- sending something can trigger destruction - session.awaiting_ack = true; - -- expected_h could be lower than this expression e.g. more stanzas added to the queue meanwhile) - session.last_requested_h = session.last_acknowledged_stanza + #queue; - session.log("debug", "Sending <r> (inside timer, after send) from %s - #queue=%d", reason, #queue); - if not session.delayed_ack_timer then - session.delayed_ack_timer = timer.add_task(delayed_ack_timeout, function() - ack_delayed(session, nil); -- we don't know if this is the only new stanza in the queue - end); - end - end - end); - end + -- this check of last_requested_h prevents ack-loops if missbehaving clients report wrong + -- stanza counts. it is set when an <r> is really sent (e.g. inside timer), preventing any + -- further requests until a higher h-value would be expected. + return #queue > max_unacked and expected_h ~= session.last_requested_h; +end + +local function request_ack(session, reason) + local queue = session.outgoing_stanza_queue; + session.log("debug", "Sending <r> (inside timer, before send) from %s - #queue=%d", reason, #queue); + (session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks })) + if session.destroyed then return end -- sending something can trigger destruction + session.awaiting_ack = true; + -- expected_h could be lower than this expression e.g. more stanzas added to the queue meanwhile) + session.last_requested_h = session.last_acknowledged_stanza + #queue; + session.log("debug", "Sending <r> (inside timer, after send) from %s - #queue=%d", reason, #queue); + if not session.delayed_ack_timer then + session.delayed_ack_timer = timer.add_task(delayed_ack_timeout, function() + ack_delayed(session, nil); -- we don't know if this is the only new stanza in the queue + end); + end +end + +local function request_ack_now_if_needed(session, force, reason) + if should_ack(session, force) then + request_ack(session, reason); + end +end + +local function request_ack_if_needed(session, force, reason, stanza) + if should_ack(session, force) then + timer.add_task(0, function () + request_ack_now_if_needed(session, force, reason, stanza); + end); end -- Trigger "smacks-ack-delayed"-event if we added new (ackable) stanzas to the outgoing queue -- and there isn't already a timer for this event running. -- If we wouldn't do this, stanzas added to the queue after the first "smacks-ack-delayed"-event -- would not trigger this event (again). + local queue = session.outgoing_stanza_queue; + local max_unacked = max_unacked_stanzas; + if session.state == "inactive" then + max_unacked = max_inactive_unacked_stanzas; + end if #queue > max_unacked and session.awaiting_ack and session.delayed_ack_timer == nil then session.log("debug", "Calling ack_delayed directly (still waiting for ack)"); ack_delayed(session, stanza); -- this is the only new stanza in the queue --> provide it to other modules @@ -338,10 +356,7 @@ function handle_r(origin, stanza, xmlns_sm) -- luacheck: ignore 212/stanza -- Reply with <a> (origin.sends2s or origin.send)(st.stanza("a", { xmlns = xmlns_sm, h = string.format("%d", origin.handled_stanza_count) })); -- piggyback our own ack request if needed (see request_ack_if_needed() for explanation of last_requested_h) - local expected_h = origin.last_acknowledged_stanza + #origin.outgoing_stanza_queue; - if #origin.outgoing_stanza_queue > 0 and expected_h ~= origin.last_requested_h then - request_ack_if_needed(origin, true, "piggybacked by handle_r", nil); - end + request_ack_now_if_needed(origin, false, "piggybacked by handle_r", nil); return true; end module:hook_tag(xmlns_sm2, "r", function (origin, stanza) return handle_r(origin, stanza, xmlns_sm2); end); @@ -385,7 +400,7 @@ function handle_a(origin, stanza) origin.log("debug", "#queue = %d", #queue); origin.last_acknowledged_stanza = origin.last_acknowledged_stanza + handled_stanza_count; - request_ack_if_needed(origin, false, "handle_a", nil) + request_ack_now_if_needed(origin, false, "handle_a", nil) return true; end module:hook_tag(xmlns_sm2, "a", handle_a); @@ -632,22 +647,20 @@ end module:hook_tag(xmlns_sm2, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm2); end); module:hook_tag(xmlns_sm3, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm3); end); -module:hook("csi-client-active", function (event) - if event.origin.smacks then - request_ack_if_needed(event.origin, true, "csi-active", nil); - end -end); - -module:hook("csi-flushing", function(event) - local session = event.session; - if session.smacks then - if not session.awaiting_ack and not session.hibernating and not session.destroyed then - session.log("debug", "Sending <r> (csi-flushing)"); - session.awaiting_ack = true; -- The send() call may invoke this event again, so set this first - (session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks })) - end - end -end); +-- Events when it's sensible to request an ack +-- Could experiment with forcing (ignoring max_unacked) <r>, but when and why? +local request_ack_events = { + ["csi-client-active"] = true; + ["csi-flushing"] = false; +}; + +for event_name, force in pairs(request_ack_events) do + module:log("info", "module:hook(%q, function)"); + module:hook(event_name, function(event) + local session = event.session or event.origin; + request_ack_now_if_needed(session, force, event_name); + end); +end local function handle_read_timeout(event) local session = event.session; @@ -663,14 +676,7 @@ local function handle_read_timeout(event) end return false; -- Kick the session end - session.log("debug", "Sending <r> (read timeout)"); - (session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks })); - session.awaiting_ack = true; - if not session.delayed_ack_timer then - session.delayed_ack_timer = timer.add_task(delayed_ack_timeout, function() - ack_delayed(session, nil); - end); - end + request_ack_now_if_needed(session, true, "read timeout"); return true; end end |