diff options
Diffstat (limited to 'plugins/mod_csi_simple.lua')
-rw-r--r-- | plugins/mod_csi_simple.lua | 242 |
1 files changed, 168 insertions, 74 deletions
diff --git a/plugins/mod_csi_simple.lua b/plugins/mod_csi_simple.lua index 7fb6b41f..d814f083 100644 --- a/plugins/mod_csi_simple.lua +++ b/plugins/mod_csi_simple.lua @@ -1,4 +1,4 @@ --- Copyright (C) 2016-2018 Kim Alvefur +-- Copyright (C) 2016-2020 Kim Alvefur -- -- This project is MIT/X11 licensed. Please see the -- COPYING file in the source package for more information. @@ -9,115 +9,209 @@ module:depends"csi" local jid = require "util.jid"; local st = require "util.stanza"; local dt = require "util.datetime"; -local new_queue = require "util.queue".new; - -local function new_pump(output, ...) - -- luacheck: ignore 212/self - local q = new_queue(...); - local flush = true; - function q:pause() - flush = false; - end - function q:resume() - flush = true; - return q:flush(); - end - local push = q.push; - function q:push(item) - local ok = push(self, item); - if not ok then - q:flush(); - output(item, self); - elseif flush then - return q:flush(); - end - return true; - end - function q:flush() - local item = self:pop(); - while item do - output(item, self); - item = self:pop(); - end - return true; - end - return q; -end +local filters = require "util.filters"; local queue_size = module:get_option_number("csi_queue_size", 256); -module:hook("csi-is-stanza-important", function (event) - local stanza = event.stanza; - if not st.is_stanza(stanza) then - return true; +local important_payloads = module:get_option_set("csi_important_payloads", { }); + +function is_important(stanza) --> boolean, reason: string + if stanza == " " then + return true, "whitespace keepalive"; + elseif type(stanza) == "string" then + return true, "raw data"; + elseif not st.is_stanza(stanza) then + -- This should probably never happen + return true, type(stanza); + end + if stanza.attr.xmlns ~= nil then + -- stream errors, stream management etc + return true, "nonza"; end local st_name = stanza.name; if not st_name then return false; end local st_type = stanza.attr.type; if st_name == "presence" then - if st_type == nil or st_type == "unavailable" then - return false; + if st_type == nil or st_type == "unavailable" or st_type == "error" then + return false, "presence update"; end - return true; + -- TODO Some MUC awareness, e.g. check for the 'this relates to you' status code + return true, "subscription request"; elseif st_name == "message" then if st_type == "headline" then - return false; + -- Headline messages are ephemeral by definition + return false, "headline"; + end + if st_type == "error" then + return true, "delivery failure"; end if stanza:get_child("sent", "urn:xmpp:carbons:2") then - return true; + return true, "carbon"; end local forwarded = stanza:find("{urn:xmpp:carbons:2}received/{urn:xmpp:forward:0}/{jabber:client}message"); if forwarded then stanza = forwarded; end if stanza:get_child("body") then - return true; + return true, "body"; end if stanza:get_child("subject") then - return true; + -- Last step of a MUC join + return true, "subject"; end if stanza:get_child("encryption", "urn:xmpp:eme:0") then - return true; + -- Since we can't know what an encrypted message contains, we assume it's important + -- XXX Experimental XEP + return true, "encrypted"; + end + if stanza:get_child("x", "jabber:x:conference") or stanza:find("{http://jabber.org/protocol/muc#user}x/invite") then + return true, "invite"; end if stanza:get_child(nil, "urn:xmpp:jingle-message:0") then - return true; + -- XXX Experimental XEP stuck in Proposed for almost a year at the time of this comment + return true, "jingle call"; + end + for important in important_payloads do + if stanza:find(important) then + return true; + end end return false; + elseif st_name == "iq" then + return true; end - return true; +end + +module:hook("csi-is-stanza-important", function (event) + local important, why = is_important(event.stanza); + event.reason = why; + return important; end, -1); -module:hook("csi-client-inactive", function (event) - local session = event.origin; - if session.pump then - session.pump:pause(); - else - local bare_jid = jid.join(session.username, session.host); - local send = session.send; - session._orig_send = send; - local pump = new_pump(session.send, queue_size); - pump:pause(); - session.pump = pump; - function session.send(stanza) - if session.state == "active" or module:fire_event("csi-is-stanza-important", { stanza = stanza, session = session }) then - pump:flush(); - send(stanza); - else - if st.is_stanza(stanza) and stanza.attr.xmlns == nil and stanza.name ~= "iq" then - stanza = st.clone(stanza); - stanza:add_direct_child(st.stanza("delay", {xmlns = "urn:xmpp:delay", from = bare_jid, stamp = dt.datetime()})); - end - pump:push(stanza); - end - return true; +local function should_flush(stanza, session, ctr) --> boolean, reason: string + if ctr >= queue_size then + return true, "queue size limit reached"; + end + local event = { stanza = stanza, session = session }; + local ret = module:fire_event("csi-is-stanza-important", event) + return ret, event.reason; +end + +local function with_timestamp(stanza, from) + if st.is_stanza(stanza) and stanza.attr.xmlns == nil and stanza.name ~= "iq" then + stanza = st.clone(stanza); + stanza:add_direct_child(st.stanza("delay", {xmlns = "urn:xmpp:delay", from = from, stamp = dt.datetime()})); + end + return stanza; +end + +local measure_buffer_hold = module:measure("buffer_hold", "times"); + +local flush_reasons = setmetatable({}, { + __index = function (t, reason) + local m = module:measure("flush_reason."..reason:gsub("%W", "_"), "rate"); + t[reason] = m; + return m; + end; + }); + + +local function manage_buffer(stanza, session) + local ctr = session.csi_counter or 0; + local flush, why = should_flush(stanza, session, ctr); + if flush then + if session.csi_measure_buffer_hold then + session.csi_measure_buffer_hold(); + session.csi_measure_buffer_hold = nil; end + flush_reasons[why or "important"](); + session.log("debug", "Flushing buffer (%s; queue size is %d)", why or "important", session.csi_counter); + session.conn:resume_writes(); + else + session.log("debug", "Holding buffer (%s; queue size is %d)", why or "unimportant", session.csi_counter); + stanza = with_timestamp(stanza, jid.join(session.username, session.host)) + end + session.csi_counter = ctr + 1; + return stanza; +end + +local function flush_buffer(data, session) + session.log("debug", "Flushing buffer (%s; queue size is %d)", "client activity", session.csi_counter); + flush_reasons["client activity"](); + if session.csi_measure_buffer_hold then + session.csi_measure_buffer_hold(); + session.csi_measure_buffer_hold = nil; + end + session.conn:resume_writes(); + return data; +end + +function enable_optimizations(session) + if session.conn and session.conn.pause_writes then + session.conn:pause_writes(); + session.csi_measure_buffer_hold = measure_buffer_hold(); + session.csi_counter = 0; + filters.add_filter(session, "stanzas/out", manage_buffer); + filters.add_filter(session, "bytes/in", flush_buffer); + else + session.log("warn", "Session connection does not support write pausing"); + end +end + +function disable_optimizations(session) + filters.remove_filter(session, "stanzas/out", manage_buffer); + filters.remove_filter(session, "bytes/in", flush_buffer); + session.csi_counter = nil; + if session.csi_measure_buffer_hold then + session.csi_measure_buffer_hold(); + session.csi_measure_buffer_hold = nil; end + if session.conn and session.conn.resume_writes then + session.conn:resume_writes(); + end +end + +module:hook("csi-client-inactive", function (event) + local session = event.origin; + enable_optimizations(session); end); module:hook("csi-client-active", function (event) local session = event.origin; - if session.pump then - session.pump:resume(); + disable_optimizations(session); +end); + +module:hook("pre-resource-unbind", function (event) + local session = event.session; + disable_optimizations(session); +end, 1); + +module:hook("c2s-ondrain", function (event) + local session = event.session; + if session.state == "inactive" and session.conn and session.conn.pause_writes then + session.conn:pause_writes(); + session.csi_measure_buffer_hold = measure_buffer_hold(); + session.log("debug", "Buffer flushed, resuming inactive mode (queue size was %d)", session.csi_counter); + session.csi_counter = 0; end end); +function module.load() + for _, user_session in pairs(prosody.hosts[module.host].sessions) do + for _, session in pairs(user_session.sessions) do + if session.state == "inactive" then + enable_optimizations(session); + end + end + end +end + +function module.unload() + for _, user_session in pairs(prosody.hosts[module.host].sessions) do + for _, session in pairs(user_session.sessions) do + if session.state == "inactive" then + disable_optimizations(session); + end + end + end +end |