aboutsummaryrefslogtreecommitdiffstats
path: root/plugins/mod_s2s
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/mod_s2s')
-rw-r--r--plugins/mod_s2s/mod_s2s.lua105
-rw-r--r--plugins/mod_s2s/s2sout.lib.lua28
2 files changed, 99 insertions, 34 deletions
diff --git a/plugins/mod_s2s/mod_s2s.lua b/plugins/mod_s2s/mod_s2s.lua
index 0857f08e..aae37b7f 100644
--- a/plugins/mod_s2s/mod_s2s.lua
+++ b/plugins/mod_s2s/mod_s2s.lua
@@ -14,7 +14,7 @@ local core_process_stanza = prosody.core_process_stanza;
local tostring, type = tostring, type;
local t_insert = table.insert;
-local xpcall, traceback = xpcall, debug.traceback;
+local traceback = debug.traceback;
local add_task = require "util.timer".add_task;
local st = require "util.stanza";
@@ -26,6 +26,7 @@ local s2s_new_outgoing = require "core.s2smanager".new_outgoing;
local s2s_destroy_session = require "core.s2smanager".destroy_session;
local uuid_gen = require "util.uuid".generate;
local fire_global_event = prosody.events.fire_event;
+local runner = require "util.async".runner;
local s2sout = module:require("s2sout");
@@ -38,17 +39,25 @@ local secure_domains, insecure_domains =
local require_encryption = module:get_option_boolean("s2s_require_encryption", false);
local measure_connections = module:measure("connections", "amount");
+local measure_ipv6 = module:measure("ipv6", "amount");
local sessions = module:shared("sessions");
+local runner_callbacks = {};
+
local log = module._log;
module:hook("stats-update", function ()
local count = 0;
- for _ in pairs(sessions) do
+ local ipv6 = 0;
+ for _, session in pairs(sessions) do
count = count + 1;
+ if session.ip and session.ip:match(":") then
+ ipv6 = ipv6 + 1;
+ end
end
measure_connections(count);
+ measure_ipv6(ipv6);
end);
--- Handle stanzas to remote domains
@@ -57,19 +66,22 @@ local bouncy_stanzas = { message = true, presence = true, iq = true };
local function bounce_sendq(session, reason)
local sendq = session.sendq;
if not sendq then return; end
- session.log("info", "Sending error replies for "..#sendq.." queued stanzas because of failed outgoing connection to "..tostring(session.to_host));
+ session.log("info", "Sending error replies for %d queued stanzas because of failed outgoing connection to %s", #sendq, session.to_host);
local dummy = {
type = "s2sin";
- send = function(s)
+ send = function ()
(session.log or log)("error", "Replying to to an s2s error reply, please report this! Traceback: %s", traceback());
end;
dummy = true;
+ close = function ()
+ (session.log or log)("error", "Attempting to close the dummy origin of s2s error replies, please report this! Traceback: %s", traceback());
+ end;
};
for i, data in ipairs(sendq) do
local reply = data[2];
if reply and not(reply.attr.xmlns) and bouncy_stanzas[reply.name] then
reply.attr.type = "error";
- reply:tag("error", {type = "cancel"})
+ reply:tag("error", {type = "cancel", by = session.from_host})
:tag("remote-server-not-found", {xmlns = "urn:ietf:params:xml:ns:xmpp-stanzas"}):up();
if reason then
reply:tag("text", {xmlns = "urn:ietf:params:xml:ns:xmpp-stanzas"})
@@ -100,8 +112,16 @@ function route_to_existing_session(event)
(host.log or log)("debug", "trying to send over unauthed s2sout to "..to_host);
-- Queue stanza until we are able to send it
- if host.sendq then t_insert(host.sendq, {tostring(stanza), stanza.attr.type ~= "error" and stanza.attr.type ~= "result" and st.reply(stanza)});
- else host.sendq = { {tostring(stanza), stanza.attr.type ~= "error" and stanza.attr.type ~= "result" and st.reply(stanza)} }; end
+ local queued_item = {
+ tostring(stanza),
+ stanza.attr.type ~= "error" and stanza.attr.type ~= "result" and st.reply(stanza);
+ };
+ if host.sendq then
+ t_insert(host.sendq, queued_item);
+ else
+ -- luacheck: ignore 122
+ host.sendq = { queued_item };
+ end
host.log("debug", "stanza [%s] queued ", stanza.name);
return true;
elseif host.type == "local" or host.type == "component" then
@@ -113,7 +133,7 @@ function route_to_existing_session(event)
-- FIXME
if host.from_host ~= from_host then
log("error", "WARNING! This might, possibly, be a bug, but it might not...");
- log("error", "We are going to send from %s instead of %s", tostring(host.from_host), tostring(from_host));
+ log("error", "We are going to send from %s instead of %s", host.from_host, from_host);
end
if host.sends2s(stanza) then
return true;
@@ -149,14 +169,14 @@ module:hook("s2s-read-timeout", keepalive, -1);
function module.add_host(module)
if module:get_option_boolean("disallow_s2s", false) then
- module:log("warn", "The 'disallow_s2s' config option is deprecated, please see http://prosody.im/doc/s2s#disabling");
+ module:log("warn", "The 'disallow_s2s' config option is deprecated, please see https://prosody.im/doc/s2s#disabling");
return nil, "This host has disallow_s2s set";
end
module:hook("route/remote", route_to_existing_session, -1);
module:hook("route/remote", route_to_new_session, -10);
module:hook("s2s-authenticated", make_authenticated, -1);
module:hook("s2s-read-timeout", keepalive, -1);
- module:hook_stanza("http://etherx.jabber.org/streams", "features", function (session, stanza)
+ module:hook_stanza("http://etherx.jabber.org/streams", "features", function (session, stanza) -- luacheck: ignore 212/stanza
if session.type == "s2sout" then
-- Stream is authenticated and we are seem to be done with feature negotiation,
-- so the stream is ready for stanzas. RFC 6120 Section 4.3
@@ -265,11 +285,21 @@ end
--- XMPP stream event handlers
-local stream_callbacks = { default_ns = "jabber:server", handlestanza = core_process_stanza };
+local stream_callbacks = { default_ns = "jabber:server" };
+
+function stream_callbacks.handlestanza(session, stanza)
+ stanza = session.filter("stanzas/in", stanza);
+ session.thread:run(stanza);
+end
local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams";
function stream_callbacks.streamopened(session, attr)
+ -- run _streamopened in async context
+ session.thread:run({ attr = attr });
+end
+
+function stream_callbacks._streamopened(session, attr)
session.version = tonumber(attr.version) or 0;
-- TODO: Rename session.secure to session.encrypted
@@ -364,7 +394,7 @@ function stream_callbacks.streamopened(session, attr)
end
if ( session.type == "s2sin" or session.type == "s2sout" ) or features.tags[1] then
- log("debug", "Sending stream features: %s", tostring(features));
+ log("debug", "Sending stream features: %s", features);
session.sends2s(features);
else
(session.log or log)("warn", "No stream features to offer, giving up");
@@ -421,7 +451,7 @@ function stream_callbacks.error(session, error, data)
session.log("debug", "Invalid opening stream header (%s)", (data:gsub("^([^\1]+)\1", "{%1}")));
session:close("invalid-namespace");
elseif error == "parse-error" then
- session.log("debug", "Server-to-server XML parse error: %s", tostring(error));
+ session.log("debug", "Server-to-server XML parse error: %s", error);
session:close("not-well-formed");
elseif error == "stream-error" then
local condition, text = "undefined-condition";
@@ -441,14 +471,6 @@ function stream_callbacks.error(session, error, data)
end
end
-local function handleerr(err) log("error", "Traceback[s2s]: %s", traceback(tostring(err), 2)); end
-function stream_callbacks.handlestanza(session, stanza)
- stanza = session.filter("stanzas/in", stanza);
- if stanza then
- return xpcall(function () return core_process_stanza(session, stanza) end, handleerr);
- end
-end
-
local listener = {};
--- Session methods
@@ -476,10 +498,13 @@ local function session_close(session, reason, remote_reason)
if reason.extra then
stanza:add_child(reason.extra);
end
- log("debug", "Disconnecting %s[%s], <stream:error> is: %s", session.host or session.ip or "(unknown host)", session.type, tostring(stanza));
+ log("debug", "Disconnecting %s[%s], <stream:error> is: %s",
+ session.host or session.ip or "(unknown host)", session.type, stanza);
session.sends2s(stanza);
elseif reason.name then -- a stanza
- log("debug", "Disconnecting %s->%s[%s], <stream:error> is: %s", session.from_host or "(unknown host)", session.to_host or "(unknown host)", session.type, tostring(reason));
+ log("debug", "Disconnecting %s->%s[%s], <stream:error> is: %s",
+ session.from_host or "(unknown host)", session.to_host or "(unknown host)",
+ session.type, reason);
session.sends2s(reason);
end
end
@@ -488,8 +513,11 @@ local function session_close(session, reason, remote_reason)
session.sends2s("</stream:stream>");
function session.sends2s() return false; end
+ -- luacheck: ignore 422/reason
+ -- FIXME reason should be managed in a place common to c2s, s2s, bosh, component etc
local reason = remote_reason or (reason and (reason.text or reason.condition)) or reason;
- session.log("info", "%s s2s stream %s->%s closed: %s", session.direction:gsub("^.", string.upper), session.from_host or "(unknown host)", session.to_host or "(unknown host)", reason or "stream closed");
+ session.log("info", "%s s2s stream %s->%s closed: %s", session.direction:gsub("^.", string.upper),
+ session.from_host or "(unknown host)", session.to_host or "(unknown host)", reason or "stream closed");
-- Authenticated incoming stream may still be sending us stanzas, so wait for </stream:stream> from remote
local conn = session.conn;
@@ -508,7 +536,7 @@ local function session_close(session, reason, remote_reason)
end
end
-function session_stream_attrs(session, from, to, attr)
+function session_stream_attrs(session, from, to, attr) -- luacheck: ignore 212/session
if not from or (hosts[from] and hosts[from].modules.dialback) then
attr["xmlns:db"] = 'jabber:server:dialback';
end
@@ -523,6 +551,15 @@ end
-- Session initialization logic shared by incoming and outgoing
local function initialize_session(session)
local stream = new_xmpp_stream(session, stream_callbacks);
+
+ session.thread = runner(function (stanza)
+ if stanza.name == nil then
+ stream_callbacks._streamopened(session, stanza.attr);
+ else
+ core_process_stanza(session, stanza);
+ end
+ end, runner_callbacks, session);
+
local log = session.log or log;
session.stream = stream;
@@ -567,7 +604,7 @@ local function initialize_session(session)
session.close = session_close;
local handlestanza = stream_callbacks.handlestanza;
- function session.dispatch_stanza(session, stanza)
+ function session.dispatch_stanza(session, stanza) -- luacheck: ignore 432/session
return handlestanza(session, stanza);
end
@@ -586,6 +623,20 @@ local function initialize_session(session)
end);
end
+function runner_callbacks:ready()
+ self.data.log("debug", "Runner %s ready (%s)", self.thread, coroutine.status(self.thread));
+ self.data.conn:resume();
+end
+
+function runner_callbacks:waiting()
+ self.data.log("debug", "Runner %s waiting (%s)", self.thread, coroutine.status(self.thread));
+ self.data.conn:pause();
+end
+
+function runner_callbacks:error(err)
+ (self.data.log or log)("error", "Traceback[s2s]: %s", err);
+end
+
function listener.onconnect(conn)
conn:setoption("keepalive", opt_keepalives);
local session = sessions[conn];
@@ -627,7 +678,7 @@ function listener.ondisconnect(conn, err)
return; -- Session lives for now
end
end
- (session.log or log)("debug", "s2s disconnected: %s->%s (%s)", tostring(session.from_host), tostring(session.to_host), tostring(err or "connection closed"));
+ (session.log or log)("debug", "s2s disconnected: %s->%s (%s)", session.from_host, session.to_host, err or "connection closed");
s2s_destroy_session(session, err);
end
end
diff --git a/plugins/mod_s2s/s2sout.lib.lua b/plugins/mod_s2s/s2sout.lib.lua
index 122ab6a9..5f765da8 100644
--- a/plugins/mod_s2s/s2sout.lib.lua
+++ b/plugins/mod_s2s/s2sout.lib.lua
@@ -8,6 +8,8 @@
--- Module containing all the logic for connecting to a remote server
+-- luacheck: ignore 432/err
+
local portmanager = require "core.portmanager";
local wrapclient = require "net.server".wrapclient;
local initialize_filters = require "util.filters".initialize;
@@ -16,7 +18,6 @@ local new_ip = require "util.ip".new_ip;
local rfc6724_dest = require "util.rfc6724".destination;
local socket = require "socket";
local adns = require "net.adns";
-local dns = require "net.dns";
local t_insert, t_sort, ipairs = table.insert, table.sort, ipairs;
local local_addresses = require "util.net".local_addresses;
@@ -30,6 +31,7 @@ local sources = {};
local has_ipv4, has_ipv6;
local dns_timeout = module:get_option_number("dns_timeout", 15);
+local resolvers = module:get_option_set("s2s_dns_resolvers")
local s2sout = {};
@@ -45,11 +47,18 @@ local function compare_srv_priorities(a,b)
end
function s2sout.initiate_connection(host_session)
+ local log = host_session.log or log;
+
initialize_filters(host_session);
host_session.version = 1;
host_session.resolver = adns.resolver();
host_session.resolver._resolver:settimeout(dns_timeout);
+ if resolvers then
+ for resolver in resolvers do
+ host_session.resolver._resolver:addnameserver(resolver);
+ end
+ end
-- Kick the connection attempting machine into life
if not s2sout.attempt_connection(host_session) then
@@ -68,9 +77,9 @@ function s2sout.initiate_connection(host_session)
buffer = {};
host_session.send_buffer = buffer;
end
- log("debug", "Buffering data on unconnected s2sout to %s", tostring(host_session.to_host));
+ log("debug", "Buffering data on unconnected s2sout to %s", host_session.to_host);
buffer[#buffer+1] = data;
- log("debug", "Buffered item %d: %s", #buffer, tostring(data));
+ log("debug", "Buffered item %d: %s", #buffer, data);
end
end
end
@@ -78,6 +87,7 @@ end
function s2sout.attempt_connection(host_session, err)
local to_host = host_session.to_host;
local connect_host, connect_port = to_host and idna_to_ascii(to_host), 5269;
+ local log = host_session.log or log;
if not connect_host then
return false;
@@ -129,16 +139,16 @@ function s2sout.attempt_connection(host_session, err)
host_session.srv_choice = host_session.srv_choice + 1;
local srv_choice = host_session.srv_hosts[host_session.srv_choice];
connect_host, connect_port = srv_choice.target or to_host, srv_choice.port or connect_port;
- host_session.log("info", "Connection failed (%s). Attempt #%d: This time to %s:%d", tostring(err), host_session.srv_choice, connect_host, connect_port);
+ host_session.log("info", "Connection failed (%s). Attempt #%d: This time to %s:%d", err, host_session.srv_choice, connect_host, connect_port);
else
- host_session.log("info", "Failed in all attempts to connect to %s", tostring(host_session.to_host));
+ host_session.log("info", "Failed in all attempts to connect to %s", host_session.to_host);
-- We're out of options
return false;
end
if not (connect_host and connect_port) then
-- Likely we couldn't resolve DNS
- log("warn", "Hmm, we're without a host (%s) and port (%s) to connect to for %s, giving up :(", tostring(connect_host), tostring(connect_port), tostring(to_host));
+ log("warn", "Hmm, we're without a host (%s) and port (%s) to connect to for %s, giving up :(", connect_host, connect_port, to_host);
return false;
end
@@ -160,10 +170,12 @@ end
function s2sout.try_connect(host_session, connect_host, connect_port, err)
host_session.connecting = true;
+ local log = host_session.log or log;
if not err then
local IPs = {};
host_session.ip_hosts = IPs;
+ -- luacheck: ignore 231/handle4 231/handle6
local handle4, handle6;
local have_other_result = not(has_ipv4) or not(has_ipv6) or false;
@@ -246,6 +258,7 @@ function s2sout.try_connect(host_session, connect_host, connect_port, err)
elseif host_session.ip_hosts and #host_session.ip_hosts > host_session.ip_choice then -- Not our first attempt, and we also have IPs left to try
s2sout.try_next_ip(host_session);
else
+ log("debug", "Out of IP addresses, trying next SRV record (if any)");
host_session.ip_hosts = nil;
if not s2sout.attempt_connection(host_session, "out of IP addresses") then -- Retry if we can
log("debug", "No other records to try for %s - destroying", host_session.to_host);
@@ -259,7 +272,8 @@ function s2sout.try_connect(host_session, connect_host, connect_port, err)
end
function s2sout.make_connect(host_session, connect_host, connect_port)
- (host_session.log or log)("debug", "Beginning new connection attempt to %s ([%s]:%d)", host_session.to_host, connect_host.addr, connect_port);
+ local log = host_session.log or log;
+ log("debug", "Beginning new connection attempt to %s ([%s]:%d)", host_session.to_host, connect_host.addr, connect_port);
-- Reset secure flag in case this is another
-- connection attempt after a failed STARTTLS