diff options
Diffstat (limited to 'plugins/mod_s2s/mod_s2s.lua')
-rw-r--r-- | plugins/mod_s2s/mod_s2s.lua | 105 |
1 files changed, 78 insertions, 27 deletions
diff --git a/plugins/mod_s2s/mod_s2s.lua b/plugins/mod_s2s/mod_s2s.lua index 0857f08e..aae37b7f 100644 --- a/plugins/mod_s2s/mod_s2s.lua +++ b/plugins/mod_s2s/mod_s2s.lua @@ -14,7 +14,7 @@ local core_process_stanza = prosody.core_process_stanza; local tostring, type = tostring, type; local t_insert = table.insert; -local xpcall, traceback = xpcall, debug.traceback; +local traceback = debug.traceback; local add_task = require "util.timer".add_task; local st = require "util.stanza"; @@ -26,6 +26,7 @@ local s2s_new_outgoing = require "core.s2smanager".new_outgoing; local s2s_destroy_session = require "core.s2smanager".destroy_session; local uuid_gen = require "util.uuid".generate; local fire_global_event = prosody.events.fire_event; +local runner = require "util.async".runner; local s2sout = module:require("s2sout"); @@ -38,17 +39,25 @@ local secure_domains, insecure_domains = local require_encryption = module:get_option_boolean("s2s_require_encryption", false); local measure_connections = module:measure("connections", "amount"); +local measure_ipv6 = module:measure("ipv6", "amount"); local sessions = module:shared("sessions"); +local runner_callbacks = {}; + local log = module._log; module:hook("stats-update", function () local count = 0; - for _ in pairs(sessions) do + local ipv6 = 0; + for _, session in pairs(sessions) do count = count + 1; + if session.ip and session.ip:match(":") then + ipv6 = ipv6 + 1; + end end measure_connections(count); + measure_ipv6(ipv6); end); --- Handle stanzas to remote domains @@ -57,19 +66,22 @@ local bouncy_stanzas = { message = true, presence = true, iq = true }; local function bounce_sendq(session, reason) local sendq = session.sendq; if not sendq then return; end - session.log("info", "Sending error replies for "..#sendq.." queued stanzas because of failed outgoing connection to "..tostring(session.to_host)); + session.log("info", "Sending error replies for %d queued stanzas because of failed outgoing connection to %s", #sendq, session.to_host); local dummy = { type = "s2sin"; - send = function(s) + send = function () (session.log or log)("error", "Replying to to an s2s error reply, please report this! Traceback: %s", traceback()); end; dummy = true; + close = function () + (session.log or log)("error", "Attempting to close the dummy origin of s2s error replies, please report this! Traceback: %s", traceback()); + end; }; for i, data in ipairs(sendq) do local reply = data[2]; if reply and not(reply.attr.xmlns) and bouncy_stanzas[reply.name] then reply.attr.type = "error"; - reply:tag("error", {type = "cancel"}) + reply:tag("error", {type = "cancel", by = session.from_host}) :tag("remote-server-not-found", {xmlns = "urn:ietf:params:xml:ns:xmpp-stanzas"}):up(); if reason then reply:tag("text", {xmlns = "urn:ietf:params:xml:ns:xmpp-stanzas"}) @@ -100,8 +112,16 @@ function route_to_existing_session(event) (host.log or log)("debug", "trying to send over unauthed s2sout to "..to_host); -- Queue stanza until we are able to send it - if host.sendq then t_insert(host.sendq, {tostring(stanza), stanza.attr.type ~= "error" and stanza.attr.type ~= "result" and st.reply(stanza)}); - else host.sendq = { {tostring(stanza), stanza.attr.type ~= "error" and stanza.attr.type ~= "result" and st.reply(stanza)} }; end + local queued_item = { + tostring(stanza), + stanza.attr.type ~= "error" and stanza.attr.type ~= "result" and st.reply(stanza); + }; + if host.sendq then + t_insert(host.sendq, queued_item); + else + -- luacheck: ignore 122 + host.sendq = { queued_item }; + end host.log("debug", "stanza [%s] queued ", stanza.name); return true; elseif host.type == "local" or host.type == "component" then @@ -113,7 +133,7 @@ function route_to_existing_session(event) -- FIXME if host.from_host ~= from_host then log("error", "WARNING! This might, possibly, be a bug, but it might not..."); - log("error", "We are going to send from %s instead of %s", tostring(host.from_host), tostring(from_host)); + log("error", "We are going to send from %s instead of %s", host.from_host, from_host); end if host.sends2s(stanza) then return true; @@ -149,14 +169,14 @@ module:hook("s2s-read-timeout", keepalive, -1); function module.add_host(module) if module:get_option_boolean("disallow_s2s", false) then - module:log("warn", "The 'disallow_s2s' config option is deprecated, please see http://prosody.im/doc/s2s#disabling"); + module:log("warn", "The 'disallow_s2s' config option is deprecated, please see https://prosody.im/doc/s2s#disabling"); return nil, "This host has disallow_s2s set"; end module:hook("route/remote", route_to_existing_session, -1); module:hook("route/remote", route_to_new_session, -10); module:hook("s2s-authenticated", make_authenticated, -1); module:hook("s2s-read-timeout", keepalive, -1); - module:hook_stanza("http://etherx.jabber.org/streams", "features", function (session, stanza) + module:hook_stanza("http://etherx.jabber.org/streams", "features", function (session, stanza) -- luacheck: ignore 212/stanza if session.type == "s2sout" then -- Stream is authenticated and we are seem to be done with feature negotiation, -- so the stream is ready for stanzas. RFC 6120 Section 4.3 @@ -265,11 +285,21 @@ end --- XMPP stream event handlers -local stream_callbacks = { default_ns = "jabber:server", handlestanza = core_process_stanza }; +local stream_callbacks = { default_ns = "jabber:server" }; + +function stream_callbacks.handlestanza(session, stanza) + stanza = session.filter("stanzas/in", stanza); + session.thread:run(stanza); +end local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams"; function stream_callbacks.streamopened(session, attr) + -- run _streamopened in async context + session.thread:run({ attr = attr }); +end + +function stream_callbacks._streamopened(session, attr) session.version = tonumber(attr.version) or 0; -- TODO: Rename session.secure to session.encrypted @@ -364,7 +394,7 @@ function stream_callbacks.streamopened(session, attr) end if ( session.type == "s2sin" or session.type == "s2sout" ) or features.tags[1] then - log("debug", "Sending stream features: %s", tostring(features)); + log("debug", "Sending stream features: %s", features); session.sends2s(features); else (session.log or log)("warn", "No stream features to offer, giving up"); @@ -421,7 +451,7 @@ function stream_callbacks.error(session, error, data) session.log("debug", "Invalid opening stream header (%s)", (data:gsub("^([^\1]+)\1", "{%1}"))); session:close("invalid-namespace"); elseif error == "parse-error" then - session.log("debug", "Server-to-server XML parse error: %s", tostring(error)); + session.log("debug", "Server-to-server XML parse error: %s", error); session:close("not-well-formed"); elseif error == "stream-error" then local condition, text = "undefined-condition"; @@ -441,14 +471,6 @@ function stream_callbacks.error(session, error, data) end end -local function handleerr(err) log("error", "Traceback[s2s]: %s", traceback(tostring(err), 2)); end -function stream_callbacks.handlestanza(session, stanza) - stanza = session.filter("stanzas/in", stanza); - if stanza then - return xpcall(function () return core_process_stanza(session, stanza) end, handleerr); - end -end - local listener = {}; --- Session methods @@ -476,10 +498,13 @@ local function session_close(session, reason, remote_reason) if reason.extra then stanza:add_child(reason.extra); end - log("debug", "Disconnecting %s[%s], <stream:error> is: %s", session.host or session.ip or "(unknown host)", session.type, tostring(stanza)); + log("debug", "Disconnecting %s[%s], <stream:error> is: %s", + session.host or session.ip or "(unknown host)", session.type, stanza); session.sends2s(stanza); elseif reason.name then -- a stanza - log("debug", "Disconnecting %s->%s[%s], <stream:error> is: %s", session.from_host or "(unknown host)", session.to_host or "(unknown host)", session.type, tostring(reason)); + log("debug", "Disconnecting %s->%s[%s], <stream:error> is: %s", + session.from_host or "(unknown host)", session.to_host or "(unknown host)", + session.type, reason); session.sends2s(reason); end end @@ -488,8 +513,11 @@ local function session_close(session, reason, remote_reason) session.sends2s("</stream:stream>"); function session.sends2s() return false; end + -- luacheck: ignore 422/reason + -- FIXME reason should be managed in a place common to c2s, s2s, bosh, component etc local reason = remote_reason or (reason and (reason.text or reason.condition)) or reason; - session.log("info", "%s s2s stream %s->%s closed: %s", session.direction:gsub("^.", string.upper), session.from_host or "(unknown host)", session.to_host or "(unknown host)", reason or "stream closed"); + session.log("info", "%s s2s stream %s->%s closed: %s", session.direction:gsub("^.", string.upper), + session.from_host or "(unknown host)", session.to_host or "(unknown host)", reason or "stream closed"); -- Authenticated incoming stream may still be sending us stanzas, so wait for </stream:stream> from remote local conn = session.conn; @@ -508,7 +536,7 @@ local function session_close(session, reason, remote_reason) end end -function session_stream_attrs(session, from, to, attr) +function session_stream_attrs(session, from, to, attr) -- luacheck: ignore 212/session if not from or (hosts[from] and hosts[from].modules.dialback) then attr["xmlns:db"] = 'jabber:server:dialback'; end @@ -523,6 +551,15 @@ end -- Session initialization logic shared by incoming and outgoing local function initialize_session(session) local stream = new_xmpp_stream(session, stream_callbacks); + + session.thread = runner(function (stanza) + if stanza.name == nil then + stream_callbacks._streamopened(session, stanza.attr); + else + core_process_stanza(session, stanza); + end + end, runner_callbacks, session); + local log = session.log or log; session.stream = stream; @@ -567,7 +604,7 @@ local function initialize_session(session) session.close = session_close; local handlestanza = stream_callbacks.handlestanza; - function session.dispatch_stanza(session, stanza) + function session.dispatch_stanza(session, stanza) -- luacheck: ignore 432/session return handlestanza(session, stanza); end @@ -586,6 +623,20 @@ local function initialize_session(session) end); end +function runner_callbacks:ready() + self.data.log("debug", "Runner %s ready (%s)", self.thread, coroutine.status(self.thread)); + self.data.conn:resume(); +end + +function runner_callbacks:waiting() + self.data.log("debug", "Runner %s waiting (%s)", self.thread, coroutine.status(self.thread)); + self.data.conn:pause(); +end + +function runner_callbacks:error(err) + (self.data.log or log)("error", "Traceback[s2s]: %s", err); +end + function listener.onconnect(conn) conn:setoption("keepalive", opt_keepalives); local session = sessions[conn]; @@ -627,7 +678,7 @@ function listener.ondisconnect(conn, err) return; -- Session lives for now end end - (session.log or log)("debug", "s2s disconnected: %s->%s (%s)", tostring(session.from_host), tostring(session.to_host), tostring(err or "connection closed")); + (session.log or log)("debug", "s2s disconnected: %s->%s (%s)", session.from_host, session.to_host, err or "connection closed"); s2s_destroy_session(session, err); end end |