From 7ef5adb448ad36074123a924dc80607e087ed200 Mon Sep 17 00:00:00 2001 From: Kim Alvefur Date: Sat, 24 Nov 2018 02:25:44 +0100 Subject: mod_csi_simple: Use write locks in net.server if available --- plugins/mod_csi_simple.lua | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) (limited to 'plugins/mod_csi_simple.lua') diff --git a/plugins/mod_csi_simple.lua b/plugins/mod_csi_simple.lua index da2dd953..abe65fce 100644 --- a/plugins/mod_csi_simple.lua +++ b/plugins/mod_csi_simple.lua @@ -86,7 +86,9 @@ end, -1); module:hook("csi-client-inactive", function (event) local session = event.origin; - if session.pump then + if session.conn and session.conn and session.conn.pause_writes then + session.conn:pause_writes(); + elseif session.pump then session.pump:pause(); else local bare_jid = jid.join(session.username, session.host); @@ -115,6 +117,8 @@ module:hook("csi-client-active", function (event) local session = event.origin; if session.pump then session.pump:resume(); + elseif session.conn and session.conn and session.conn.resume_writes then + session.conn:resume_writes(); end end); -- cgit v1.2.3 From e31a4cdd89be95fd6b97617491ea8e4a9822af66 Mon Sep 17 00:00:00 2001 From: Kim Alvefur Date: Sun, 24 Mar 2019 18:32:50 +0100 Subject: mod_csi_simple: Break out stanza timestamping into a function for future reuse --- plugins/mod_csi_simple.lua | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) (limited to 'plugins/mod_csi_simple.lua') diff --git a/plugins/mod_csi_simple.lua b/plugins/mod_csi_simple.lua index abe65fce..6bd6c0bf 100644 --- a/plugins/mod_csi_simple.lua +++ b/plugins/mod_csi_simple.lua @@ -84,6 +84,14 @@ module:hook("csi-is-stanza-important", function (event) return true; end, -1); +local function with_timestamp(stanza, from) + if st.is_stanza(stanza) and stanza.attr.xmlns == nil and stanza.name ~= "iq" then + stanza = st.clone(stanza); + stanza:add_direct_child(st.stanza("delay", {xmlns = "urn:xmpp:delay", from = from, stamp = dt.datetime()})); + end + return stanza; +end + module:hook("csi-client-inactive", function (event) local session = event.origin; if session.conn and session.conn and session.conn.pause_writes then @@ -102,11 +110,7 @@ module:hook("csi-client-inactive", function (event) pump:flush(); send(stanza); else - if st.is_stanza(stanza) and stanza.attr.xmlns == nil and stanza.name ~= "iq" then - stanza = st.clone(stanza); - stanza:add_direct_child(st.stanza("delay", {xmlns = "urn:xmpp:delay", from = bare_jid, stamp = dt.datetime()})); - end - pump:push(stanza); + pump:push(with_timestamp(stanza, bare_jid)); end return true; end -- cgit v1.2.3 From 00f3e4db9c96b7939c1cf87c514a40f58ae7fde0 Mon Sep 17 00:00:00 2001 From: Kim Alvefur Date: Sun, 24 Mar 2019 18:33:38 +0100 Subject: mod_csi_simple: Count buffered items and flush when it reaches configured limit In this mode, stanzas have been serialized to strings in the internal net.server buffer, so it is difficult to count them after the fact. --- plugins/mod_csi_simple.lua | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) (limited to 'plugins/mod_csi_simple.lua') diff --git a/plugins/mod_csi_simple.lua b/plugins/mod_csi_simple.lua index 6bd6c0bf..5c829179 100644 --- a/plugins/mod_csi_simple.lua +++ b/plugins/mod_csi_simple.lua @@ -10,6 +10,7 @@ local jid = require "util.jid"; local st = require "util.stanza"; local dt = require "util.datetime"; local new_queue = require "util.queue".new; +local filters = require "util.filters"; local function new_pump(output, ...) -- luacheck: ignore 212/self @@ -92,10 +93,22 @@ local function with_timestamp(stanza, from) return stanza; end +local function manage_buffer(stanza, session) + local ctr = session.csi_counter or 0; + if ctr >= queue_size or module:fire_event("csi-is-stanza-important", { stanza = stanza, session = session }) then + session.conn:resume_writes(); + else + stanza = with_timestamp(stanza, jid.join(session.username, session.host)) + end + session.csi_counter = ctr + 1; + return stanza; +end + module:hook("csi-client-inactive", function (event) local session = event.origin; if session.conn and session.conn and session.conn.pause_writes then session.conn:pause_writes(); + filters.add_filter(session, "stanzas/out", manage_buffer); elseif session.pump then session.pump:pause(); else @@ -122,7 +135,16 @@ module:hook("csi-client-active", function (event) if session.pump then session.pump:resume(); elseif session.conn and session.conn and session.conn.resume_writes then + filters.remove_filter(session, "stanzas/out", manage_buffer); session.conn:resume_writes(); end end); + +module:hook("c2s-ondrain", function (event) + local session = event.session; + if session.state == "inactive" and session.conn and session.conn and session.conn.pause_writes then + session.csi_counter = 0; + session.conn:pause_writes(); + end +end); -- cgit v1.2.3 From 439a1e0ebb1045d72b0a3c106dc1d122783387e2 Mon Sep 17 00:00:00 2001 From: Kim Alvefur Date: Sun, 24 Mar 2019 18:58:53 +0100 Subject: mod_csi_simple: Trigger buffer flush on seeing incoming data I.e. the client sent us something, which means its network / radio is active. --- plugins/mod_csi_simple.lua | 7 +++++++ 1 file changed, 7 insertions(+) (limited to 'plugins/mod_csi_simple.lua') diff --git a/plugins/mod_csi_simple.lua b/plugins/mod_csi_simple.lua index 5c829179..07a8cfb3 100644 --- a/plugins/mod_csi_simple.lua +++ b/plugins/mod_csi_simple.lua @@ -104,11 +104,17 @@ local function manage_buffer(stanza, session) return stanza; end +local function flush_buffer(data, session) + session.conn:resume_writes(); + return data; +end + module:hook("csi-client-inactive", function (event) local session = event.origin; if session.conn and session.conn and session.conn.pause_writes then session.conn:pause_writes(); filters.add_filter(session, "stanzas/out", manage_buffer); + filters.add_filter(session, "bytes/in", flush_buffer); elseif session.pump then session.pump:pause(); else @@ -136,6 +142,7 @@ module:hook("csi-client-active", function (event) session.pump:resume(); elseif session.conn and session.conn and session.conn.resume_writes then filters.remove_filter(session, "stanzas/out", manage_buffer); + filters.remove_filter(session, "bytes/in", flush_buffer); session.conn:resume_writes(); end end); -- cgit v1.2.3 From 461c69ff4eca9672bac066cb7f14fdc98bdae32b Mon Sep 17 00:00:00 2001 From: Kim Alvefur Date: Sun, 24 Mar 2019 19:02:40 +0100 Subject: mod_csi_simple: Also flush buffer in "pump" mode --- plugins/mod_csi_simple.lua | 7 +++++++ 1 file changed, 7 insertions(+) (limited to 'plugins/mod_csi_simple.lua') diff --git a/plugins/mod_csi_simple.lua b/plugins/mod_csi_simple.lua index 07a8cfb3..ee7e01c9 100644 --- a/plugins/mod_csi_simple.lua +++ b/plugins/mod_csi_simple.lua @@ -109,9 +109,15 @@ local function flush_buffer(data, session) return data; end +local function flush_pump(data, session) + session.pump:flush(); + return data; +end + module:hook("csi-client-inactive", function (event) local session = event.origin; if session.conn and session.conn and session.conn.pause_writes then + session.log("info", "Native net.server buffer management mode"); session.conn:pause_writes(); filters.add_filter(session, "stanzas/out", manage_buffer); filters.add_filter(session, "bytes/in", flush_buffer); @@ -124,6 +130,7 @@ module:hook("csi-client-inactive", function (event) local pump = new_pump(session.send, queue_size); pump:pause(); session.pump = pump; + filters.add_filter(session, "bytes/in", flush_pump); function session.send(stanza) if session.state == "active" or module:fire_event("csi-is-stanza-important", { stanza = stanza, session = session }) then pump:flush(); -- cgit v1.2.3 From eefe3deafa99c7d227a4e77c179dde796824cb23 Mon Sep 17 00:00:00 2001 From: Kim Alvefur Date: Sun, 24 Mar 2019 20:22:01 +0100 Subject: mod_csi_simple: Remove old "pump" queue/buffer method, handled in net.server now --- plugins/mod_csi_simple.lua | 63 ++-------------------------------------------- 1 file changed, 2 insertions(+), 61 deletions(-) (limited to 'plugins/mod_csi_simple.lua') diff --git a/plugins/mod_csi_simple.lua b/plugins/mod_csi_simple.lua index ee7e01c9..0fa0d083 100644 --- a/plugins/mod_csi_simple.lua +++ b/plugins/mod_csi_simple.lua @@ -9,42 +9,8 @@ module:depends"csi" local jid = require "util.jid"; local st = require "util.stanza"; local dt = require "util.datetime"; -local new_queue = require "util.queue".new; local filters = require "util.filters"; -local function new_pump(output, ...) - -- luacheck: ignore 212/self - local q = new_queue(...); - local flush = true; - function q:pause() - flush = false; - end - function q:resume() - flush = true; - return q:flush(); - end - local push = q.push; - function q:push(item) - local ok = push(self, item); - if not ok then - q:flush(); - output(item, self); - elseif flush then - return q:flush(); - end - return true; - end - function q:flush() - local item = self:pop(); - while item do - output(item, self); - item = self:pop(); - end - return true; - end - return q; -end - local queue_size = module:get_option_number("csi_queue_size", 256); module:hook("csi-is-stanza-important", function (event) @@ -109,45 +75,20 @@ local function flush_buffer(data, session) return data; end -local function flush_pump(data, session) - session.pump:flush(); - return data; -end - module:hook("csi-client-inactive", function (event) local session = event.origin; if session.conn and session.conn and session.conn.pause_writes then - session.log("info", "Native net.server buffer management mode"); session.conn:pause_writes(); filters.add_filter(session, "stanzas/out", manage_buffer); filters.add_filter(session, "bytes/in", flush_buffer); - elseif session.pump then - session.pump:pause(); else - local bare_jid = jid.join(session.username, session.host); - local send = session.send; - session._orig_send = send; - local pump = new_pump(session.send, queue_size); - pump:pause(); - session.pump = pump; - filters.add_filter(session, "bytes/in", flush_pump); - function session.send(stanza) - if session.state == "active" or module:fire_event("csi-is-stanza-important", { stanza = stanza, session = session }) then - pump:flush(); - send(stanza); - else - pump:push(with_timestamp(stanza, bare_jid)); - end - return true; - end + session.log("warn", "Session connection does not support write pausing"); end end); module:hook("csi-client-active", function (event) local session = event.origin; - if session.pump then - session.pump:resume(); - elseif session.conn and session.conn and session.conn.resume_writes then + if session.conn and session.conn and session.conn.resume_writes then filters.remove_filter(session, "stanzas/out", manage_buffer); filters.remove_filter(session, "bytes/in", flush_buffer); session.conn:resume_writes(); -- cgit v1.2.3 From 8ad3c66121ddc50c102061c9365411545d660625 Mon Sep 17 00:00:00 2001 From: Kim Alvefur Date: Sun, 24 Mar 2019 20:41:25 +0100 Subject: mod_csi_simple: Separate out functions to enable/disable optimizations This allows reusing this logic outside the events. Letting the functions be module globals makes it easier to access from eg the telnet console. --- plugins/mod_csi_simple.lua | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) (limited to 'plugins/mod_csi_simple.lua') diff --git a/plugins/mod_csi_simple.lua b/plugins/mod_csi_simple.lua index 0fa0d083..14abbc68 100644 --- a/plugins/mod_csi_simple.lua +++ b/plugins/mod_csi_simple.lua @@ -75,8 +75,7 @@ local function flush_buffer(data, session) return data; end -module:hook("csi-client-inactive", function (event) - local session = event.origin; +function enable_optimizations(session) if session.conn and session.conn and session.conn.pause_writes then session.conn:pause_writes(); filters.add_filter(session, "stanzas/out", manage_buffer); @@ -84,15 +83,24 @@ module:hook("csi-client-inactive", function (event) else session.log("warn", "Session connection does not support write pausing"); end -end); +end -module:hook("csi-client-active", function (event) - local session = event.origin; +function disble_optimizations(session) if session.conn and session.conn and session.conn.resume_writes then filters.remove_filter(session, "stanzas/out", manage_buffer); filters.remove_filter(session, "bytes/in", flush_buffer); session.conn:resume_writes(); end +end + +module:hook("csi-client-inactive", function (event) + local session = event.origin; + enable_optimizations(session); +end); + +module:hook("csi-client-active", function (event) + local session = event.origin; + disble_optimizations(session); end); -- cgit v1.2.3 From 0c520e8a39f380940ad216a228eb0624d85e02bd Mon Sep 17 00:00:00 2001 From: Kim Alvefur Date: Sun, 24 Mar 2019 20:43:15 +0100 Subject: mod_csi_simple: Disable optimizations on unload and re-enable on load --- plugins/mod_csi_simple.lua | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) (limited to 'plugins/mod_csi_simple.lua') diff --git a/plugins/mod_csi_simple.lua b/plugins/mod_csi_simple.lua index 14abbc68..d0de222e 100644 --- a/plugins/mod_csi_simple.lua +++ b/plugins/mod_csi_simple.lua @@ -111,3 +111,24 @@ module:hook("c2s-ondrain", function (event) session.conn:pause_writes(); end end); + +function module.load() + for _, user_session in pairs(prosody.hosts[module.host].sessions) do + for _, session in pairs(user_session.sessions) do + if session.state == "inactive" then + enable_optimizations(session); + end + end + end +end + +function module.unload() + for _, user_session in pairs(prosody.hosts[module.host].sessions) do + for _, session in pairs(user_session.sessions) do + if session.state == "inactive" then + disble_optimizations(session); + end + end + end +end + -- cgit v1.2.3 From dd0258ff779f4ccc4da11a19c344d054ba7f8430 Mon Sep 17 00:00:00 2001 From: Kim Alvefur Date: Sun, 24 Mar 2019 20:53:49 +0100 Subject: mod_csi_simple: Add some debug logging --- plugins/mod_csi_simple.lua | 2 ++ 1 file changed, 2 insertions(+) (limited to 'plugins/mod_csi_simple.lua') diff --git a/plugins/mod_csi_simple.lua b/plugins/mod_csi_simple.lua index d0de222e..ff1fa86c 100644 --- a/plugins/mod_csi_simple.lua +++ b/plugins/mod_csi_simple.lua @@ -71,6 +71,7 @@ local function manage_buffer(stanza, session) end local function flush_buffer(data, session) + session.log("debug", "Client sent something, flushing buffer once"); session.conn:resume_writes(); return data; end @@ -109,6 +110,7 @@ module:hook("c2s-ondrain", function (event) if session.state == "inactive" and session.conn and session.conn and session.conn.pause_writes then session.csi_counter = 0; session.conn:pause_writes(); + session.log("debug", "Buffer flushed, resuming inactive mode"); end end); -- cgit v1.2.3 From ee003734474622524bcf3eea479e64a6c91f2802 Mon Sep 17 00:00:00 2001 From: Kim Alvefur Date: Sun, 24 Mar 2019 22:01:36 +0100 Subject: mod_csi_simple: Improve debug logs by mentioing why the buffer gets flushed --- plugins/mod_csi_simple.lua | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) (limited to 'plugins/mod_csi_simple.lua') diff --git a/plugins/mod_csi_simple.lua b/plugins/mod_csi_simple.lua index ff1fa86c..2dda1c42 100644 --- a/plugins/mod_csi_simple.lua +++ b/plugins/mod_csi_simple.lua @@ -61,7 +61,11 @@ end local function manage_buffer(stanza, session) local ctr = session.csi_counter or 0; - if ctr >= queue_size or module:fire_event("csi-is-stanza-important", { stanza = stanza, session = session }) then + if ctr >= queue_size then + session.log("debug", "Queue size limit hit, flushing buffer"); + session.conn:resume_writes(); + elseif module:fire_event("csi-is-stanza-important", { stanza = stanza, session = session }) then + session.log("debug", "Important stanza, flushing buffer"); session.conn:resume_writes(); else stanza = with_timestamp(stanza, jid.join(session.username, session.host)) -- cgit v1.2.3 From 3179518b6ac887fef63d5ffa83b2020691809299 Mon Sep 17 00:00:00 2001 From: Matthew Wild Date: Mon, 25 Mar 2019 10:32:39 +0000 Subject: mod_csi_simple: Fix type in function name --- plugins/mod_csi_simple.lua | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) (limited to 'plugins/mod_csi_simple.lua') diff --git a/plugins/mod_csi_simple.lua b/plugins/mod_csi_simple.lua index 2dda1c42..c79c56fc 100644 --- a/plugins/mod_csi_simple.lua +++ b/plugins/mod_csi_simple.lua @@ -90,7 +90,7 @@ function enable_optimizations(session) end end -function disble_optimizations(session) +function disable_optimizations(session) if session.conn and session.conn and session.conn.resume_writes then filters.remove_filter(session, "stanzas/out", manage_buffer); filters.remove_filter(session, "bytes/in", flush_buffer); @@ -105,7 +105,7 @@ end); module:hook("csi-client-active", function (event) local session = event.origin; - disble_optimizations(session); + disable_optimizations(session); end); @@ -132,9 +132,8 @@ function module.unload() for _, user_session in pairs(prosody.hosts[module.host].sessions) do for _, session in pairs(user_session.sessions) do if session.state == "inactive" then - disble_optimizations(session); + disable_optimizations(session); end end end end - -- cgit v1.2.3 From 65f01dec0dc42ee9e1997f2dcddeb6375cdc92c8 Mon Sep 17 00:00:00 2001 From: Kim Alvefur Date: Mon, 25 Mar 2019 15:20:28 +0100 Subject: mod_csi_simple: Include queue size in debug messages --- plugins/mod_csi_simple.lua | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) (limited to 'plugins/mod_csi_simple.lua') diff --git a/plugins/mod_csi_simple.lua b/plugins/mod_csi_simple.lua index c79c56fc..a9148618 100644 --- a/plugins/mod_csi_simple.lua +++ b/plugins/mod_csi_simple.lua @@ -62,10 +62,10 @@ end local function manage_buffer(stanza, session) local ctr = session.csi_counter or 0; if ctr >= queue_size then - session.log("debug", "Queue size limit hit, flushing buffer"); + session.log("debug", "Queue size limit hit, flushing buffer (queue size is %d)", session.csi_counter); session.conn:resume_writes(); elseif module:fire_event("csi-is-stanza-important", { stanza = stanza, session = session }) then - session.log("debug", "Important stanza, flushing buffer"); + session.log("debug", "Important stanza, flushing buffer (queue size is %d)", session.csi_counter); session.conn:resume_writes(); else stanza = with_timestamp(stanza, jid.join(session.username, session.host)) @@ -75,7 +75,7 @@ local function manage_buffer(stanza, session) end local function flush_buffer(data, session) - session.log("debug", "Client sent something, flushing buffer once"); + session.log("debug", "Client sent something, flushing buffer once (queue size is %d)", session.csi_counter); session.conn:resume_writes(); return data; end @@ -112,9 +112,9 @@ end); module:hook("c2s-ondrain", function (event) local session = event.session; if session.state == "inactive" and session.conn and session.conn and session.conn.pause_writes then - session.csi_counter = 0; session.conn:pause_writes(); - session.log("debug", "Buffer flushed, resuming inactive mode"); + session.log("debug", "Buffer flushed, resuming inactive mode (queue size was %d)", session.csi_counter); + session.csi_counter = 0; end end); -- cgit v1.2.3 From 2dbd1528bb118e77463db85403d535d307ec050d Mon Sep 17 00:00:00 2001 From: Kim Alvefur Date: Sun, 26 May 2019 15:04:16 +0200 Subject: mod_csi_simple: Disable optimizations on disconnect (fixes #1358) The connection becomes invalid here, regardless of 3rd party modules that might keep the session alive. --- plugins/mod_csi_simple.lua | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'plugins/mod_csi_simple.lua') diff --git a/plugins/mod_csi_simple.lua b/plugins/mod_csi_simple.lua index a9148618..13002ea8 100644 --- a/plugins/mod_csi_simple.lua +++ b/plugins/mod_csi_simple.lua @@ -108,6 +108,10 @@ module:hook("csi-client-active", function (event) disable_optimizations(session); end); +module:hook("pre-resource-unbind", function (event) + local session = event.session; + disable_optimizations(session); +end); module:hook("c2s-ondrain", function (event) local session = event.session; -- cgit v1.2.3