diff options
Diffstat (limited to 'plugins/mod_csi_simple.lua')
-rw-r--r-- | plugins/mod_csi_simple.lua | 146 |
1 files changed, 87 insertions, 59 deletions
diff --git a/plugins/mod_csi_simple.lua b/plugins/mod_csi_simple.lua index da2dd953..24a2f1ce 100644 --- a/plugins/mod_csi_simple.lua +++ b/plugins/mod_csi_simple.lua @@ -9,40 +9,7 @@ module:depends"csi" local jid = require "util.jid"; local st = require "util.stanza"; local dt = require "util.datetime"; -local new_queue = require "util.queue".new; - -local function new_pump(output, ...) - -- luacheck: ignore 212/self - local q = new_queue(...); - local flush = true; - function q:pause() - flush = false; - end - function q:resume() - flush = true; - return q:flush(); - end - local push = q.push; - function q:push(item) - local ok = push(self, item); - if not ok then - q:flush(); - output(item, self); - elseif flush then - return q:flush(); - end - return true; - end - function q:flush() - local item = self:pop(); - while item do - output(item, self); - item = self:pop(); - end - return true; - end - return q; -end +local filters = require "util.filters"; local queue_size = module:get_option_number("csi_queue_size", 256); @@ -84,37 +51,98 @@ module:hook("csi-is-stanza-important", function (event) return true; end, -1); -module:hook("csi-client-inactive", function (event) - local session = event.origin; - if session.pump then - session.pump:pause(); +local function with_timestamp(stanza, from) + if st.is_stanza(stanza) and stanza.attr.xmlns == nil and stanza.name ~= "iq" then + stanza = st.clone(stanza); + stanza:add_direct_child(st.stanza("delay", {xmlns = "urn:xmpp:delay", from = from, stamp = dt.datetime()})); + end + return stanza; +end + +local function manage_buffer(stanza, session) + local ctr = session.csi_counter or 0; + if ctr >= queue_size then + session.log("debug", "Queue size limit hit, flushing buffer (queue size is %d)", session.csi_counter); + session.conn:resume_writes(); + elseif module:fire_event("csi-is-stanza-important", { stanza = stanza, session = session }) then + session.log("debug", "Important stanza, flushing buffer (queue size is %d)", session.csi_counter); + session.conn:resume_writes(); else - local bare_jid = jid.join(session.username, session.host); - local send = session.send; - session._orig_send = send; - local pump = new_pump(session.send, queue_size); - pump:pause(); - session.pump = pump; - function session.send(stanza) - if session.state == "active" or module:fire_event("csi-is-stanza-important", { stanza = stanza, session = session }) then - pump:flush(); - send(stanza); - else - if st.is_stanza(stanza) and stanza.attr.xmlns == nil and stanza.name ~= "iq" then - stanza = st.clone(stanza); - stanza:add_direct_child(st.stanza("delay", {xmlns = "urn:xmpp:delay", from = bare_jid, stamp = dt.datetime()})); - end - pump:push(stanza); - end - return true; - end + stanza = with_timestamp(stanza, jid.join(session.username, session.host)) end + session.csi_counter = ctr + 1; + return stanza; +end + +local function flush_buffer(data, session) + if session.csi_flushing then + return data; + end + session.csi_flushing = true; + session.log("debug", "Client sent something, flushing buffer once (queue size is %d)", session.csi_counter); + session.conn:resume_writes(); + return data; +end + +function enable_optimizations(session) + if session.conn and session.conn.pause_writes then + session.conn:pause_writes(); + filters.add_filter(session, "stanzas/out", manage_buffer); + filters.add_filter(session, "bytes/in", flush_buffer); + else + session.log("warn", "Session connection does not support write pausing"); + end +end + +function disable_optimizations(session) + session.csi_flushing = nil; + filters.remove_filter(session, "stanzas/out", manage_buffer); + filters.remove_filter(session, "bytes/in", flush_buffer); + if session.conn and session.conn.resume_writes then + session.conn:resume_writes(); + end +end + +module:hook("csi-client-inactive", function (event) + local session = event.origin; + enable_optimizations(session); end); module:hook("csi-client-active", function (event) local session = event.origin; - if session.pump then - session.pump:resume(); + disable_optimizations(session); +end); + +module:hook("pre-resource-unbind", function (event) + local session = event.session; + disable_optimizations(session); +end); + +module:hook("c2s-ondrain", function (event) + local session = event.session; + if session.state == "inactive" and session.conn and session.conn.pause_writes then + session.conn:pause_writes(); + session.log("debug", "Buffer flushed, resuming inactive mode (queue size was %d)", session.csi_counter); + session.csi_counter = 0; end end); +function module.load() + for _, user_session in pairs(prosody.hosts[module.host].sessions) do + for _, session in pairs(user_session.sessions) do + if session.state == "inactive" then + enable_optimizations(session); + end + end + end +end + +function module.unload() + for _, user_session in pairs(prosody.hosts[module.host].sessions) do + for _, session in pairs(user_session.sessions) do + if session.state == "inactive" then + disable_optimizations(session); + end + end + end +end |