aboutsummaryrefslogtreecommitdiffstats
path: root/plugins/mod_s2s/mod_s2s.lua
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/mod_s2s/mod_s2s.lua')
-rw-r--r--plugins/mod_s2s/mod_s2s.lua301
1 files changed, 180 insertions, 121 deletions
diff --git a/plugins/mod_s2s/mod_s2s.lua b/plugins/mod_s2s/mod_s2s.lua
index aae37b7f..9f7d4d6d 100644
--- a/plugins/mod_s2s/mod_s2s.lua
+++ b/plugins/mod_s2s/mod_s2s.lua
@@ -27,8 +27,10 @@ local s2s_destroy_session = require "core.s2smanager".destroy_session;
local uuid_gen = require "util.uuid".generate;
local fire_global_event = prosody.events.fire_event;
local runner = require "util.async".runner;
-
-local s2sout = module:require("s2sout");
+local connect = require "net.connect".connect;
+local service = require "net.resolvers.service";
+local errors = require "util.error";
+local set = require "util.set";
local connect_timeout = module:get_option_number("s2s_timeout", 90);
local stream_close_timeout = module:get_option_number("s2s_close_timeout", 5);
@@ -45,8 +47,16 @@ local sessions = module:shared("sessions");
local runner_callbacks = {};
+local listener = {};
+
local log = module._log;
+local s2s_service_options = {
+ default_port = 5269;
+ use_ipv4 = module:get_option_boolean("use_ipv4", true);
+ use_ipv6 = module:get_option_boolean("use_ipv6", true);
+};
+
module:hook("stats-update", function ()
local count = 0;
local ipv6 = 0;
@@ -77,15 +87,28 @@ local function bounce_sendq(session, reason)
(session.log or log)("error", "Attempting to close the dummy origin of s2s error replies, please report this! Traceback: %s", traceback());
end;
};
+ -- FIXME Allow for more specific error conditions
+ -- TODO use util.error ?
+ local error_type = "cancel";
+ local condition = "remote-server-not-found";
+ local reason_text;
+ if session.had_stream then -- set when a stream is opened by the remote
+ error_type, condition = "wait", "remote-server-timeout";
+ end
+ if errors.is_err(reason) then
+ error_type, condition, reason_text = reason.type, reason.condition, reason.text;
+ elseif type(reason) == "string" then
+ reason_text = reason;
+ end
for i, data in ipairs(sendq) do
local reply = data[2];
if reply and not(reply.attr.xmlns) and bouncy_stanzas[reply.name] then
reply.attr.type = "error";
- reply:tag("error", {type = "cancel", by = session.from_host})
- :tag("remote-server-not-found", {xmlns = "urn:ietf:params:xml:ns:xmpp-stanzas"}):up();
- if reason then
+ reply:tag("error", {type = error_type, by = session.from_host})
+ :tag(condition, {xmlns = "urn:ietf:params:xml:ns:xmpp-stanzas"}):up();
+ if reason_text then
reply:tag("text", {xmlns = "urn:ietf:params:xml:ns:xmpp-stanzas"})
- :text("Server-to-server connection failed: "..reason):up();
+ :text("Server-to-server connection failed: "..reason_text):up();
end
core_process_stanza(dummy, reply);
end
@@ -106,38 +129,33 @@ function route_to_existing_session(event)
return false;
end
local host = hosts[from_host].s2sout[to_host];
- if host then
- -- We have a connection to this host already
- if host.type == "s2sout_unauthed" and (stanza.name ~= "db:verify" or not host.dialback_key) then
- (host.log or log)("debug", "trying to send over unauthed s2sout to "..to_host);
-
- -- Queue stanza until we are able to send it
- local queued_item = {
- tostring(stanza),
- stanza.attr.type ~= "error" and stanza.attr.type ~= "result" and st.reply(stanza);
- };
- if host.sendq then
- t_insert(host.sendq, queued_item);
- else
- -- luacheck: ignore 122
- host.sendq = { queued_item };
- end
- host.log("debug", "stanza [%s] queued ", stanza.name);
- return true;
- elseif host.type == "local" or host.type == "component" then
- log("error", "Trying to send a stanza to ourselves??")
- log("error", "Traceback: %s", traceback());
- log("error", "Stanza: %s", tostring(stanza));
- return false;
+ if not host then return end
+
+ -- We have a connection to this host already
+ if host.type == "s2sout_unauthed" and (stanza.name ~= "db:verify" or not host.dialback_key) then
+ (host.log or log)("debug", "trying to send over unauthed s2sout to "..to_host);
+
+ -- Queue stanza until we are able to send it
+ local queued_item = {
+ tostring(stanza),
+ stanza.attr.type ~= "error" and stanza.attr.type ~= "result" and st.reply(stanza);
+ };
+ if host.sendq then
+ t_insert(host.sendq, queued_item);
else
- -- FIXME
- if host.from_host ~= from_host then
- log("error", "WARNING! This might, possibly, be a bug, but it might not...");
- log("error", "We are going to send from %s instead of %s", host.from_host, from_host);
- end
- if host.sends2s(stanza) then
- return true;
- end
+ -- luacheck: ignore 122
+ host.sendq = { queued_item };
+ end
+ host.log("debug", "stanza [%s] queued ", stanza.name);
+ return true;
+ elseif host.type == "local" or host.type == "component" then
+ log("error", "Trying to send a stanza to ourselves??")
+ log("error", "Traceback: %s", traceback());
+ log("error", "Stanza: %s", stanza);
+ return false;
+ else
+ if host.sends2s(stanza) then
+ return true;
end
end
end
@@ -147,17 +165,13 @@ function route_to_new_session(event)
local from_host, to_host, stanza = event.from_host, event.to_host, event.stanza;
log("debug", "opening a new outgoing connection for this stanza");
local host_session = s2s_new_outgoing(from_host, to_host);
+ host_session.version = 1;
-- Store in buffer
host_session.bounce_sendq = bounce_sendq;
host_session.sendq = { {tostring(stanza), stanza.attr.type ~= "error" and stanza.attr.type ~= "result" and st.reply(stanza)} };
- log("debug", "stanza [%s] queued until connection complete", tostring(stanza.name));
- s2sout.initiate_connection(host_session);
- if (not host_session.connecting) and (not host_session.conn) then
- log("warn", "Connection to %s failed already, destroying session...", to_host);
- s2s_destroy_session(host_session, "Connection failed");
- return false;
- end
+ log("debug", "stanza [%s] queued until connection complete", stanza.name);
+ connect(service.new(to_host, "xmpp-server", "tcp", s2s_service_options), listener, nil, { session = host_session });
return true;
end
@@ -182,10 +196,20 @@ function module.add_host(module)
-- so the stream is ready for stanzas. RFC 6120 Section 4.3
mark_connected(session);
return true;
+ elseif require_encryption and not session.secure then
+ session.log("warn", "Encrypted server-to-server communication is required but was not offered by %s", session.to_host);
+ session:close({
+ condition = "policy-violation",
+ text = "Encrypted server-to-server communication is required but was not offered",
+ }, nil, "Could not establish encrypted connection to remote server");
+ return true;
elseif not session.dialback_verifying then
session.log("warn", "No SASL EXTERNAL offer and Dialback doesn't seem to be enabled, giving up");
- session:close();
- return false;
+ session:close({
+ condition = "unsupported-feature",
+ text = "No viable authentication method offered",
+ }, nil, "No viable authentication method offered by remote server");
+ return true;
end
end, -1);
end
@@ -203,7 +227,18 @@ function mark_connected(session)
if session.type == "s2sout" then
fire_global_event("s2sout-established", event_data);
hosts[from].events.fire_event("s2sout-established", event_data);
+
+ if session.incoming then
+ session.send = function(stanza)
+ return hosts[from].events.fire_event("route/remote", { from_host = from, to_host = to, stanza = stanza });
+ end;
+ end
+
else
+ if session.outgoing and not hosts[to].s2sout[from] then
+ session.log("debug", "Setting up to handle route from %s to %s", to, from);
+ hosts[to].s2sout[from] = session; -- luacheck: ignore 122
+ end
local host_session = hosts[to];
session.send = function(stanza)
return host_session.events.fire_event("route/remote", { from_host = to, to_host = from, stanza = stanza });
@@ -223,13 +258,6 @@ function mark_connected(session)
end
session.sendq = nil;
end
-
- if session.resolver then
- session.resolver._resolver:closeall()
- end
- session.resolver = nil;
- session.ip_hosts = nil;
- session.srv_hosts = nil;
end
end
@@ -241,7 +269,7 @@ function make_authenticated(event)
condition = "policy-violation",
text = "Encrypted server-to-server communication is required but was not "
..((session.direction == "outgoing" and "offered") or "used")
- });
+ }, nil, "Could not establish encrypted connection to remote server");
end
end
if hosts[host] then
@@ -251,15 +279,13 @@ function make_authenticated(event)
session.type = "s2sout";
elseif session.type == "s2sin_unauthed" then
session.type = "s2sin";
- if host then
- if not session.hosts[host] then session.hosts[host] = {}; end
- session.hosts[host].authed = true;
- end
- elseif session.type == "s2sin" and host then
+ elseif session.type ~= "s2sin" and session.type ~= "s2sout" then
+ return false;
+ end
+
+ if session.incoming and host then
if not session.hosts[host] then session.hosts[host] = {}; end
session.hosts[host].authed = true;
- else
- return false;
end
session.log("debug", "connection %s->%s is now authenticated for %s", session.from_host, session.to_host, host);
@@ -296,11 +322,12 @@ local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams";
function stream_callbacks.streamopened(session, attr)
-- run _streamopened in async context
- session.thread:run({ attr = attr });
+ session.thread:run({ stream = "opened", attr = attr });
end
function stream_callbacks._streamopened(session, attr)
session.version = tonumber(attr.version) or 0;
+ session.had_stream = true; -- Had a stream opened at least once
-- TODO: Rename session.secure to session.encrypted
if session.secure == false then
@@ -314,7 +341,6 @@ function stream_callbacks._streamopened(session, attr)
session.compressed = info.compression;
else
(session.log or log)("info", "Stream encrypted");
- session.compressed = sock.compression and sock:compression(); --COMPAT mw/luasec-hg
end
end
@@ -322,7 +348,9 @@ function stream_callbacks._streamopened(session, attr)
-- Send a reply stream header
-- Validate to/from
- local to, from = nameprep(attr.to), nameprep(attr.from);
+ local to, from = attr.to, attr.from;
+ if to then to = nameprep(attr.to); end
+ if from then from = nameprep(attr.from); end
if not to and attr.to then -- COMPAT: Some servers do not reliably set 'to' (especially on stream restarts)
session:close({ condition = "improper-addressing", text = "Invalid 'to' address" });
return;
@@ -416,20 +444,6 @@ function stream_callbacks._streamopened(session, attr)
end
end
- -- Send unauthed buffer
- -- (stanzas which are fine to send before dialback)
- -- Note that this is *not* the stanza queue (which
- -- we can only send if auth succeeds) :)
- local send_buffer = session.send_buffer;
- if send_buffer and #send_buffer > 0 then
- log("debug", "Sending s2s send_buffer now...");
- for i, data in ipairs(send_buffer) do
- session.sends2s(tostring(data));
- send_buffer[i] = nil;
- end
- end
- session.send_buffer = nil;
-
-- If server is pre-1.0, don't wait for features, just do dialback
if session.version < 1.0 then
if not session.dialback_verifying then
@@ -441,11 +455,16 @@ function stream_callbacks._streamopened(session, attr)
end
end
-function stream_callbacks.streamclosed(session)
+function stream_callbacks._streamclosed(session)
(session.log or log)("debug", "Received </stream:stream>");
session:close(false);
end
+function stream_callbacks.streamclosed(session, attr)
+ -- run _streamclosed in async context
+ session.thread:run({ stream = "closed", attr = attr });
+end
+
function stream_callbacks.error(session, error, data)
if error == "no-stream" then
session.log("debug", "Invalid opening stream header (%s)", (data:gsub("^([^\1]+)\1", "{%1}")));
@@ -471,11 +490,12 @@ function stream_callbacks.error(session, error, data)
end
end
-local listener = {};
-
--- Session methods
local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'};
-local function session_close(session, reason, remote_reason)
+-- reason: stream error to send to the remote server
+-- remote_reason: stream error received from the remote server
+-- bounce_reason: stanza error to pass to bounce_sendq because stream- and stanza errors are different
+local function session_close(session, reason, remote_reason, bounce_reason)
local log = session.log or log;
if session.conn then
if session.notopen then
@@ -486,28 +506,24 @@ local function session_close(session, reason, remote_reason)
end
end
if reason then -- nil == no err, initiated by us, false == initiated by remote
+ local stream_error;
if type(reason) == "string" then -- assume stream error
- log("debug", "Disconnecting %s[%s], <stream:error> is: %s", session.host or session.ip or "(unknown host)", session.type, reason);
- session.sends2s(st.stanza("stream:error"):tag(reason, {xmlns = 'urn:ietf:params:xml:ns:xmpp-streams' }));
- elseif type(reason) == "table" then
- if reason.condition then
- local stanza = st.stanza("stream:error"):tag(reason.condition, stream_xmlns_attr):up();
- if reason.text then
- stanza:tag("text", stream_xmlns_attr):text(reason.text):up();
- end
- if reason.extra then
- stanza:add_child(reason.extra);
- end
- log("debug", "Disconnecting %s[%s], <stream:error> is: %s",
- session.host or session.ip or "(unknown host)", session.type, stanza);
- session.sends2s(stanza);
- elseif reason.name then -- a stanza
- log("debug", "Disconnecting %s->%s[%s], <stream:error> is: %s",
- session.from_host or "(unknown host)", session.to_host or "(unknown host)",
- session.type, reason);
- session.sends2s(reason);
+ stream_error = st.stanza("stream:error"):tag(reason, {xmlns = 'urn:ietf:params:xml:ns:xmpp-streams' });
+ elseif type(reason) == "table" and not st.is_stanza(reason) then
+ stream_error = st.stanza("stream:error"):tag(reason.condition or "undefined-condition", stream_xmlns_attr):up();
+ if reason.text then
+ stream_error:tag("text", stream_xmlns_attr):text(reason.text):up();
+ end
+ if reason.extra then
+ stream_error:add_child(reason.extra);
end
end
+ if st.is_stanza(stream_error) then
+ -- to and from are never unknown on outgoing connections
+ log("debug", "Disconnecting %s->%s[%s], <stream:error> is: %s",
+ session.from_host or "(unknown host)" or session.ip, session.to_host or "(unknown host)", session.type, reason);
+ session.sends2s(stream_error);
+ end
end
session.sends2s("</stream:stream>");
@@ -521,16 +537,16 @@ local function session_close(session, reason, remote_reason)
-- Authenticated incoming stream may still be sending us stanzas, so wait for </stream:stream> from remote
local conn = session.conn;
- if reason == nil and not session.notopen and session.type == "s2sin" then
+ if reason == nil and not session.notopen and session.incoming then
add_task(stream_close_timeout, function ()
if not session.destroyed then
session.log("warn", "Failed to receive a stream close response, closing connection anyway...");
- s2s_destroy_session(session, reason);
+ s2s_destroy_session(session, reason, bounce_reason);
conn:close();
end
end);
else
- s2s_destroy_session(session, reason);
+ s2s_destroy_session(session, reason, bounce_reason);
conn:close(); -- Close immediately, as this is an outgoing connection or is not authed
end
end
@@ -553,10 +569,12 @@ local function initialize_session(session)
local stream = new_xmpp_stream(session, stream_callbacks);
session.thread = runner(function (stanza)
- if stanza.name == nil then
- stream_callbacks._streamopened(session, stanza.attr);
- else
+ if st.is_stanza(stanza) then
core_process_stanza(session, stanza);
+ elseif stanza.stream == "opened" then
+ stream_callbacks._streamopened(session, stanza.attr);
+ elseif stanza.stream == "closed" then
+ stream_callbacks._streamclosed(session, stanza.attr);
end
end, runner_callbacks, session);
@@ -595,9 +613,8 @@ local function initialize_session(session)
if data then
local ok, err = stream:feed(data);
if ok then return; end
- log("warn", "Received invalid XML: %s", data);
- log("warn", "Problem was: %s", err);
- session:close("not-well-formed");
+ log("debug", "Received invalid XML (%s) %d bytes: %q", err, #data, data:sub(1, 300));
+ session:close("not-well-formed", nil, "Received invalid XML from remote server");
end
end
@@ -672,11 +689,20 @@ function listener.ondisconnect(conn, err)
local session = sessions[conn];
if session then
sessions[conn] = nil;
+ (session.log or log)("debug", "s2s disconnected: %s->%s (%s)", session.from_host, session.to_host, err or "connection closed");
+ if session.secure == false and err then
+ -- TODO util.error-ify this
+ err = "Error during negotiation of encrypted connection: "..err;
+ end
+ s2s_destroy_session(session, err);
+ end
+end
+
+function listener.onfail(data, err)
+ local session = data and data.session;
+ if session then
if err and session.direction == "outgoing" and session.notopen then
(session.log or log)("debug", "s2s connection attempt failed: %s", err);
- if s2sout.attempt_connection(session, err) then
- return; -- Session lives for now
- end
end
(session.log or log)("debug", "s2s disconnected: %s->%s (%s)", session.from_host, session.to_host, err or "connection closed");
s2s_destroy_session(session, err);
@@ -700,6 +726,34 @@ function listener.ondetach(conn)
sessions[conn] = nil;
end
+function listener.onattach(conn, data)
+ local session = data and data.session;
+ if session then
+ session.conn = conn;
+ sessions[conn] = session;
+ initialize_session(session);
+ end
+end
+
+-- Complete the sentence "Your certificate " with what's wrong
+local function friendly_cert_error(session) --> string
+ if session.cert_chain_status == "invalid" then
+ if session.cert_chain_errors then
+ local cert_errors = set.new(session.cert_chain_errors[1]);
+ if cert_errors:contains("certificate has expired") then
+ return "has expired";
+ elseif cert_errors:contains("self signed certificate") then
+ return "is self-signed";
+ end
+ end
+ return "is not trusted"; -- for some other reason
+ elseif session.cert_identity_status == "invalid" then
+ return "is not valid for this name";
+ end
+ -- this should normally be unreachable except if no s2s auth module was loaded
+ return "could not be validated";
+end
+
function check_auth_policy(event)
local host, session = event.host, event.session;
local must_secure = secure_auth;
@@ -711,20 +765,21 @@ function check_auth_policy(event)
end
if must_secure and (session.cert_chain_status ~= "valid" or session.cert_identity_status ~= "valid") then
- module:log("warn", "Forbidding insecure connection to/from %s", host or session.ip or "(unknown host)");
- if session.direction == "incoming" then
- session:close({ condition = "not-authorized", text = "Your server's certificate is invalid, expired, or not trusted by "..session.to_host });
- else -- Close outgoing connections without warning
- session:close(false);
- end
+ local reason = friendly_cert_error(session);
+ session.log("warn", "Forbidding insecure connection to/from %s because its certificate %s", host or session.ip or "(unknown host)", reason);
+ -- XEP-0178 recommends closing outgoing connections without warning
+ -- but does not give a rationale for this.
+ -- In practice most cases are configuration mistakes or forgotten
+ -- certificate renewals. We think it's better to let the other party
+ -- know about the problem so that they can fix it.
+ session:close({ condition = "not-authorized", text = "Your server's certificate "..reason },
+ nil, "Remote server's certificate "..reason);
return false;
end
end
module:hook("s2s-check-certificate", check_auth_policy, -1);
-s2sout.set_listener(listener);
-
module:hook("server-stopping", function(event)
local reason = event.reason;
for _, session in pairs(sessions) do
@@ -739,7 +794,11 @@ module:provides("net", {
listener = listener;
default_port = 5269;
encryption = "starttls";
+ ssl_config = { -- FIXME This is not used atm, see mod_tls
+ verify = { "peer", "client_once", };
+ };
multiplex = {
+ protocol = "xmpp-server";
pattern = "^<.*:stream.*%sxmlns%s*=%s*(['\"])jabber:server%1.*>";
};
});