aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthew Wild <mwild1@gmail.com>2012-01-23 16:28:20 +0000
committerMatthew Wild <mwild1@gmail.com>2012-01-23 16:28:20 +0000
commit68bf2a395659bf458efe9e49e4c8abed90fd084f (patch)
tree1cc1b3ff3a123c9256595e9661e784e84c4cdb57
parent6605de32125051a501e4d8d3bf0c088404160159 (diff)
downloadprosody-68bf2a395659bf458efe9e49e4c8abed90fd084f.tar.gz
prosody-68bf2a395659bf458efe9e49e4c8abed90fd084f.zip
s2smanager, mod_s2s, mod_s2s/s2sout: Split connection handling out of s2smanager into mod_s2s, and further split connection logic for s2sout to a module lib, s2sout.lib.lua
-rw-r--r--core/s2smanager.lua581
-rw-r--r--plugins/s2s/mod_s2s.lua447
-rw-r--r--plugins/s2s/s2sout.lib.lua314
3 files changed, 772 insertions, 570 deletions
diff --git a/core/s2smanager.lua b/core/s2smanager.lua
index f44921c3..e61aaccb 100644
--- a/core/s2smanager.lua
+++ b/core/s2smanager.lua
@@ -9,18 +9,13 @@
local hosts = hosts;
-local sessions = sessions;
local core_process_stanza = function(a, b) core_process_stanza(a, b); end
-local add_task = require "util.timer".add_task;
-local socket = require "socket";
local format = string.format;
local t_insert, t_sort = table.insert, table.sort;
local get_traceback = debug.traceback;
local tostring, pairs, ipairs, getmetatable, newproxy, type, error, tonumber, setmetatable
= tostring, pairs, ipairs, getmetatable, newproxy, type, error, tonumber, setmetatable;
-local idna_to_ascii = require "util.encodings".idna.to_ascii;
-local connlisteners_get = require "net.connlisteners".get;
local initialize_filters = require "util.filters".initialize;
local wrapclient = require "net.server".wrapclient;
local st = require "stanza";
@@ -41,11 +36,13 @@ local sha256_hash = require "util.hashes".sha256;
local adns, dns = require "net.adns", require "net.dns";
local config = require "core.configmanager";
-local connect_timeout = config.get("*", "core", "s2s_timeout") or 60;
local dns_timeout = config.get("*", "core", "dns_timeout") or 15;
local max_dns_depth = config.get("*", "core", "dns_max_depth") or 3;
+local cfg_sources = config.get("*", "core", "s2s_interface")
+ or config.get("*", "core", "interface");
local sources;
+--FIXME: s2sout should create its own resolver w/ timeout
dns.settimeout(dns_timeout);
local prosody = _G.prosody;
@@ -55,89 +52,6 @@ local incoming_s2s = incoming_s2s;
module "s2smanager"
-function compare_srv_priorities(a,b)
- return a.priority < b.priority or (a.priority == b.priority and a.weight > b.weight);
-end
-
-local bouncy_stanzas = { message = true, presence = true, iq = true };
-local function bounce_sendq(session, reason)
- local sendq = session.sendq;
- if sendq then
- session.log("info", "sending error replies for "..#sendq.." queued stanzas because of failed outgoing connection to "..tostring(session.to_host));
- local dummy = {
- type = "s2sin";
- send = function(s)
- (session.log or log)("error", "Replying to to an s2s error reply, please report this! Traceback: %s", get_traceback());
- end;
- dummy = true;
- };
- for i, data in ipairs(sendq) do
- local reply = data[2];
- if reply and not(reply.attr.xmlns) and bouncy_stanzas[reply.name] then
- reply.attr.type = "error";
- reply:tag("error", {type = "cancel"})
- :tag("remote-server-not-found", {xmlns = "urn:ietf:params:xml:ns:xmpp-stanzas"}):up();
- if reason then
- reply:tag("text", {xmlns = "urn:ietf:params:xml:ns:xmpp-stanzas"})
- :text("Server-to-server connection failed: "..reason):up();
- end
- core_process_stanza(dummy, reply);
- end
- sendq[i] = nil;
- end
- session.sendq = nil;
- end
-end
-
-function send_to_host(from_host, to_host, data)
- if not hosts[from_host] then
- log("warn", "Attempt to send stanza from %s - a host we don't serve", from_host);
- return false;
- end
- local host = hosts[from_host].s2sout[to_host];
- if host then
- -- We have a connection to this host already
- if host.type == "s2sout_unauthed" and (data.name ~= "db:verify" or not host.dialback_key) then
- (host.log or log)("debug", "trying to send over unauthed s2sout to "..to_host);
-
- -- Queue stanza until we are able to send it
- if host.sendq then t_insert(host.sendq, {tostring(data), data.attr.type ~= "error" and data.attr.type ~= "result" and st.reply(data)});
- else host.sendq = { {tostring(data), data.attr.type ~= "error" and data.attr.type ~= "result" and st.reply(data)} }; end
- host.log("debug", "stanza [%s] queued ", data.name);
- elseif host.type == "local" or host.type == "component" then
- log("error", "Trying to send a stanza to ourselves??")
- log("error", "Traceback: %s", get_traceback());
- log("error", "Stanza: %s", tostring(data));
- return false;
- else
- (host.log or log)("debug", "going to send stanza to "..to_host.." from "..from_host);
- -- FIXME
- if host.from_host ~= from_host then
- log("error", "WARNING! This might, possibly, be a bug, but it might not...");
- log("error", "We are going to send from %s instead of %s", tostring(host.from_host), tostring(from_host));
- end
- host.sends2s(data);
- host.log("debug", "stanza sent over "..host.type);
- end
- else
- log("debug", "opening a new outgoing connection for this stanza");
- local host_session = new_outgoing(from_host, to_host);
-
- -- Store in buffer
- host_session.sendq = { {tostring(data), data.attr.type ~= "error" and data.attr.type ~= "result" and st.reply(data)} };
- log("debug", "stanza [%s] queued until connection complete", tostring(data.name));
- if (not host_session.connecting) and (not host_session.conn) then
- log("warn", "Connection to %s failed already, destroying session...", to_host);
- if not destroy_session(host_session, "Connection failed") then
- -- Already destroyed, we need to bounce our stanza
- bounce_sendq(host_session, host_session.destruction_reason);
- end
- return false;
- end
- end
- return true;
-end
-
local open_sessions = 0;
function new_incoming(conn)
@@ -147,491 +61,18 @@ function new_incoming(conn)
getmetatable(session.trace).__gc = function () open_sessions = open_sessions - 1; end;
end
open_sessions = open_sessions + 1;
- local w, log = conn.write, logger_init("s2sin"..tostring(conn):match("[a-f0-9]+$"));
- session.log = log;
- local filter = initialize_filters(session);
- session.sends2s = function (t)
- log("debug", "sending: %s", t.top_tag and t:top_tag() or t:match("^([^>]*>?)"));
- if t.name then
- t = filter("stanzas/out", t);
- end
- if t then
- t = filter("bytes/out", tostring(t));
- if t then
- return w(conn, t);
- end
- end
- end
+ session.log = logger_init("s2sin"..tostring(conn):match("[a-f0-9]+$"));
incoming_s2s[session] = true;
- add_task(connect_timeout, function ()
- if session.conn ~= conn or
- session.type == "s2sin" then
- return; -- Ok, we're connect[ed|ing]
- end
- -- Not connected, need to close session and clean up
- (session.log or log)("debug", "Destroying incomplete session %s->%s due to inactivity",
- session.from_host or "(unknown)", session.to_host or "(unknown)");
- session:close("connection-timeout");
- end);
return session;
end
function new_outgoing(from_host, to_host, connect)
- local host_session = { to_host = to_host, from_host = from_host, host = from_host,
- notopen = true, type = "s2sout_unauthed", direction = "outgoing",
- open_stream = session_open_stream };
-
- hosts[from_host].s2sout[to_host] = host_session;
-
- host_session.close = destroy_session; -- This gets replaced by xmppserver_listener later
-
- local log;
- do
- local conn_name = "s2sout"..tostring(host_session):match("[a-f0-9]*$");
- log = logger_init(conn_name);
- host_session.log = log;
- end
-
- initialize_filters(host_session);
-
- if connect ~= false then
- -- Kick the connection attempting machine into life
- if not attempt_connection(host_session) then
- -- Intentionally not returning here, the
- -- session is needed, connected or not
- destroy_session(host_session);
- end
- end
-
- if not host_session.sends2s then
- -- A sends2s which buffers data (until the stream is opened)
- -- note that data in this buffer will be sent before the stream is authed
- -- and will not be ack'd in any way, successful or otherwise
- local buffer;
- function host_session.sends2s(data)
- if not buffer then
- buffer = {};
- host_session.send_buffer = buffer;
- end
- log("debug", "Buffering data on unconnected s2sout to %s", to_host);
- buffer[#buffer+1] = data;
- log("debug", "Buffered item %d: %s", #buffer, tostring(data));
- end
- end
-
- return host_session;
-end
-
-
-function attempt_connection(host_session, err)
- local from_host, to_host = host_session.from_host, host_session.to_host;
- local connect_host, connect_port = to_host and idna_to_ascii(to_host), 5269;
-
- if not connect_host then
- return false;
- end
-
- if not err then -- This is our first attempt
- log("debug", "First attempt to connect to %s, starting with SRV lookup...", to_host);
- host_session.connecting = true;
- local handle;
- handle = adns.lookup(function (answer)
- handle = nil;
- host_session.connecting = nil;
- if answer then
- log("debug", to_host.." has SRV records, handling...");
- local srv_hosts = {};
- host_session.srv_hosts = srv_hosts;
- for _, record in ipairs(answer) do
- t_insert(srv_hosts, record.srv);
- end
- if #srv_hosts == 1 and srv_hosts[1].target == "." then
- log("debug", to_host.." does not provide a XMPP service");
- destroy_session(host_session, err); -- Nothing to see here
- return;
- end
- t_sort(srv_hosts, compare_srv_priorities);
-
- local srv_choice = srv_hosts[1];
- host_session.srv_choice = 1;
- if srv_choice then
- connect_host, connect_port = srv_choice.target or to_host, srv_choice.port or connect_port;
- log("debug", "Best record found, will connect to %s:%d", connect_host, connect_port);
- end
- else
- log("debug", to_host.." has no SRV records, falling back to A");
- end
- -- Try with SRV, or just the plain hostname if no SRV
- local ok, err = try_connect(host_session, connect_host, connect_port);
- if not ok then
- if not attempt_connection(host_session, err) then
- -- No more attempts will be made
- destroy_session(host_session, err);
- end
- end
- end, "_xmpp-server._tcp."..connect_host..".", "SRV");
-
- return true; -- Attempt in progress
- elseif host_session.ip_hosts then
- return try_connect(host_session, connect_host, connect_port, err);
- elseif host_session.srv_hosts and #host_session.srv_hosts > host_session.srv_choice then -- Not our first attempt, and we also have SRV
- host_session.srv_choice = host_session.srv_choice + 1;
- local srv_choice = host_session.srv_hosts[host_session.srv_choice];
- connect_host, connect_port = srv_choice.target or to_host, srv_choice.port or connect_port;
- host_session.log("info", "Connection failed (%s). Attempt #%d: This time to %s:%d", tostring(err), host_session.srv_choice, connect_host, connect_port);
- else
- host_session.log("info", "Out of connection options, can't connect to %s", tostring(host_session.to_host));
- -- We're out of options
- return false;
- end
-
- if not (connect_host and connect_port) then
- -- Likely we couldn't resolve DNS
- log("warn", "Hmm, we're without a host (%s) and port (%s) to connect to for %s, giving up :(", tostring(connect_host), tostring(connect_port), tostring(to_host));
- return false;
- end
-
- return try_connect(host_session, connect_host, connect_port);
-end
-
-function try_next_ip(host_session)
- host_session.connecting = nil;
- host_session.ip_choice = host_session.ip_choice + 1;
- local ip = host_session.ip_hosts[host_session.ip_choice];
- local ok, err= make_connect(host_session, ip.ip, ip.port);
- if not ok then
- if not attempt_connection(host_session, err or "closed") then
- err = err and (": "..err) or "";
- destroy_session(host_session, "Connection failed"..err);
- end
- end
-end
-
-function try_connect(host_session, connect_host, connect_port, err)
- host_session.connecting = true;
-
- if not err then
- local IPs = {};
- host_session.ip_hosts = IPs;
- local handle4, handle6;
- local has_other = false;
-
- if not sources then
- sources = {};
- local cfg_sources = config.get("*", "core", "interface") or connlisteners_get("xmppserver").default_interface;
- if type(cfg_sources) == "string" then
- cfg_sources = { cfg_sources };
- end
- for i, source in ipairs(cfg_sources) do
- if source == "*" then
- sources[i] = new_ip("0.0.0.0", "IPv4");
- else
- sources[i] = new_ip(source, (source:find(":") and "IPv6") or "IPv4");
- end
- end
- end
-
- handle4 = adns.lookup(function (reply, err)
- handle4 = nil;
-
- -- COMPAT: This is a compromise for all you CNAME-(ab)users :)
- if not (reply and reply[#reply] and reply[#reply].a) then
- local count = max_dns_depth;
- reply = dns.peek(connect_host, "CNAME", "IN");
- while count > 0 and reply and reply[#reply] and not reply[#reply].a and reply[#reply].cname do
- log("debug", "Looking up %s (DNS depth is %d)", tostring(reply[#reply].cname), count);
- reply = dns.peek(reply[#reply].cname, "A", "IN") or dns.peek(reply[#reply].cname, "CNAME", "IN");
- count = count - 1;
- end
- end
- -- end of CNAME resolving
-
- if reply and reply[#reply] and reply[#reply].a then
- for _, ip in ipairs(reply) do
- log("debug", "DNS reply for %s gives us %s", connect_host, ip.a);
- IPs[#IPs+1] = new_ip(ip.a, "IPv4");
- end
- end
-
- if has_other then
- if #IPs > 0 then
- rfc3484_dest(host_session.ip_hosts, sources);
- for i = 1, #IPs do
- IPs[i] = {ip = IPs[i], port = connect_port};
- end
- host_session.ip_choice = 0;
- try_next_ip(host_session);
- else
- log("debug", "DNS lookup failed to get a response for %s", connect_host);
- host_session.ip_hosts = nil;
- if not attempt_connection(host_session, "name resolution failed") then -- Retry if we can
- log("debug", "No other records to try for %s - destroying", host_session.to_host);
- err = err and (": "..err) or "";
- destroy_session(host_session, "DNS resolution failed"..err); -- End of the line, we can't
- end
- end
- else
- has_other = true;
- end
- end, connect_host, "A", "IN");
-
- handle6 = adns.lookup(function (reply, err)
- handle6 = nil;
-
- if reply and reply[#reply] and reply[#reply].aaaa then
- for _, ip in ipairs(reply) do
- log("debug", "DNS reply for %s gives us %s", connect_host, ip.aaaa);
- IPs[#IPs+1] = new_ip(ip.aaaa, "IPv6");
- end
- end
-
- if has_other then
- if #IPs > 0 then
- rfc3484_dest(host_session.ip_hosts, sources);
- for i = 1, #IPs do
- IPs[i] = {ip = IPs[i], port = connect_port};
- end
- host_session.ip_choice = 0;
- try_next_ip(host_session);
- else
- log("debug", "DNS lookup failed to get a response for %s", connect_host);
- host_session.ip_hosts = nil;
- if not attempt_connection(host_session, "name resolution failed") then -- Retry if we can
- log("debug", "No other records to try for %s - destroying", host_session.to_host);
- err = err and (": "..err) or "";
- destroy_session(host_session, "DNS resolution failed"..err); -- End of the line, we can't
- end
- end
- else
- has_other = true;
- end
- end, connect_host, "AAAA", "IN");
-
- return true;
- elseif host_session.ip_hosts and #host_session.ip_hosts > host_session.ip_choice then -- Not our first attempt, and we also have IPs left to try
- try_next_ip(host_session);
- else
- host_session.ip_hosts = nil;
- if not attempt_connection(host_session, "out of IP addresses") then -- Retry if we can
- log("debug", "No other records to try for %s - destroying", host_session.to_host);
- err = err and (": "..err) or "";
- destroy_session(host_session, "Connecting failed"..err); -- End of the line, we can't
- return false;
- end
- end
-
- return true;
-end
-
-function make_connect(host_session, connect_host, connect_port)
- (host_session.log or log)("info", "Beginning new connection attempt to %s ([%s]:%d)", host_session.to_host, connect_host.addr, connect_port);
- -- Ok, we're going to try to connect
-
- local from_host, to_host = host_session.from_host, host_session.to_host;
-
- local conn, handler;
- if connect_host.proto == "IPv4" then
- conn, handler = socket.tcp();
- else
- conn, handler = socket.tcp6();
- end
-
- if not conn then
- log("warn", "Failed to create outgoing connection, system error: %s", handler);
- return false, handler;
- end
-
- conn:settimeout(0);
- local success, err = conn:connect(connect_host.addr, connect_port);
- if not success and err ~= "timeout" then
- log("warn", "s2s connect() to %s (%s:%d) failed: %s", host_session.to_host, connect_host.addr, connect_port, err);
- return false, err;
- end
-
- local cl = connlisteners_get("xmppserver");
- conn = wrapclient(conn, connect_host.addr, connect_port, cl, cl.default_mode or 1 );
- host_session.conn = conn;
-
- local filter = initialize_filters(host_session);
- local w, log = conn.write, host_session.log;
- host_session.sends2s = function (t)
- log("debug", "sending: %s", (t.top_tag and t:top_tag()) or t:match("^[^>]*>?"));
- if t.name then
- t = filter("stanzas/out", t);
- end
- if t then
- t = filter("bytes/out", tostring(t));
- if t then
- return w(conn, tostring(t));
- end
- end
- end
-
- -- Register this outgoing connection so that xmppserver_listener knows about it
- -- otherwise it will assume it is a new incoming connection
- cl.register_outgoing(conn, host_session);
-
- host_session:open_stream(from_host, to_host);
-
- log("debug", "Connection attempt in progress...");
- add_task(connect_timeout, function ()
- if host_session.conn ~= conn or
- host_session.type == "s2sout" or
- host_session.connecting then
- return; -- Ok, we're connect[ed|ing]
- end
- -- Not connected, need to close session and clean up
- (host_session.log or log)("warn", "Destroying incomplete session %s->%s due to inactivity",
- host_session.from_host or "(unknown)", host_session.to_host or "(unknown)");
- host_session:close("connection-timeout");
- end);
- return true;
-end
-
-function session_open_stream(session, from, to)
- session.sends2s(st.stanza("stream:stream", {
- xmlns='jabber:server', ["xmlns:db"]='jabber:server:dialback',
- ["xmlns:stream"]='http://etherx.jabber.org/streams',
- from=from, to=to, version='1.0', ["xml:lang"]='en'}):top_tag());
-end
-
-local function check_cert_status(session)
- local conn = session.conn:socket()
- local cert
- if conn.getpeercertificate then
- cert = conn:getpeercertificate()
- end
-
- if cert then
- local chain_valid, errors = conn:getpeerverification()
- -- Is there any interest in printing out all/the number of errors here?
- if not chain_valid then
- (session.log or log)("debug", "certificate chain validation result: invalid");
- session.cert_chain_status = "invalid";
- else
- (session.log or log)("debug", "certificate chain validation result: valid");
- session.cert_chain_status = "valid";
-
- local host = session.direction == "incoming" and session.from_host or session.to_host
-
- -- We'll go ahead and verify the asserted identity if the
- -- connecting server specified one.
- if host then
- if cert_verify_identity(host, "xmpp-server", cert) then
- session.cert_identity_status = "valid"
- else
- session.cert_identity_status = "invalid"
- end
- end
- end
- end
-end
-
-function streamopened(session, attr)
- local send = session.sends2s;
-
- -- TODO: #29: SASL/TLS on s2s streams
- session.version = tonumber(attr.version) or 0;
-
- -- TODO: Rename session.secure to session.encrypted
- if session.secure == false then
- session.secure = true;
- end
-
- if session.direction == "incoming" then
- -- Send a reply stream header
- session.to_host = attr.to and nameprep(attr.to);
- session.from_host = attr.from and nameprep(attr.from);
-
- session.streamid = uuid_gen();
- (session.log or log)("debug", "Incoming s2s received <stream:stream>");
- if session.to_host then
- if not hosts[session.to_host] then
- -- Attempting to connect to a host we don't serve
- session:close({
- condition = "host-unknown";
- text = "This host does not serve "..session.to_host
- });
- return;
- elseif hosts[session.to_host].disallow_s2s then
- -- Attempting to connect to a host that disallows s2s
- session:close({
- condition = "policy-violation";
- text = "Server-to-server communication is not allowed to this host";
- });
- return;
- end
- end
-
- if session.secure and not session.cert_chain_status then check_cert_status(session); end
-
- send("<?xml version='1.0'?>");
- send(stanza("stream:stream", { xmlns='jabber:server', ["xmlns:db"]='jabber:server:dialback',
- ["xmlns:stream"]='http://etherx.jabber.org/streams', id=session.streamid, from=session.to_host, to=session.from_host, version=(session.version > 0 and "1.0" or nil) }):top_tag());
- if session.version >= 1.0 then
- local features = st.stanza("stream:features");
-
- if session.to_host then
- hosts[session.to_host].events.fire_event("s2s-stream-features", { origin = session, features = features });
- else
- (session.log or log)("warn", "No 'to' on stream header from %s means we can't offer any features", session.from_host or "unknown host");
- end
-
- log("debug", "Sending stream features: %s", tostring(features));
- send(features);
- end
- elseif session.direction == "outgoing" then
- -- If we are just using the connection for verifying dialback keys, we won't try and auth it
- if not attr.id then error("stream response did not give us a streamid!!!"); end
- session.streamid = attr.id;
-
- if session.secure and not session.cert_chain_status then check_cert_status(session); end
-
- -- Send unauthed buffer
- -- (stanzas which are fine to send before dialback)
- -- Note that this is *not* the stanza queue (which
- -- we can only send if auth succeeds) :)
- local send_buffer = session.send_buffer;
- if send_buffer and #send_buffer > 0 then
- log("debug", "Sending s2s send_buffer now...");
- for i, data in ipairs(send_buffer) do
- session.sends2s(tostring(data));
- send_buffer[i] = nil;
- end
- end
- session.send_buffer = nil;
-
- -- If server is pre-1.0, don't wait for features, just do dialback
- if session.version < 1.0 then
- if not session.dialback_verifying then
- log("debug", "Initiating dialback...");
- initiate_dialback(session);
- else
- mark_connected(session);
- end
- end
- end
- session.notopen = nil;
-end
-
-function streamclosed(session)
- (session.log or log)("debug", "Received </stream:stream>");
- session:close();
-end
-
-function initiate_dialback(session)
- -- generate dialback key
- session.dialback_key = generate_dialback(session.streamid, session.to_host, session.from_host);
- session.sends2s(format("<db:result from='%s' to='%s'>%s</db:result>", session.from_host, session.to_host, session.dialback_key));
- session.log("info", "sent dialback key on outgoing s2s stream");
-end
-
-function generate_dialback(id, to, from)
- return sha256_hash(id..to..from..hosts[from].dialback_secret, true);
-end
-
-function verify_dialback(id, to, from, key)
- return key == generate_dialback(id, to, from);
+ local host_session = { to_host = to_host, from_host = from_host, host = from_host,
+ notopen = true, type = "s2sout_unauthed", direction = "outgoing" };
+ hosts[from_host].s2sout[to_host] = host_session;
+ local conn_name = "s2sout"..tostring(host_session):match("[a-f0-9]*$");
+ host_session.log = logger_init(conn_name);
+ return host_session;
end
function make_authenticated(session, host)
@@ -733,7 +174,7 @@ function destroy_session(session, reason)
if session.direction == "outgoing" then
hosts[session.from_host].s2sout[session.to_host] = nil;
- bounce_sendq(session, reason);
+ session:bounce_sendq(reason);
elseif session.direction == "incoming" then
incoming_s2s[session] = nil;
end
diff --git a/plugins/s2s/mod_s2s.lua b/plugins/s2s/mod_s2s.lua
new file mode 100644
index 00000000..5be4d84b
--- /dev/null
+++ b/plugins/s2s/mod_s2s.lua
@@ -0,0 +1,447 @@
+-- Prosody IM
+-- Copyright (C) 2008-2010 Matthew Wild
+-- Copyright (C) 2008-2010 Waqas Hussain
+--
+-- This project is MIT/X11 licensed. Please see the
+-- COPYING file in the source package for more information.
+--
+
+module:set_global();
+
+local tostring, type = tostring, type;
+local xpcall, traceback = xpcall, debug.traceback;
+
+local add_task = require "util.timer".add_task;
+local st = require "util.stanza";
+local initialize_filters = require "util.filters".initialize;
+local new_xmpp_stream = require "util.xmppstream".new;
+local s2s_new_incoming = require "core.s2smanager".new_incoming;
+local s2s_new_outgoing = require "core.s2smanager".new_outgoing;
+local s2s_destroy_session = require "core.s2smanager".destroy_session;
+
+local s2sout = module:require("s2sout");
+
+local connect_timeout = module:get_option_number("s2s_timeout", 60);
+
+local sessions = module:shared("sessions");
+
+--- Handle stanzas to remote domains
+
+local bouncy_stanzas = { message = true, presence = true, iq = true };
+local function bounce_sendq(session, reason)
+ local sendq = session.sendq;
+ if not sendq then return; end
+ session.log("info", "sending error replies for "..#sendq.." queued stanzas because of failed outgoing connection to "..tostring(session.to_host));
+ local dummy = {
+ type = "s2sin";
+ send = function(s)
+ (session.log or log)("error", "Replying to to an s2s error reply, please report this! Traceback: %s", get_traceback());
+ end;
+ dummy = true;
+ };
+ for i, data in ipairs(sendq) do
+ local reply = data[2];
+ if reply and not(reply.attr.xmlns) and bouncy_stanzas[reply.name] then
+ reply.attr.type = "error";
+ reply:tag("error", {type = "cancel"})
+ :tag("remote-server-not-found", {xmlns = "urn:ietf:params:xml:ns:xmpp-stanzas"}):up();
+ if reason then
+ reply:tag("text", {xmlns = "urn:ietf:params:xml:ns:xmpp-stanzas"})
+ :text("Server-to-server connection failed: "..reason):up();
+ end
+ core_process_stanza(dummy, reply);
+ end
+ sendq[i] = nil;
+ end
+ session.sendq = nil;
+end
+
+function send_to_host(from_host, to_host, stanza)
+ if not hosts[from_host] then
+ log("warn", "Attempt to send stanza from %s - a host we don't serve", from_host);
+ return false;
+ end
+ local host = hosts[from_host].s2sout[to_host];
+ if host then
+ -- We have a connection to this host already
+ if host.type == "s2sout_unauthed" and (stanza.name ~= "db:verify" or not host.dialback_key) then
+ (host.log or log)("debug", "trying to send over unauthed s2sout to "..to_host);
+
+ -- Queue stanza until we are able to send it
+ if host.sendq then t_insert(host.sendq, {tostring(stanza), stanza.attr.type ~= "error" and stanza.attr.type ~= "result" and st.reply(stanza)});
+ else host.sendq = { {tostring(stanza), stanza.attr.type ~= "error" and stanza.attr.type ~= "result" and st.reply(stanza)} }; end
+ host.log("debug", "stanza [%s] queued ", stanza.name);
+ elseif host.type == "local" or host.type == "component" then
+ log("error", "Trying to send a stanza to ourselves??")
+ log("error", "Traceback: %s", get_traceback());
+ log("error", "Stanza: %s", tostring(stanza));
+ return false;
+ else
+ (host.log or log)("debug", "going to send stanza to "..to_host.." from "..from_host);
+ -- FIXME
+ if host.from_host ~= from_host then
+ log("error", "WARNING! This might, possibly, be a bug, but it might not...");
+ log("error", "We are going to send from %s instead of %s", tostring(host.from_host), tostring(from_host));
+ end
+ host.sends2s(stanza);
+ host.log("debug", "stanza sent over "..host.type);
+ end
+ else
+ log("debug", "opening a new outgoing connection for this stanza");
+ local host_session = s2s_new_outgoing(from_host, to_host);
+
+ -- Store in buffer
+ host_session.bounce_sendq = bounce_sendq;
+ host_session.sendq = { {tostring(stanza), stanza.attr.type ~= "error" and stanza.attr.type ~= "result" and st.reply(stanza)} };
+ log("debug", "stanza [%s] queued until connection complete", tostring(stanza.name));
+ if (not host_session.connecting) and (not host_session.conn) then
+ log("warn", "Connection to %s failed already, destroying session...", to_host);
+ if not s2s_destroy_session(host_session, "Connection failed") then
+ -- Already destroyed, we need to bounce our stanza
+ host_session:bounce_sendq(host_session.destruction_reason);
+ end
+ return false;
+ end
+ s2sout.initiate_connection(host_session);
+ end
+ return true;
+end
+
+module:hook("route/remote", function (event)
+ return send_to_host(event.from_host, event.to_host, event.stanza);
+end);
+
+--- Helper to check that a session peer's certificate is valid
+local function check_cert_status(session)
+ local conn = session.conn:socket()
+ local cert
+ if conn.getpeercertificate then
+ cert = conn:getpeercertificate()
+ end
+
+ if cert then
+ local chain_valid, errors = conn:getpeerverification()
+ -- Is there any interest in printing out all/the number of errors here?
+ if not chain_valid then
+ (session.log or log)("debug", "certificate chain validation result: invalid");
+ session.cert_chain_status = "invalid";
+ else
+ (session.log or log)("debug", "certificate chain validation result: valid");
+ session.cert_chain_status = "valid";
+
+ local host = session.direction == "incoming" and session.from_host or session.to_host
+
+ -- We'll go ahead and verify the asserted identity if the
+ -- connecting server specified one.
+ if host then
+ if cert_verify_identity(host, "xmpp-server", cert) then
+ session.cert_identity_status = "valid"
+ else
+ session.cert_identity_status = "invalid"
+ end
+ end
+ end
+ end
+end
+
+--- XMPP stream event handlers
+
+local stream_callbacks = { default_ns = "jabber:server", handlestanza = core_process_stanza };
+
+local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams";
+
+function stream_callbacks.streamopened(session, attr)
+ local send = session.sends2s;
+
+ -- TODO: #29: SASL/TLS on s2s streams
+ session.version = tonumber(attr.version) or 0;
+
+ -- TODO: Rename session.secure to session.encrypted
+ if session.secure == false then
+ session.secure = true;
+ end
+
+ if session.direction == "incoming" then
+ -- Send a reply stream header
+ session.to_host = attr.to and nameprep(attr.to);
+ session.from_host = attr.from and nameprep(attr.from);
+
+ session.streamid = uuid_gen();
+ (session.log or log)("debug", "Incoming s2s received <stream:stream>");
+ if session.to_host then
+ if not hosts[session.to_host] then
+ -- Attempting to connect to a host we don't serve
+ session:close({
+ condition = "host-unknown";
+ text = "This host does not serve "..session.to_host
+ });
+ return;
+ elseif hosts[session.to_host].disallow_s2s then
+ -- Attempting to connect to a host that disallows s2s
+ session:close({
+ condition = "policy-violation";
+ text = "Server-to-server communication is not allowed to this host";
+ });
+ return;
+ end
+ end
+
+ if session.secure and not session.cert_chain_status then check_cert_status(session); end
+
+ send("<?xml version='1.0'?>");
+ send(stanza("stream:stream", { xmlns='jabber:server', ["xmlns:db"]='jabber:server:dialback',
+ ["xmlns:stream"]='http://etherx.jabber.org/streams', id=session.streamid, from=session.to_host, to=session.from_host, version=(session.version > 0 and "1.0" or nil) }):top_tag());
+ if session.version >= 1.0 then
+ local features = st.stanza("stream:features");
+
+ if session.to_host then
+ hosts[session.to_host].events.fire_event("s2s-stream-features", { origin = session, features = features });
+ else
+ (session.log or log)("warn", "No 'to' on stream header from %s means we can't offer any features", session.from_host or "unknown host");
+ end
+
+ log("debug", "Sending stream features: %s", tostring(features));
+ send(features);
+ end
+ elseif session.direction == "outgoing" then
+ -- If we are just using the connection for verifying dialback keys, we won't try and auth it
+ if not attr.id then error("stream response did not give us a streamid!!!"); end
+ session.streamid = attr.id;
+
+ if session.secure and not session.cert_chain_status then check_cert_status(session); end
+
+ -- Send unauthed buffer
+ -- (stanzas which are fine to send before dialback)
+ -- Note that this is *not* the stanza queue (which
+ -- we can only send if auth succeeds) :)
+ local send_buffer = session.send_buffer;
+ if send_buffer and #send_buffer > 0 then
+ log("debug", "Sending s2s send_buffer now...");
+ for i, data in ipairs(send_buffer) do
+ session.sends2s(tostring(data));
+ send_buffer[i] = nil;
+ end
+ end
+ session.send_buffer = nil;
+
+ -- If server is pre-1.0, don't wait for features, just do dialback
+ if session.version < 1.0 then
+ if not session.dialback_verifying then
+ log("debug", "Initiating dialback...");
+ initiate_dialback(session);
+ else
+ s2s_mark_connected(session);
+ end
+ end
+ end
+ session.notopen = nil;
+end
+
+function stream_callbacks.streamclosed(session)
+ (session.log or log)("debug", "Received </stream:stream>");
+ session:close();
+end
+
+function stream_callbacks.streamdisconnected(session, err)
+ if err and err ~= "closed" then
+ (session.log or log)("debug", "s2s connection attempt failed: %s", err);
+ if s2sout.attempt_connection(session, err) then
+ (session.log or log)("debug", "...so we're going to try another target");
+ return true; -- Session lives for now
+ end
+ end
+ (session.log or log)("info", "s2s disconnected: %s->%s (%s)", tostring(session.from_host), tostring(session.to_host), tostring(err or "closed"));
+ sessions[session.conn] = nil;
+ s2s_destroy_session(session, err);
+end
+
+function stream_callbacks.error(session, error, data)
+ if error == "no-stream" then
+ session:close("invalid-namespace");
+ elseif error == "parse-error" then
+ session.log("debug", "Server-to-server XML parse error: %s", tostring(error));
+ session:close("not-well-formed");
+ elseif error == "stream-error" then
+ local condition, text = "undefined-condition";
+ for child in data:children() do
+ if child.attr.xmlns == xmlns_xmpp_streams then
+ if child.name ~= "text" then
+ condition = child.name;
+ else
+ text = child:get_text();
+ end
+ if condition ~= "undefined-condition" and text then
+ break;
+ end
+ end
+ end
+ text = condition .. (text and (" ("..text..")") or "");
+ session.log("info", "Session closed by remote with error: %s", text);
+ session:close(nil, text);
+ end
+end
+
+local function handleerr(err) log("error", "Traceback[s2s]: %s: %s", tostring(err), traceback()); end
+function stream_callbacks.handlestanza(session, stanza)
+ if stanza.attr.xmlns == "jabber:client" then --COMPAT: Prosody pre-0.6.2 may send jabber:client
+ stanza.attr.xmlns = nil;
+ end
+ stanza = session.filter("stanzas/in", stanza);
+ if stanza then
+ return xpcall(function () return core_process_stanza(session, stanza) end, handleerr);
+ end
+end
+
+local listener = { default_port = 5269, default_mode = "*a", default_interface = "*" };
+
+--- Session methods
+local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'};
+local default_stream_attr = { ["xmlns:stream"] = "http://etherx.jabber.org/streams", xmlns = stream_callbacks.default_ns, version = "1.0", id = "" };
+local function session_close(session, reason, remote_reason)
+ local log = session.log or log;
+ if session.conn then
+ if session.notopen then
+ session.sends2s("<?xml version='1.0'?>");
+ session.sends2s(st.stanza("stream:stream", default_stream_attr):top_tag());
+ end
+ if reason then
+ if type(reason) == "string" then -- assume stream error
+ log("info", "Disconnecting %s[%s], <stream:error> is: %s", session.host or "(unknown host)", session.type, reason);
+ session.sends2s(st.stanza("stream:error"):tag(reason, {xmlns = 'urn:ietf:params:xml:ns:xmpp-streams' }));
+ elseif type(reason) == "table" then
+ if reason.condition then
+ local stanza = st.stanza("stream:error"):tag(reason.condition, stream_xmlns_attr):up();
+ if reason.text then
+ stanza:tag("text", stream_xmlns_attr):text(reason.text):up();
+ end
+ if reason.extra then
+ stanza:add_child(reason.extra);
+ end
+ log("info", "Disconnecting %s[%s], <stream:error> is: %s", session.host or "(unknown host)", session.type, tostring(stanza));
+ session.sends2s(stanza);
+ elseif reason.name then -- a stanza
+ log("info", "Disconnecting %s->%s[%s], <stream:error> is: %s", session.from_host or "(unknown host)", session.to_host or "(unknown host)", session.type, tostring(reason));
+ session.sends2s(reason);
+ end
+ end
+ end
+ session.sends2s("</stream:stream>");
+ if session.notopen or not session.conn:close() then
+ session.conn:close(true); -- Force FIXME: timer?
+ end
+ session.conn:close();
+ listener.ondisconnect(session.conn, remote_reason or (reason and (reason.text or reason.condition)) or reason or "stream closed");
+ end
+end
+
+-- Session initialization logic shared by incoming and outgoing
+local function initialize_session(session)
+ local stream = new_xmpp_stream(session, stream_callbacks);
+ session.stream = stream;
+
+ session.notopen = true;
+
+ function session.reset_stream()
+ session.notopen = true;
+ session.stream:reset();
+ end
+
+ local filter = session.filter;
+ function session.data(data)
+ data = filter("bytes/in", data);
+ if data then
+ local ok, err = stream:feed(data);
+ if ok then return; end
+ (session.log or log)("warn", "Received invalid XML: %s", data);
+ (session.log or log)("warn", "Problem was: %s", err);
+ session:close("not-well-formed");
+ end
+ end
+
+ session.close = session_close;
+
+ local handlestanza = stream_callbacks.handlestanza;
+ function session.dispatch_stanza(session, stanza)
+ return handlestanza(session, stanza);
+ end
+
+ local conn = session.conn;
+ add_task(connect_timeout, function ()
+ if session.conn ~= conn or session.connecting
+ or session.type == "s2sin" or session.type == "s2sout" then
+ return; -- Ok, we're connect[ed|ing]
+ end
+ -- Not connected, need to close session and clean up
+ (session.log or log)("debug", "Destroying incomplete session %s->%s due to inactivity",
+ session.from_host or "(unknown)", session.to_host or "(unknown)");
+ session:close("connection-timeout");
+ end);
+end
+
+function listener.onconnect(conn)
+ if not sessions[conn] then -- May be an existing outgoing session
+ local session = s2s_new_incoming(conn);
+ sessions[conn] = session;
+ session.log("debug", "Incoming s2s connection");
+
+ local filter = initialize_filters(session);
+ local w = conn.write;
+ session.sends2s = function (t)
+ log("debug", "sending: %s", t.top_tag and t:top_tag() or t:match("^([^>]*>?)"));
+ if t.name then
+ t = filter("stanzas/out", t);
+ end
+ if t then
+ t = filter("bytes/out", tostring(t));
+ if t then
+ return w(conn, t);
+ end
+ end
+ end
+
+ initialize_session(session);
+ end
+end
+
+function listener.onincoming(conn, data)
+ local session = sessions[conn];
+ if session then
+ session.data(data);
+ end
+end
+
+function listener.onstatus(conn, status)
+ if status == "ssl-handshake-complete" then
+ local session = sessions[conn];
+ if session and session.direction == "outgoing" then
+ local to_host, from_host = session.to_host, session.from_host;
+ session.log("debug", "Sending stream header...");
+ session:open_stream(session.from_host, session.to_host);
+ end
+ end
+end
+
+function listener.ondisconnect(conn, err)
+ local session = sessions[conn];
+ if session then
+ if stream_callbacks.streamdisconnected(session, err) then
+ return; -- Connection lives, for now
+ end
+ end
+ sessions[conn] = nil;
+end
+
+function listener.register_outgoing(conn, session)
+ session.direction = "outgoing";
+ sessions[conn] = session;
+ initialize_session(session);
+end
+
+s2sout.set_listener(listener);
+
+require "core.portmanager".register_service("s2s", {
+ listener = listener;
+ default_port = 5269;
+ encryption = "starttls";
+});
+
diff --git a/plugins/s2s/s2sout.lib.lua b/plugins/s2s/s2sout.lib.lua
new file mode 100644
index 00000000..094126e7
--- /dev/null
+++ b/plugins/s2s/s2sout.lib.lua
@@ -0,0 +1,314 @@
+-- Prosody IM
+-- Copyright (C) 2008-2010 Matthew Wild
+-- Copyright (C) 2008-2010 Waqas Hussain
+--
+-- This project is MIT/X11 licensed. Please see the
+-- COPYING file in the source package for more information.
+--
+
+--- Module containing all the logic for connecting to a remote server
+
+local initialize_filters = require "util.filters".initialize;
+local idna_to_ascii = require "util.encodings".idna.to_ascii;
+local add_task = require "util.timer".add_task;
+local socket = require "socket";
+
+local s2s_destroy_session = require "core.s2smanager".destroy_session;
+
+local s2sout = {};
+
+local s2s_listener;
+
+function s2sout.set_listener(listener)
+ s2s_listener = listener;
+end
+
+local function compare_srv_priorities(a,b)
+ return a.priority < b.priority or (a.priority == b.priority and a.weight > b.weight);
+end
+
+local function session_open_stream(session, from, to)
+ session.sends2s(st.stanza("stream:stream", {
+ xmlns='jabber:server', ["xmlns:db"]='jabber:server:dialback',
+ ["xmlns:stream"]='http://etherx.jabber.org/streams',
+ from=from, to=to, version='1.0', ["xml:lang"]='en'}):top_tag());
+end
+
+function s2sout.initiate_connection(host_session)
+ initialize_filters(host_session);
+ session.open_stream = session_open_stream;
+
+ -- Kick the connection attempting machine into life
+ if not s2sout.attempt_connection(host_session) then
+ -- Intentionally not returning here, the
+ -- session is needed, connected or not
+ s2s_destroy_session(host_session);
+ end
+
+ if not host_session.sends2s then
+ -- A sends2s which buffers data (until the stream is opened)
+ -- note that data in this buffer will be sent before the stream is authed
+ -- and will not be ack'd in any way, successful or otherwise
+ local buffer;
+ function host_session.sends2s(data)
+ if not buffer then
+ buffer = {};
+ host_session.send_buffer = buffer;
+ end
+ log("debug", "Buffering data on unconnected s2sout to %s", to_host);
+ buffer[#buffer+1] = data;
+ log("debug", "Buffered item %d: %s", #buffer, tostring(data));
+ end
+ end
+end
+
+function s2sout.attempt_connection(host_session, err)
+ local from_host, to_host = host_session.from_host, host_session.to_host;
+ local connect_host, connect_port = to_host and idna_to_ascii(to_host), 5269;
+
+ if not connect_host then
+ return false;
+ end
+
+ if not err then -- This is our first attempt
+ log("debug", "First attempt to connect to %s, starting with SRV lookup...", to_host);
+ host_session.connecting = true;
+ local handle;
+ handle = adns.lookup(function (answer)
+ handle = nil;
+ host_session.connecting = nil;
+ if answer then
+ log("debug", to_host.." has SRV records, handling...");
+ local srv_hosts = {};
+ host_session.srv_hosts = srv_hosts;
+ for _, record in ipairs(answer) do
+ t_insert(srv_hosts, record.srv);
+ end
+ if #srv_hosts == 1 and srv_hosts[1].target == "." then
+ log("debug", to_host.." does not provide a XMPP service");
+ s2s_destroy_session(host_session, err); -- Nothing to see here
+ return;
+ end
+ t_sort(srv_hosts, compare_srv_priorities);
+
+ local srv_choice = srv_hosts[1];
+ host_session.srv_choice = 1;
+ if srv_choice then
+ connect_host, connect_port = srv_choice.target or to_host, srv_choice.port or connect_port;
+ log("debug", "Best record found, will connect to %s:%d", connect_host, connect_port);
+ end
+ else
+ log("debug", to_host.." has no SRV records, falling back to A");
+ end
+ -- Try with SRV, or just the plain hostname if no SRV
+ local ok, err = s2sout.try_connect(host_session, connect_host, connect_port);
+ if not ok then
+ if not s2sout.attempt_connection(host_session, err) then
+ -- No more attempts will be made
+ s2s_destroy_session(host_session, err);
+ end
+ end
+ end, "_xmpp-server._tcp."..connect_host..".", "SRV");
+
+ return true; -- Attempt in progress
+ elseif host_session.ip_hosts then
+ return s2sout.try_connect(host_session, connect_host, connect_port, err);
+ elseif host_session.srv_hosts and #host_session.srv_hosts > host_session.srv_choice then -- Not our first attempt, and we also have SRV
+ host_session.srv_choice = host_session.srv_choice + 1;
+ local srv_choice = host_session.srv_hosts[host_session.srv_choice];
+ connect_host, connect_port = srv_choice.target or to_host, srv_choice.port or connect_port;
+ host_session.log("info", "Connection failed (%s). Attempt #%d: This time to %s:%d", tostring(err), host_session.srv_choice, connect_host, connect_port);
+ else
+ host_session.log("info", "Out of connection options, can't connect to %s", tostring(host_session.to_host));
+ -- We're out of options
+ return false;
+ end
+
+ if not (connect_host and connect_port) then
+ -- Likely we couldn't resolve DNS
+ log("warn", "Hmm, we're without a host (%s) and port (%s) to connect to for %s, giving up :(", tostring(connect_host), tostring(connect_port), tostring(to_host));
+ return false;
+ end
+
+ return s2sout.try_connect(host_session, connect_host, connect_port);
+end
+
+function s2sout.try_next_ip(host_session)
+ host_session.connecting = nil;
+ host_session.ip_choice = host_session.ip_choice + 1;
+ local ip = host_session.ip_hosts[host_session.ip_choice];
+ local ok, err= s2sout.make_connect(host_session, ip.ip, ip.port);
+ if not ok then
+ if not s2sout.attempt_connection(host_session, err or "closed") then
+ err = err and (": "..err) or "";
+ s2s_destroy_session(host_session, "Connection failed"..err);
+ end
+ end
+end
+
+function s2sout.try_connect(host_session, connect_host, connect_port, err)
+ host_session.connecting = true;
+
+ if not err then
+ local IPs = {};
+ host_session.ip_hosts = IPs;
+ local handle4, handle6;
+ local has_other = false;
+
+ if not sources then
+ sources = {};
+ for i, source in ipairs(cfg_sources) do
+ if source == "*" then
+ sources[i] = new_ip("0.0.0.0", "IPv4");
+ else
+ sources[i] = new_ip(source, (source:find(":") and "IPv6") or "IPv4");
+ end
+ end
+ end
+
+ handle4 = adns.lookup(function (reply, err)
+ handle4 = nil;
+
+ -- COMPAT: This is a compromise for all you CNAME-(ab)users :)
+ if not (reply and reply[#reply] and reply[#reply].a) then
+ local count = max_dns_depth;
+ reply = dns.peek(connect_host, "CNAME", "IN");
+ while count > 0 and reply and reply[#reply] and not reply[#reply].a and reply[#reply].cname do
+ log("debug", "Looking up %s (DNS depth is %d)", tostring(reply[#reply].cname), count);
+ reply = dns.peek(reply[#reply].cname, "A", "IN") or dns.peek(reply[#reply].cname, "CNAME", "IN");
+ count = count - 1;
+ end
+ end
+ -- end of CNAME resolving
+
+ if reply and reply[#reply] and reply[#reply].a then
+ for _, ip in ipairs(reply) do
+ log("debug", "DNS reply for %s gives us %s", connect_host, ip.a);
+ IPs[#IPs+1] = new_ip(ip.a, "IPv4");
+ end
+ end
+
+ if has_other then
+ if #IPs > 0 then
+ rfc3484_dest(host_session.ip_hosts, sources);
+ for i = 1, #IPs do
+ IPs[i] = {ip = IPs[i], port = connect_port};
+ end
+ host_session.ip_choice = 0;
+ s2sout.try_next_ip(host_session);
+ else
+ log("debug", "DNS lookup failed to get a response for %s", connect_host);
+ host_session.ip_hosts = nil;
+ if not s2sout.attempt_connection(host_session, "name resolution failed") then -- Retry if we can
+ log("debug", "No other records to try for %s - destroying", host_session.to_host);
+ err = err and (": "..err) or "";
+ s2s_destroy_session(host_session, "DNS resolution failed"..err); -- End of the line, we can't
+ end
+ end
+ else
+ has_other = true;
+ end
+ end, connect_host, "A", "IN");
+
+ handle6 = adns.lookup(function (reply, err)
+ handle6 = nil;
+
+ if reply and reply[#reply] and reply[#reply].aaaa then
+ for _, ip in ipairs(reply) do
+ log("debug", "DNS reply for %s gives us %s", connect_host, ip.aaaa);
+ IPs[#IPs+1] = new_ip(ip.aaaa, "IPv6");
+ end
+ end
+
+ if has_other then
+ if #IPs > 0 then
+ rfc3484_dest(host_session.ip_hosts, sources);
+ for i = 1, #IPs do
+ IPs[i] = {ip = IPs[i], port = connect_port};
+ end
+ host_session.ip_choice = 0;
+ s2sout.try_next_ip(host_session);
+ else
+ log("debug", "DNS lookup failed to get a response for %s", connect_host);
+ host_session.ip_hosts = nil;
+ if not s2sout.attempt_connection(host_session, "name resolution failed") then -- Retry if we can
+ log("debug", "No other records to try for %s - destroying", host_session.to_host);
+ err = err and (": "..err) or "";
+ s2s_destroy_session(host_session, "DNS resolution failed"..err); -- End of the line, we can't
+ end
+ end
+ else
+ has_other = true;
+ end
+ end, connect_host, "AAAA", "IN");
+
+ return true;
+ elseif host_session.ip_hosts and #host_session.ip_hosts > host_session.ip_choice then -- Not our first attempt, and we also have IPs left to try
+ s2sout.try_next_ip(host_session);
+ else
+ host_session.ip_hosts = nil;
+ if not s2sout.attempt_connection(host_session, "out of IP addresses") then -- Retry if we can
+ log("debug", "No other records to try for %s - destroying", host_session.to_host);
+ err = err and (": "..err) or "";
+ s2s_destroy_session(host_session, "Connecting failed"..err); -- End of the line, we can't
+ return false;
+ end
+ end
+
+ return true;
+end
+
+function s2sout.make_connect(host_session, connect_host, connect_port)
+ (host_session.log or log)("info", "Beginning new connection attempt to %s ([%s]:%d)", host_session.to_host, connect_host.addr, connect_port);
+ -- Ok, we're going to try to connect
+
+ local from_host, to_host = host_session.from_host, host_session.to_host;
+
+ local conn, handler;
+ if connect_host.proto == "IPv4" then
+ conn, handler = socket.tcp();
+ else
+ conn, handler = socket.tcp6();
+ end
+
+ if not conn then
+ log("warn", "Failed to create outgoing connection, system error: %s", handler);
+ return false, handler;
+ end
+
+ conn:settimeout(0);
+ local success, err = conn:connect(connect_host.addr, connect_port);
+ if not success and err ~= "timeout" then
+ log("warn", "s2s connect() to %s (%s:%d) failed: %s", host_session.to_host, connect_host.addr, connect_port, err);
+ return false, err;
+ end
+
+ conn = wrapclient(conn, connect_host.addr, connect_port, s2s_listener, "*a");
+ host_session.conn = conn;
+
+ local filter = initialize_filters(host_session);
+ local w, log = conn.write, host_session.log;
+ host_session.sends2s = function (t)
+ log("debug", "sending: %s", (t.top_tag and t:top_tag()) or t:match("^[^>]*>?"));
+ if t.name then
+ t = filter("stanzas/out", t);
+ end
+ if t then
+ t = filter("bytes/out", tostring(t));
+ if t then
+ return w(conn, tostring(t));
+ end
+ end
+ end
+
+ -- Register this outgoing connection so that xmppserver_listener knows about it
+ -- otherwise it will assume it is a new incoming connection
+ s2s_listener.register_outgoing(conn, host_session);
+
+ host_session:open_stream(from_host, to_host);
+
+ log("debug", "Connection attempt in progress...");
+ return true;
+end
+
+return s2sout;