diff options
-rw-r--r-- | plugins/mod_smacks.lua | 118 |
1 files changed, 62 insertions, 56 deletions
diff --git a/plugins/mod_smacks.lua b/plugins/mod_smacks.lua index 2cb4e020..54da11b6 100644 --- a/plugins/mod_smacks.lua +++ b/plugins/mod_smacks.lua @@ -145,45 +145,63 @@ module:hook("s2s-stream-features", end end); -local function request_ack_if_needed(session, force, reason, stanza) +local function should_ack(session, force) + if not session then return end -- shouldn't be possible + if session.destroyed then return end -- gone + if not session.smacks then return end -- not using + if session.hibernating then return end -- can't ack when asleep + if session.awaiting_ack then return end -- already waiting + if force then return force end local queue = session.outgoing_stanza_queue; local expected_h = session.last_acknowledged_stanza + #queue; local max_unacked = max_unacked_stanzas; if session.state == "inactive" then max_unacked = max_inactive_unacked_stanzas; end - if session.awaiting_ack == nil and not session.hibernating then - -- this check of last_requested_h prevents ack-loops if missbehaving clients report wrong - -- stanza counts. it is set when an <r> is really sent (e.g. inside timer), preventing any - -- further requests until a higher h-value would be expected. - if (#queue > max_unacked and expected_h ~= session.last_requested_h) or force then - session.log("debug", "Queuing <r> (in a moment) from %s - #queue=%d", reason, #queue); - session.awaiting_ack = false; - session.awaiting_ack_timer = timer.add_task(1e-06, function () - -- session.log("debug", "*** SMACKS(3) ***: awaiting_ack=%s, hibernating=%s", tostring(session.awaiting_ack), tostring(session.hibernating)); - -- only request ack if needed and our session is not already hibernated or destroyed - if not session.awaiting_ack and not session.hibernating and not session.destroyed then - session.log("debug", "Sending <r> (inside timer, before send) from %s - #queue=%d", reason, #queue); - (session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks })) - if session.destroyed then return end -- sending something can trigger destruction - session.awaiting_ack = true; - -- expected_h could be lower than this expression e.g. more stanzas added to the queue meanwhile) - session.last_requested_h = session.last_acknowledged_stanza + #queue; - session.log("debug", "Sending <r> (inside timer, after send) from %s - #queue=%d", reason, #queue); - if not session.delayed_ack_timer then - session.delayed_ack_timer = timer.add_task(delayed_ack_timeout, function() - ack_delayed(session, nil); -- we don't know if this is the only new stanza in the queue - end); - end - end - end); - end + -- this check of last_requested_h prevents ack-loops if missbehaving clients report wrong + -- stanza counts. it is set when an <r> is really sent (e.g. inside timer), preventing any + -- further requests until a higher h-value would be expected. + return #queue > max_unacked and expected_h ~= session.last_requested_h; +end + +local function request_ack(session, reason) + local queue = session.outgoing_stanza_queue; + session.log("debug", "Sending <r> (inside timer, before send) from %s - #queue=%d", reason, #queue); + (session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks })) + if session.destroyed then return end -- sending something can trigger destruction + session.awaiting_ack = true; + -- expected_h could be lower than this expression e.g. more stanzas added to the queue meanwhile) + session.last_requested_h = session.last_acknowledged_stanza + #queue; + session.log("debug", "Sending <r> (inside timer, after send) from %s - #queue=%d", reason, #queue); + if not session.delayed_ack_timer then + session.delayed_ack_timer = timer.add_task(delayed_ack_timeout, function() + ack_delayed(session, nil); -- we don't know if this is the only new stanza in the queue + end); + end +end + +local function request_ack_now_if_needed(session, force, reason) + if should_ack(session, force) then + request_ack(session, reason); + end +end + +local function request_ack_if_needed(session, force, reason, stanza) + if should_ack(session, force) then + timer.add_task(0, function () + request_ack_now_if_needed(session, force, reason, stanza); + end); end -- Trigger "smacks-ack-delayed"-event if we added new (ackable) stanzas to the outgoing queue -- and there isn't already a timer for this event running. -- If we wouldn't do this, stanzas added to the queue after the first "smacks-ack-delayed"-event -- would not trigger this event (again). + local queue = session.outgoing_stanza_queue; + local max_unacked = max_unacked_stanzas; + if session.state == "inactive" then + max_unacked = max_inactive_unacked_stanzas; + end if #queue > max_unacked and session.awaiting_ack and session.delayed_ack_timer == nil then session.log("debug", "Calling ack_delayed directly (still waiting for ack)"); ack_delayed(session, stanza); -- this is the only new stanza in the queue --> provide it to other modules @@ -338,10 +356,7 @@ function handle_r(origin, stanza, xmlns_sm) -- luacheck: ignore 212/stanza -- Reply with <a> (origin.sends2s or origin.send)(st.stanza("a", { xmlns = xmlns_sm, h = string.format("%d", origin.handled_stanza_count) })); -- piggyback our own ack request if needed (see request_ack_if_needed() for explanation of last_requested_h) - local expected_h = origin.last_acknowledged_stanza + #origin.outgoing_stanza_queue; - if #origin.outgoing_stanza_queue > 0 and expected_h ~= origin.last_requested_h then - request_ack_if_needed(origin, true, "piggybacked by handle_r", nil); - end + request_ack_now_if_needed(origin, false, "piggybacked by handle_r", nil); return true; end module:hook_tag(xmlns_sm2, "r", function (origin, stanza) return handle_r(origin, stanza, xmlns_sm2); end); @@ -385,7 +400,7 @@ function handle_a(origin, stanza) origin.log("debug", "#queue = %d", #queue); origin.last_acknowledged_stanza = origin.last_acknowledged_stanza + handled_stanza_count; - request_ack_if_needed(origin, false, "handle_a", nil) + request_ack_now_if_needed(origin, false, "handle_a", nil) return true; end module:hook_tag(xmlns_sm2, "a", handle_a); @@ -632,22 +647,20 @@ end module:hook_tag(xmlns_sm2, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm2); end); module:hook_tag(xmlns_sm3, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm3); end); -module:hook("csi-client-active", function (event) - if event.origin.smacks then - request_ack_if_needed(event.origin, true, "csi-active", nil); - end -end); - -module:hook("csi-flushing", function(event) - local session = event.session; - if session.smacks then - if not session.awaiting_ack and not session.hibernating and not session.destroyed then - session.log("debug", "Sending <r> (csi-flushing)"); - session.awaiting_ack = true; -- The send() call may invoke this event again, so set this first - (session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks })) - end - end -end); +-- Events when it's sensible to request an ack +-- Could experiment with forcing (ignoring max_unacked) <r>, but when and why? +local request_ack_events = { + ["csi-client-active"] = true; + ["csi-flushing"] = false; +}; + +for event_name, force in pairs(request_ack_events) do + module:log("info", "module:hook(%q, function)"); + module:hook(event_name, function(event) + local session = event.session or event.origin; + request_ack_now_if_needed(session, force, event_name); + end); +end local function handle_read_timeout(event) local session = event.session; @@ -663,14 +676,7 @@ local function handle_read_timeout(event) end return false; -- Kick the session end - session.log("debug", "Sending <r> (read timeout)"); - (session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks })); - session.awaiting_ack = true; - if not session.delayed_ack_timer then - session.delayed_ack_timer = timer.add_task(delayed_ack_timeout, function() - ack_delayed(session, nil); - end); - end + request_ack_now_if_needed(session, true, "read timeout"); return true; end end |