aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorKim Alvefur <zash@zash.se>2019-03-24 18:33:38 +0100
committerKim Alvefur <zash@zash.se>2019-03-24 18:33:38 +0100
commit643c317b1627da95e839bab0e397d89ab6f6a589 (patch)
tree7daffc7a7fec885e861e06aa5f8d41214a79d62e
parente5885c928a79604dea999d24cf57104150b55898 (diff)
downloadprosody-643c317b1627da95e839bab0e397d89ab6f6a589.tar.gz
prosody-643c317b1627da95e839bab0e397d89ab6f6a589.zip
mod_csi_simple: Count buffered items and flush when it reaches configured limit
In this mode, stanzas have been serialized to strings in the internal net.server buffer, so it is difficult to count them after the fact.
-rw-r--r--plugins/mod_csi_simple.lua22
1 files changed, 22 insertions, 0 deletions
diff --git a/plugins/mod_csi_simple.lua b/plugins/mod_csi_simple.lua
index 6bd6c0bf..5c829179 100644
--- a/plugins/mod_csi_simple.lua
+++ b/plugins/mod_csi_simple.lua
@@ -10,6 +10,7 @@ local jid = require "util.jid";
local st = require "util.stanza";
local dt = require "util.datetime";
local new_queue = require "util.queue".new;
+local filters = require "util.filters";
local function new_pump(output, ...)
-- luacheck: ignore 212/self
@@ -92,10 +93,22 @@ local function with_timestamp(stanza, from)
return stanza;
end
+local function manage_buffer(stanza, session)
+ local ctr = session.csi_counter or 0;
+ if ctr >= queue_size or module:fire_event("csi-is-stanza-important", { stanza = stanza, session = session }) then
+ session.conn:resume_writes();
+ else
+ stanza = with_timestamp(stanza, jid.join(session.username, session.host))
+ end
+ session.csi_counter = ctr + 1;
+ return stanza;
+end
+
module:hook("csi-client-inactive", function (event)
local session = event.origin;
if session.conn and session.conn and session.conn.pause_writes then
session.conn:pause_writes();
+ filters.add_filter(session, "stanzas/out", manage_buffer);
elseif session.pump then
session.pump:pause();
else
@@ -122,7 +135,16 @@ module:hook("csi-client-active", function (event)
if session.pump then
session.pump:resume();
elseif session.conn and session.conn and session.conn.resume_writes then
+ filters.remove_filter(session, "stanzas/out", manage_buffer);
session.conn:resume_writes();
end
end);
+
+module:hook("c2s-ondrain", function (event)
+ local session = event.session;
+ if session.state == "inactive" and session.conn and session.conn and session.conn.pause_writes then
+ session.csi_counter = 0;
+ session.conn:pause_writes();
+ end
+end);