aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--plugins/mod_smacks.lua118
1 files changed, 62 insertions, 56 deletions
diff --git a/plugins/mod_smacks.lua b/plugins/mod_smacks.lua
index 2cb4e020..54da11b6 100644
--- a/plugins/mod_smacks.lua
+++ b/plugins/mod_smacks.lua
@@ -145,45 +145,63 @@ module:hook("s2s-stream-features",
end
end);
-local function request_ack_if_needed(session, force, reason, stanza)
+local function should_ack(session, force)
+ if not session then return end -- shouldn't be possible
+ if session.destroyed then return end -- gone
+ if not session.smacks then return end -- not using
+ if session.hibernating then return end -- can't ack when asleep
+ if session.awaiting_ack then return end -- already waiting
+ if force then return force end
local queue = session.outgoing_stanza_queue;
local expected_h = session.last_acknowledged_stanza + #queue;
local max_unacked = max_unacked_stanzas;
if session.state == "inactive" then
max_unacked = max_inactive_unacked_stanzas;
end
- if session.awaiting_ack == nil and not session.hibernating then
- -- this check of last_requested_h prevents ack-loops if missbehaving clients report wrong
- -- stanza counts. it is set when an <r> is really sent (e.g. inside timer), preventing any
- -- further requests until a higher h-value would be expected.
- if (#queue > max_unacked and expected_h ~= session.last_requested_h) or force then
- session.log("debug", "Queuing <r> (in a moment) from %s - #queue=%d", reason, #queue);
- session.awaiting_ack = false;
- session.awaiting_ack_timer = timer.add_task(1e-06, function ()
- -- session.log("debug", "*** SMACKS(3) ***: awaiting_ack=%s, hibernating=%s", tostring(session.awaiting_ack), tostring(session.hibernating));
- -- only request ack if needed and our session is not already hibernated or destroyed
- if not session.awaiting_ack and not session.hibernating and not session.destroyed then
- session.log("debug", "Sending <r> (inside timer, before send) from %s - #queue=%d", reason, #queue);
- (session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks }))
- if session.destroyed then return end -- sending something can trigger destruction
- session.awaiting_ack = true;
- -- expected_h could be lower than this expression e.g. more stanzas added to the queue meanwhile)
- session.last_requested_h = session.last_acknowledged_stanza + #queue;
- session.log("debug", "Sending <r> (inside timer, after send) from %s - #queue=%d", reason, #queue);
- if not session.delayed_ack_timer then
- session.delayed_ack_timer = timer.add_task(delayed_ack_timeout, function()
- ack_delayed(session, nil); -- we don't know if this is the only new stanza in the queue
- end);
- end
- end
- end);
- end
+ -- this check of last_requested_h prevents ack-loops if missbehaving clients report wrong
+ -- stanza counts. it is set when an <r> is really sent (e.g. inside timer), preventing any
+ -- further requests until a higher h-value would be expected.
+ return #queue > max_unacked and expected_h ~= session.last_requested_h;
+end
+
+local function request_ack(session, reason)
+ local queue = session.outgoing_stanza_queue;
+ session.log("debug", "Sending <r> (inside timer, before send) from %s - #queue=%d", reason, #queue);
+ (session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks }))
+ if session.destroyed then return end -- sending something can trigger destruction
+ session.awaiting_ack = true;
+ -- expected_h could be lower than this expression e.g. more stanzas added to the queue meanwhile)
+ session.last_requested_h = session.last_acknowledged_stanza + #queue;
+ session.log("debug", "Sending <r> (inside timer, after send) from %s - #queue=%d", reason, #queue);
+ if not session.delayed_ack_timer then
+ session.delayed_ack_timer = timer.add_task(delayed_ack_timeout, function()
+ ack_delayed(session, nil); -- we don't know if this is the only new stanza in the queue
+ end);
+ end
+end
+
+local function request_ack_now_if_needed(session, force, reason)
+ if should_ack(session, force) then
+ request_ack(session, reason);
+ end
+end
+
+local function request_ack_if_needed(session, force, reason, stanza)
+ if should_ack(session, force) then
+ timer.add_task(0, function ()
+ request_ack_now_if_needed(session, force, reason, stanza);
+ end);
end
-- Trigger "smacks-ack-delayed"-event if we added new (ackable) stanzas to the outgoing queue
-- and there isn't already a timer for this event running.
-- If we wouldn't do this, stanzas added to the queue after the first "smacks-ack-delayed"-event
-- would not trigger this event (again).
+ local queue = session.outgoing_stanza_queue;
+ local max_unacked = max_unacked_stanzas;
+ if session.state == "inactive" then
+ max_unacked = max_inactive_unacked_stanzas;
+ end
if #queue > max_unacked and session.awaiting_ack and session.delayed_ack_timer == nil then
session.log("debug", "Calling ack_delayed directly (still waiting for ack)");
ack_delayed(session, stanza); -- this is the only new stanza in the queue --> provide it to other modules
@@ -338,10 +356,7 @@ function handle_r(origin, stanza, xmlns_sm) -- luacheck: ignore 212/stanza
-- Reply with <a>
(origin.sends2s or origin.send)(st.stanza("a", { xmlns = xmlns_sm, h = string.format("%d", origin.handled_stanza_count) }));
-- piggyback our own ack request if needed (see request_ack_if_needed() for explanation of last_requested_h)
- local expected_h = origin.last_acknowledged_stanza + #origin.outgoing_stanza_queue;
- if #origin.outgoing_stanza_queue > 0 and expected_h ~= origin.last_requested_h then
- request_ack_if_needed(origin, true, "piggybacked by handle_r", nil);
- end
+ request_ack_now_if_needed(origin, false, "piggybacked by handle_r", nil);
return true;
end
module:hook_tag(xmlns_sm2, "r", function (origin, stanza) return handle_r(origin, stanza, xmlns_sm2); end);
@@ -385,7 +400,7 @@ function handle_a(origin, stanza)
origin.log("debug", "#queue = %d", #queue);
origin.last_acknowledged_stanza = origin.last_acknowledged_stanza + handled_stanza_count;
- request_ack_if_needed(origin, false, "handle_a", nil)
+ request_ack_now_if_needed(origin, false, "handle_a", nil)
return true;
end
module:hook_tag(xmlns_sm2, "a", handle_a);
@@ -632,22 +647,20 @@ end
module:hook_tag(xmlns_sm2, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm2); end);
module:hook_tag(xmlns_sm3, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm3); end);
-module:hook("csi-client-active", function (event)
- if event.origin.smacks then
- request_ack_if_needed(event.origin, true, "csi-active", nil);
- end
-end);
-
-module:hook("csi-flushing", function(event)
- local session = event.session;
- if session.smacks then
- if not session.awaiting_ack and not session.hibernating and not session.destroyed then
- session.log("debug", "Sending <r> (csi-flushing)");
- session.awaiting_ack = true; -- The send() call may invoke this event again, so set this first
- (session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks }))
- end
- end
-end);
+-- Events when it's sensible to request an ack
+-- Could experiment with forcing (ignoring max_unacked) <r>, but when and why?
+local request_ack_events = {
+ ["csi-client-active"] = true;
+ ["csi-flushing"] = false;
+};
+
+for event_name, force in pairs(request_ack_events) do
+ module:log("info", "module:hook(%q, function)");
+ module:hook(event_name, function(event)
+ local session = event.session or event.origin;
+ request_ack_now_if_needed(session, force, event_name);
+ end);
+end
local function handle_read_timeout(event)
local session = event.session;
@@ -663,14 +676,7 @@ local function handle_read_timeout(event)
end
return false; -- Kick the session
end
- session.log("debug", "Sending <r> (read timeout)");
- (session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks }));
- session.awaiting_ack = true;
- if not session.delayed_ack_timer then
- session.delayed_ack_timer = timer.add_task(delayed_ack_timeout, function()
- ack_delayed(session, nil);
- end);
- end
+ request_ack_now_if_needed(session, true, "read timeout");
return true;
end
end