aboutsummaryrefslogtreecommitdiffstats
path: root/plugins/mod_s2s
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/mod_s2s')
-rw-r--r--plugins/mod_s2s/mod_s2s.lua46
1 files changed, 37 insertions, 9 deletions
diff --git a/plugins/mod_s2s/mod_s2s.lua b/plugins/mod_s2s/mod_s2s.lua
index f5be4f27..c0eaea01 100644
--- a/plugins/mod_s2s/mod_s2s.lua
+++ b/plugins/mod_s2s/mod_s2s.lua
@@ -26,6 +26,7 @@ local s2s_new_outgoing = require "core.s2smanager".new_outgoing;
local s2s_destroy_session = require "core.s2smanager".destroy_session;
local uuid_gen = require "util.uuid".generate;
local fire_global_event = prosody.events.fire_event;
+local runner = require "util.async".runner;
local s2sout = module:require("s2sout");
@@ -41,6 +42,8 @@ local measure_connections = module:measure("connections", "counter");
local sessions = module:shared("sessions");
+local runner_callbacks = {};
+
local log = module._log;
--- Handle stanzas to remote domains
@@ -257,11 +260,21 @@ end
--- XMPP stream event handlers
-local stream_callbacks = { default_ns = "jabber:server", handlestanza = core_process_stanza };
+local stream_callbacks = { default_ns = "jabber:server" };
+
+function stream_callbacks.handlestanza(session, stanza)
+ stanza = session.filter("stanzas/in", stanza);
+ session.thread:run(stanza);
+end
local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams";
function stream_callbacks.streamopened(session, attr)
+ -- run _streamopened in async context
+ session.thread:run({ attr = attr });
+end
+
+function stream_callbacks._streamopened(session, attr)
session.version = tonumber(attr.version) or 0;
-- TODO: Rename session.secure to session.encrypted
@@ -435,14 +448,6 @@ function stream_callbacks.error(session, error, data)
end
end
-local function handleerr(err) log("error", "Traceback[s2s]: %s", traceback(tostring(err), 2)); end
-function stream_callbacks.handlestanza(session, stanza)
- stanza = session.filter("stanzas/in", stanza);
- if stanza then
- return xpcall(function () return core_process_stanza(session, stanza) end, handleerr);
- end
-end
-
local listener = {};
--- Session methods
@@ -517,6 +522,15 @@ end
-- Session initialization logic shared by incoming and outgoing
local function initialize_session(session)
local stream = new_xmpp_stream(session, stream_callbacks);
+
+ session.thread = runner(function (stanza)
+ if stanza.name == nil then
+ stream_callbacks._streamopened(session, stanza.attr);
+ else
+ core_process_stanza(session, stanza);
+ end
+ end, runner_callbacks, session);
+
local log = session.log or log;
session.stream = stream;
@@ -580,6 +594,20 @@ local function initialize_session(session)
end);
end
+function runner_callbacks:ready()
+ self.data.log("debug", "Runner %s ready (%s)", self.thread, coroutine.status(self.thread));
+ self.data.conn:resume();
+end
+
+function runner_callbacks:waiting()
+ self.data.log("debug", "Runner %s waiting (%s)", self.thread, coroutine.status(self.thread));
+ self.data.conn:pause();
+end
+
+function runner_callbacks:error(err)
+ (self.data.log or log)("error", "Traceback[s2s]: %s", err);
+end
+
function listener.onconnect(conn)
measure_connections(1);
conn:setoption("keepalive", opt_keepalives);