aboutsummaryrefslogtreecommitdiffstats
path: root/plugins/mod_s2s
diff options
context:
space:
mode:
authorMatthew Wild <mwild1@gmail.com>2012-07-28 01:14:31 +0100
committerMatthew Wild <mwild1@gmail.com>2012-07-28 01:14:31 +0100
commit81bef2c93b53e006cbeebdffc80040af6b4daf68 (patch)
tree2732225257e7b8ade28695dad7c0df3f7cea00b1 /plugins/mod_s2s
parent8eec29745633bd84c2d325d42dbeeb4afd37d516 (diff)
parent47619af14cd24fcb48b4d2b4c4653f72b882087e (diff)
downloadprosody-81bef2c93b53e006cbeebdffc80040af6b4daf68.tar.gz
prosody-81bef2c93b53e006cbeebdffc80040af6b4daf68.zip
Merge with Florob
Diffstat (limited to 'plugins/mod_s2s')
-rw-r--r--plugins/mod_s2s/mod_s2s.lua75
-rw-r--r--plugins/mod_s2s/s2sout.lib.lua135
2 files changed, 109 insertions, 101 deletions
diff --git a/plugins/mod_s2s/mod_s2s.lua b/plugins/mod_s2s/mod_s2s.lua
index f6c20606..321ed0d7 100644
--- a/plugins/mod_s2s/mod_s2s.lua
+++ b/plugins/mod_s2s/mod_s2s.lua
@@ -10,7 +10,7 @@ module:set_global();
local prosody = prosody;
local hosts = prosody.hosts;
-local core_process_stanza = core_process_stanza;
+local core_process_stanza = prosody.core_process_stanza;
local tostring, type = tostring, type;
local t_insert = table.insert;
@@ -30,7 +30,8 @@ local cert_verify_identity = require "util.x509".verify_identity;
local s2sout = module:require("s2sout");
-local connect_timeout = module:get_option_number("s2s_timeout", 60);
+local connect_timeout = module:get_option_number("s2s_timeout", 90);
+local stream_close_timeout = module:get_option_number("s2s_close_timeout", 5);
local sessions = module:shared("sessions");
@@ -97,9 +98,10 @@ function route_to_existing_session(event)
log("error", "WARNING! This might, possibly, be a bug, but it might not...");
log("error", "We are going to send from %s instead of %s", tostring(host.from_host), tostring(from_host));
end
- host.sends2s(stanza);
- host.log("debug", "stanza sent over "..host.type);
- return true;
+ if host.sends2s(stanza) then
+ host.log("debug", "stanza sent over %s", host.type);
+ return true;
+ end
end
end
end
@@ -288,19 +290,7 @@ end
function stream_callbacks.streamclosed(session)
(session.log or log)("debug", "Received </stream:stream>");
- session:close();
-end
-
-function stream_callbacks.streamdisconnected(session, err)
- if err and err ~= "closed" and session.direction == "outgoing" and session.notopen then
- (session.log or log)("debug", "s2s connection attempt failed: %s", err);
- if s2sout.attempt_connection(session, err) then
- (session.log or log)("debug", "...so we're going to try another target");
- return true; -- Session lives for now
- end
- end
- (session.log or log)("info", "s2s disconnected: %s->%s (%s)", tostring(session.from_host), tostring(session.to_host), tostring(err or "closed"));
- s2s_destroy_session(session, err);
+ session:close(false);
end
function stream_callbacks.error(session, error, data)
@@ -352,9 +342,9 @@ local function session_close(session, reason, remote_reason)
session.sends2s("<?xml version='1.0'?>");
session.sends2s(st.stanza("stream:stream", default_stream_attr):top_tag());
end
- if reason then
+ if reason then -- nil == no err, initiated by us, false == initiated by remote
if type(reason) == "string" then -- assume stream error
- log("info", "Disconnecting %s[%s], <stream:error> is: %s", session.host or "(unknown host)", session.type, reason);
+ log("debug", "Disconnecting %s[%s], <stream:error> is: %s", session.host or "(unknown host)", session.type, reason);
session.sends2s(st.stanza("stream:error"):tag(reason, {xmlns = 'urn:ietf:params:xml:ns:xmpp-streams' }));
elseif type(reason) == "table" then
if reason.condition then
@@ -365,20 +355,35 @@ local function session_close(session, reason, remote_reason)
if reason.extra then
stanza:add_child(reason.extra);
end
- log("info", "Disconnecting %s[%s], <stream:error> is: %s", session.host or "(unknown host)", session.type, tostring(stanza));
+ log("debug", "Disconnecting %s[%s], <stream:error> is: %s", session.host or "(unknown host)", session.type, tostring(stanza));
session.sends2s(stanza);
elseif reason.name then -- a stanza
- log("info", "Disconnecting %s->%s[%s], <stream:error> is: %s", session.from_host or "(unknown host)", session.to_host or "(unknown host)", session.type, tostring(reason));
+ log("debug", "Disconnecting %s->%s[%s], <stream:error> is: %s", session.from_host or "(unknown host)", session.to_host or "(unknown host)", session.type, tostring(reason));
session.sends2s(reason);
end
end
end
+
session.sends2s("</stream:stream>");
- if session.notopen or not session.conn:close() then
- session.conn:close(true); -- Force FIXME: timer?
+ function session.sends2s() return false; end
+
+ local reason = remote_reason or (reason and (reason.text or reason.condition)) or reason;
+ session.log("info", "%s s2s stream %s->%s closed: %s", session.direction, session.from_host or "(unknown host)", session.to_host or "(unknown host)", reason or "stream closed");
+
+ -- Authenticated incoming stream may still be sending us stanzas, so wait for </stream:stream> from remote
+ local conn = session.conn;
+ if reason == nil and not session.notopen and session.type == "s2sin" then
+ add_task(stream_close_timeout, function ()
+ if not session.destroyed then
+ session.log("warn", "Failed to receive a stream close response, closing connection anyway...");
+ s2s_destroy_session(session, reason);
+ conn:close();
+ end
+ end);
+ else
+ s2s_destroy_session(session, reason);
+ conn:close(); -- Close immediately, as this is an outgoing connection or is not authed
end
- session.conn:close();
- listener.ondisconnect(session.conn, remote_reason or (reason and (reason.text or reason.condition)) or reason or "stream closed");
end
end
@@ -413,11 +418,9 @@ local function initialize_session(session)
return handlestanza(session, stanza);
end
- local conn = session.conn;
add_task(connect_timeout, function ()
- if session.conn ~= conn or session.connecting
- or session.type == "s2sin" or session.type == "s2sout" then
- return; -- Ok, we're connect[ed|ing]
+ if session.type == "s2sin" or session.type == "s2sout" then
+ return; -- Ok, we're connected
end
-- Not connected, need to close session and clean up
(session.log or log)("debug", "Destroying incomplete session %s->%s due to inactivity",
@@ -474,11 +477,17 @@ end
function listener.ondisconnect(conn, err)
local session = sessions[conn];
if session then
- if stream_callbacks.streamdisconnected(session, err) then
- return; -- Connection lives, for now
+ if err and session.direction == "outgoing" and session.notopen then
+ (session.log or log)("debug", "s2s connection attempt failed: %s", err);
+ if s2sout.attempt_connection(session, err) then
+ (session.log or log)("debug", "...so we're going to try another target");
+ return; -- Session lives for now
+ end
end
+ (session.log or log)("debug", "s2s disconnected: %s->%s (%s)", tostring(session.from_host), tostring(session.to_host), tostring(err or "connection closed"));
+ s2s_destroy_session(session, err);
+ sessions[conn] = nil;
end
- sessions[conn] = nil;
end
function listener.register_outgoing(conn, session)
diff --git a/plugins/mod_s2s/s2sout.lib.lua b/plugins/mod_s2s/s2sout.lib.lua
index 48a49036..17978b39 100644
--- a/plugins/mod_s2s/s2sout.lib.lua
+++ b/plugins/mod_s2s/s2sout.lib.lua
@@ -95,14 +95,14 @@ function s2sout.attempt_connection(host_session, err)
handle = nil;
host_session.connecting = nil;
if answer then
- log("debug", to_host.." has SRV records, handling...");
+ log("debug", "%s has SRV records, handling...", to_host);
local srv_hosts = {};
host_session.srv_hosts = srv_hosts;
for _, record in ipairs(answer) do
t_insert(srv_hosts, record.srv);
end
if #srv_hosts == 1 and srv_hosts[1].target == "." then
- log("debug", to_host.." does not provide a XMPP service");
+ log("debug", "%s does not provide a XMPP service", to_host);
s2s_destroy_session(host_session, err); -- Nothing to see here
return;
end
@@ -115,7 +115,7 @@ function s2sout.attempt_connection(host_session, err)
log("debug", "Best record found, will connect to %s:%d", connect_host, connect_port);
end
else
- log("debug", to_host.." has no SRV records, falling back to A/AAAA");
+ log("debug", "%s has no SRV records, falling back to A/AAAA", to_host);
end
-- Try with SRV, or just the plain hostname if no SRV
local ok, err = s2sout.try_connect(host_session, connect_host, connect_port);
@@ -170,92 +170,91 @@ function s2sout.try_connect(host_session, connect_host, connect_port, err)
local IPs = {};
host_session.ip_hosts = IPs;
local handle4, handle6;
- local has_other = false;
+ local have_other_result = not(has_ipv4) or not(has_ipv6) or false;
if has_ipv4 then
- handle4 = adns.lookup(function (reply, err)
- handle4 = nil;
-
- -- COMPAT: This is a compromise for all you CNAME-(ab)users :)
- if not (reply and reply[#reply] and reply[#reply].a) then
- local count = max_dns_depth;
- reply = dns.peek(connect_host, "CNAME", "IN");
- while count > 0 and reply and reply[#reply] and not reply[#reply].a and reply[#reply].cname do
- log("debug", "Looking up %s (DNS depth is %d)", tostring(reply[#reply].cname), count);
- reply = dns.peek(reply[#reply].cname, "A", "IN") or dns.peek(reply[#reply].cname, "CNAME", "IN");
- count = count - 1;
+ handle4 = adns.lookup(function (reply, err)
+ handle4 = nil;
+
+ -- COMPAT: This is a compromise for all you CNAME-(ab)users :)
+ if not (reply and reply[#reply] and reply[#reply].a) then
+ local count = max_dns_depth;
+ reply = dns.peek(connect_host, "CNAME", "IN");
+ while count > 0 and reply and reply[#reply] and not reply[#reply].a and reply[#reply].cname do
+ log("debug", "Looking up %s (DNS depth is %d)", tostring(reply[#reply].cname), count);
+ reply = dns.peek(reply[#reply].cname, "A", "IN") or dns.peek(reply[#reply].cname, "CNAME", "IN");
+ count = count - 1;
+ end
end
- end
- -- end of CNAME resolving
+ -- end of CNAME resolving
- if reply and reply[#reply] and reply[#reply].a then
- for _, ip in ipairs(reply) do
- log("debug", "DNS reply for %s gives us %s", connect_host, ip.a);
- IPs[#IPs+1] = new_ip(ip.a, "IPv4");
+ if reply and reply[#reply] and reply[#reply].a then
+ for _, ip in ipairs(reply) do
+ log("debug", "DNS reply for %s gives us %s", connect_host, ip.a);
+ IPs[#IPs+1] = new_ip(ip.a, "IPv4");
+ end
end
- end
- if has_other then
- if #IPs > 0 then
- rfc3484_dest(host_session.ip_hosts, sources);
- for i = 1, #IPs do
- IPs[i] = {ip = IPs[i], port = connect_port};
+ if have_other_result then
+ if #IPs > 0 then
+ rfc3484_dest(host_session.ip_hosts, sources);
+ for i = 1, #IPs do
+ IPs[i] = {ip = IPs[i], port = connect_port};
+ end
+ host_session.ip_choice = 0;
+ s2sout.try_next_ip(host_session);
+ else
+ log("debug", "DNS lookup failed to get a response for %s", connect_host);
+ host_session.ip_hosts = nil;
+ if not s2sout.attempt_connection(host_session, "name resolution failed") then -- Retry if we can
+ log("debug", "No other records to try for %s - destroying", host_session.to_host);
+ err = err and (": "..err) or "";
+ s2s_destroy_session(host_session, "DNS resolution failed"..err); -- End of the line, we can't
+ end
end
- host_session.ip_choice = 0;
- s2sout.try_next_ip(host_session);
else
- log("debug", "DNS lookup failed to get a response for %s", connect_host);
- host_session.ip_hosts = nil;
- if not s2sout.attempt_connection(host_session, "name resolution failed") then -- Retry if we can
- log("debug", "No other records to try for %s - destroying", host_session.to_host);
- err = err and (": "..err) or "";
- s2s_destroy_session(host_session, "DNS resolution failed"..err); -- End of the line, we can't
- end
+ have_other_result = true;
end
- else
- has_other = true;
- end
- end, connect_host, "A", "IN");
+ end, connect_host, "A", "IN");
else
- has_other = true;
+ have_other_result = true;
end
if has_ipv6 then
- handle6 = adns.lookup(function (reply, err)
- handle6 = nil;
+ handle6 = adns.lookup(function (reply, err)
+ handle6 = nil;
- if reply and reply[#reply] and reply[#reply].aaaa then
- for _, ip in ipairs(reply) do
- log("debug", "DNS reply for %s gives us %s", connect_host, ip.aaaa);
- IPs[#IPs+1] = new_ip(ip.aaaa, "IPv6");
+ if reply and reply[#reply] and reply[#reply].aaaa then
+ for _, ip in ipairs(reply) do
+ log("debug", "DNS reply for %s gives us %s", connect_host, ip.aaaa);
+ IPs[#IPs+1] = new_ip(ip.aaaa, "IPv6");
+ end
end
- end
- if has_other then
- if #IPs > 0 then
- rfc3484_dest(host_session.ip_hosts, sources);
- for i = 1, #IPs do
- IPs[i] = {ip = IPs[i], port = connect_port};
+ if have_other_result then
+ if #IPs > 0 then
+ rfc3484_dest(host_session.ip_hosts, sources);
+ for i = 1, #IPs do
+ IPs[i] = {ip = IPs[i], port = connect_port};
+ end
+ host_session.ip_choice = 0;
+ s2sout.try_next_ip(host_session);
+ else
+ log("debug", "DNS lookup failed to get a response for %s", connect_host);
+ host_session.ip_hosts = nil;
+ if not s2sout.attempt_connection(host_session, "name resolution failed") then -- Retry if we can
+ log("debug", "No other records to try for %s - destroying", host_session.to_host);
+ err = err and (": "..err) or "";
+ s2s_destroy_session(host_session, "DNS resolution failed"..err); -- End of the line, we can't
+ end
end
- host_session.ip_choice = 0;
- s2sout.try_next_ip(host_session);
else
- log("debug", "DNS lookup failed to get a response for %s", connect_host);
- host_session.ip_hosts = nil;
- if not s2sout.attempt_connection(host_session, "name resolution failed") then -- Retry if we can
- log("debug", "No other records to try for %s - destroying", host_session.to_host);
- err = err and (": "..err) or "";
- s2s_destroy_session(host_session, "DNS resolution failed"..err); -- End of the line, we can't
- end
+ have_other_result = true;
end
- else
- has_other = true;
- end
- end, connect_host, "AAAA", "IN");
+ end, connect_host, "AAAA", "IN");
else
- has_other = true;
+ have_other_result = true;
end
-
return true;
elseif host_session.ip_hosts and #host_session.ip_hosts > host_session.ip_choice then -- Not our first attempt, and we also have IPs left to try
s2sout.try_next_ip(host_session);