aboutsummaryrefslogtreecommitdiffstats
path: root/plugins/mod_csi_simple.lua
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/mod_csi_simple.lua')
-rw-r--r--plugins/mod_csi_simple.lua63
1 files changed, 2 insertions, 61 deletions
diff --git a/plugins/mod_csi_simple.lua b/plugins/mod_csi_simple.lua
index ee7e01c9..0fa0d083 100644
--- a/plugins/mod_csi_simple.lua
+++ b/plugins/mod_csi_simple.lua
@@ -9,42 +9,8 @@ module:depends"csi"
local jid = require "util.jid";
local st = require "util.stanza";
local dt = require "util.datetime";
-local new_queue = require "util.queue".new;
local filters = require "util.filters";
-local function new_pump(output, ...)
- -- luacheck: ignore 212/self
- local q = new_queue(...);
- local flush = true;
- function q:pause()
- flush = false;
- end
- function q:resume()
- flush = true;
- return q:flush();
- end
- local push = q.push;
- function q:push(item)
- local ok = push(self, item);
- if not ok then
- q:flush();
- output(item, self);
- elseif flush then
- return q:flush();
- end
- return true;
- end
- function q:flush()
- local item = self:pop();
- while item do
- output(item, self);
- item = self:pop();
- end
- return true;
- end
- return q;
-end
-
local queue_size = module:get_option_number("csi_queue_size", 256);
module:hook("csi-is-stanza-important", function (event)
@@ -109,45 +75,20 @@ local function flush_buffer(data, session)
return data;
end
-local function flush_pump(data, session)
- session.pump:flush();
- return data;
-end
-
module:hook("csi-client-inactive", function (event)
local session = event.origin;
if session.conn and session.conn and session.conn.pause_writes then
- session.log("info", "Native net.server buffer management mode");
session.conn:pause_writes();
filters.add_filter(session, "stanzas/out", manage_buffer);
filters.add_filter(session, "bytes/in", flush_buffer);
- elseif session.pump then
- session.pump:pause();
else
- local bare_jid = jid.join(session.username, session.host);
- local send = session.send;
- session._orig_send = send;
- local pump = new_pump(session.send, queue_size);
- pump:pause();
- session.pump = pump;
- filters.add_filter(session, "bytes/in", flush_pump);
- function session.send(stanza)
- if session.state == "active" or module:fire_event("csi-is-stanza-important", { stanza = stanza, session = session }) then
- pump:flush();
- send(stanza);
- else
- pump:push(with_timestamp(stanza, bare_jid));
- end
- return true;
- end
+ session.log("warn", "Session connection does not support write pausing");
end
end);
module:hook("csi-client-active", function (event)
local session = event.origin;
- if session.pump then
- session.pump:resume();
- elseif session.conn and session.conn and session.conn.resume_writes then
+ if session.conn and session.conn and session.conn.resume_writes then
filters.remove_filter(session, "stanzas/out", manage_buffer);
filters.remove_filter(session, "bytes/in", flush_buffer);
session.conn:resume_writes();