1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
|
-- Prosody IM
-- Copyright (C) 2008-2010 Matthew Wild
-- Copyright (C) 2008-2010 Waqas Hussain
--
-- This project is MIT/X11 licensed. Please see the
-- COPYING file in the source package for more information.
--
local hosts = hosts;
local sessions = sessions;
local core_process_stanza = function(a, b) core_process_stanza(a, b); end
local add_task = require "util.timer".add_task;
local socket = require "socket";
local format = string.format;
local t_insert, t_sort = table.insert, table.sort;
local get_traceback = debug.traceback;
local tostring, pairs, ipairs, getmetatable, newproxy, error, tonumber, setmetatable
= tostring, pairs, ipairs, getmetatable, newproxy, error, tonumber, setmetatable;
local idna_to_ascii = require "util.encodings".idna.to_ascii;
local connlisteners_get = require "net.connlisteners".get;
local initialize_filters = require "util.filters".initialize;
local wrapclient = require "net.server".wrapclient;
local modulemanager = require "core.modulemanager";
local st = require "stanza";
local stanza = st.stanza;
local nameprep = require "util.encodings".stringprep.nameprep;
local fire_event = prosody.events.fire_event;
local uuid_gen = require "util.uuid".generate;
local logger_init = require "util.logger".init;
local log = logger_init("s2smanager");
local sha256_hash = require "util.hashes".sha256;
local adns, dns = require "net.adns", require "net.dns";
local config = require "core.configmanager";
-- Global ("*") settings, each falling back to a built-in default when unset.
-- connect_timeout is the delay handed to add_task() for the inactivity checks
-- in new_incoming() and make_connect() (presumably seconds -- TODO confirm
-- against util.timer).
local connect_timeout = config.get("*", "core", "s2s_timeout") or 60;
local dns_timeout = config.get("*", "core", "dns_timeout") or 15;
-- Upper bound on the number of CNAME-following steps in try_connect().
local max_dns_depth = config.get("*", "core", "dns_max_depth") or 3;
dns.settimeout(dns_timeout);
local prosody = _G.prosody;
-- Registry of incoming sessions (session -> true). It is created as a global,
-- mirrored on the prosody table, and then captured in a local so the functions
-- below can still reach it after module() has been called.
incoming_s2s = {};
prosody.incoming_s2s = incoming_s2s;
local incoming_s2s = incoming_s2s;
-- The non-local functions defined after this line form the "s2smanager"
-- module, which is returned as _M at the end of the file.
module "s2smanager"
-- Ordering function for SRV records, suitable for table.sort().
-- A record with a lower `priority` sorts first; when priorities are equal the
-- record with the higher `weight` sorts first. Equal records compare false
-- in both directions, as table.sort() requires.
function compare_srv_priorities(a,b)
	local priority_a, priority_b = a.priority, b.priority;
	if priority_a < priority_b then
		return true;
	end
	if priority_a == priority_b then
		return a.weight > b.weight;
	end
	return false;
end
-- Stanza names for which an error reply is generated when delivery fails.
local bouncy_stanzas = { message = true, presence = true, iq = true };

-- Send an error reply for every stanza still queued on a failed outgoing
-- session, then drop the queue.
--
-- session: the outgoing session; session.sendq holds { serialized, reply }
--          pairs as queued by send_to_host().
-- reason:  optional string appended to the error as "Connection failed: ...".
--
-- Only queued entries whose reply has no xmlns attribute and whose name is
-- message/presence/iq are bounced; every entry is removed from the queue.
local function bounce_sendq(session, reason)
	local sendq = session.sendq;
	if not sendq then
		return;
	end
	-- The caller (destroy_session) treats session.log as optional, so fall
	-- back to the module logger here too instead of raising mid-teardown.
	local session_log = session.log or log;
	session_log("info", "sending error replies for "..#sendq.." queued stanzas because of failed outgoing connection to "..tostring(session.to_host));
	-- Stand-in origin for the bounced stanzas. Nothing should ever reply to an
	-- error reply, so its send() only reports the fact.
	local dummy = {
		type = "s2sin";
		send = function(s)
			session_log("error", "Replying to to an s2s error reply, please report this! Traceback: %s", get_traceback());
		end;
		dummy = true;
	};
	for i, data in ipairs(sendq) do
		local reply = data[2];
		local xmlns = reply.attr.xmlns;
		if not(xmlns) and bouncy_stanzas[reply.name] then
			reply.attr.type = "error";
			reply:tag("error", {type = "cancel"})
				:tag("remote-server-not-found", {xmlns = "urn:ietf:params:xml:ns:xmpp-stanzas"}):up();
			if reason then
				reply:tag("text", {xmlns = "urn:ietf:params:xml:ns:xmpp-stanzas"}):text("Connection failed: "..reason):up();
			end
			core_process_stanza(dummy, reply);
		end
		sendq[i] = nil;
	end
	session.sendq = nil;
end
-- Route a stanza from one of our hosts to a remote host over s2s.
--
-- from_host: local host the stanza originates from; must be a key of `hosts`.
-- to_host:   remote host name.
-- data:      the stanza to deliver.
--
-- Returns false when from_host is not a host we serve; otherwise returns
-- nothing. Depending on the state of the outgoing session the stanza is
-- sent immediately, queued on the session, or queued on a newly created one.
function send_to_host(from_host, to_host, data)
	if not hosts[from_host] then
		log("warn", "Attempt to send stanza from %s - a host we don't serve", from_host);
		return false;
	end

	local host = hosts[from_host].s2sout[to_host];

	if not host then
		-- No session yet: create one and park the stanza on it
		log("debug", "opening a new outgoing connection for this stanza");
		local host_session = new_outgoing(from_host, to_host);
		host_session.sendq = { {tostring(data), st.reply(data)} };
		log("debug", "stanza [%s] queued until connection complete", tostring(data.name));
		if not (host_session.connecting or host_session.conn) then
			log("warn", "Connection to %s failed already, destroying session...", to_host);
			destroy_session(host_session);
		end
		return;
	end

	-- We have a session for this host already
	local host_log = host.log or log;

	if host.type == "s2sout_unauthed" and (data.name ~= "db:verify" or not host.dialback_key) then
		host_log("debug", "trying to send over unauthed s2sout to "..to_host);
		-- Hold the stanza (and a prepared reply for bouncing) until the
		-- session is able to send it
		local entry = { tostring(data), st.reply(data) };
		local queue = host.sendq;
		if queue then
			t_insert(queue, entry);
		else
			host.sendq = { entry };
		end
		host.log("debug", "stanza [%s] queued ", data.name);
		return;
	end

	if host.type == "local" or host.type == "component" then
		log("error", "Trying to send a stanza to ourselves??")
		log("error", "Traceback: %s", get_traceback());
		log("error", "Stanza: %s", tostring(data));
		return;
	end

	host_log("debug", "going to send stanza to "..to_host.." from "..from_host);
	-- FIXME
	if host.from_host ~= from_host then
		log("error", "WARNING! This might, possibly, be a bug, but it might not...");
		log("error", "We are going to send from %s instead of %s", tostring(host.from_host), tostring(from_host));
	end
	host.sends2s(data);
	host.log("debug", "stanza sent over "..host.type);
end
-- Number of incoming sessions created and not yet garbage collected.
local open_sessions = 0;

-- Create and register the session object for a newly accepted s2s connection.
--
-- conn: the connection object; conn.write(conn, data) is used to send.
--
-- Returns the new session (type "s2sin_unauthed"). The session is added to
-- incoming_s2s and a timer is scheduled that closes it with
-- "connection-timeout" if, after connect_timeout, it still uses this
-- connection and has not reached type "s2sin".
function new_incoming(conn)
	local session = { conn = conn, type = "s2sin_unauthed", direction = "incoming", hosts = {} };
	-- The proxy's __gc handler decrements the counter once the session
	-- (which holds the only reference to it here) is collected.
	session.trace = newproxy(true);
	getmetatable(session.trace).__gc = function () open_sessions = open_sessions - 1; end;
	open_sessions = open_sessions + 1;
	-- "*" rather than "+" so the match yields "" instead of nil when
	-- tostring(conn) has no trailing hex digits; concatenating nil would
	-- raise. This mirrors the pattern used by new_outgoing().
	local w, log = conn.write, logger_init("s2sin"..tostring(conn):match("[a-f0-9]*$"));
	session.log = log;
	local filter = initialize_filters(session);
	-- Sender for this session: stanzas go through the "stanzas/out" filter,
	-- then everything goes through "bytes/out"; a filter returning a false
	-- value drops the data.
	session.sends2s = function (t)
		log("debug", "sending: %s", t.top_tag and t:top_tag() or t:match("^([^>]*>?)"));
		if t.name then
			t = filter("stanzas/out", t);
		end
		if t then
			t = filter("bytes/out", tostring(t));
			if t then
				return w(conn, t);
			end
		end
	end
	incoming_s2s[session] = true;
	add_task(connect_timeout, function ()
		if session.conn ~= conn or
		   session.type == "s2sin" then
			return; -- Ok, we're connect[ed|ing]
		end
		-- Not connected, need to close session and clean up
		local session_log = session.log or log;
		session_log("warn", "Destroying incomplete session %s->%s due to inactivity",
			session.from_host or "(unknown)", session.to_host or "(unknown)");
		session:close("connection-timeout");
	end);
	return session;
end
-- Create and register an outgoing session from from_host to to_host.
--
-- from_host: local host; hosts[from_host].s2sout[to_host] is overwritten
--            with the new session.
-- to_host:   remote host name.
-- connect:   pass false to skip starting a connection attempt; any other
--            value (including nil) starts one via attempt_connection().
--
-- Returns the new session (type "s2sout_unauthed"). If no sends2s was
-- installed by the connection attempt, a buffering sends2s is installed.
function new_outgoing(from_host, to_host, connect)
	local host_session = {
		to_host = to_host, from_host = from_host, host = from_host,
		notopen = true, type = "s2sout_unauthed", direction = "outgoing",
		open_stream = session_open_stream,
	};
	hosts[from_host].s2sout[to_host] = host_session;

	-- Per-session logger named after the session table's address
	local log = logger_init("s2sout"..tostring(host_session):match("[a-f0-9]*$"));
	host_session.log = log;

	initialize_filters(host_session);

	if connect ~= false then
		-- Kick the connection attempting machine into life
		attempt_connection(host_session);
	end

	if host_session.sends2s then
		return host_session;
	end

	-- A sends2s which buffers data (until the stream is opened)
	-- note that data in this buffer will be sent before the stream is authed
	-- and will not be ack'd in any way, successful or otherwise
	local buffer;
	host_session.sends2s = function (data)
		if not buffer then
			-- Created lazily and published on the session on first use
			buffer = {};
			host_session.send_buffer = buffer;
		end
		log("debug", "Buffering data on unconnected s2sout to %s", to_host);
		buffer[#buffer+1] = data;
		log("debug", "Buffered item %d: %s", #buffer, tostring(data));
	end
	return host_session;
end
-- Start, or continue, trying to connect an outgoing session to its remote host.
--
-- host_session: the outgoing session (reads from_host/to_host, writes
--               connecting, srv_hosts and srv_choice).
-- err:          nil on the first attempt; on later calls, the reason the
--               previous attempt failed. Its truthiness selects the branch.
--
-- Returns true when an attempt is in progress, and false when to_host cannot
-- be converted by idna_to_ascii or when there is nothing left to try.
-- On a retry the result of try_connect() is returned.
function attempt_connection(host_session, err)
-- NOTE: from_host is read here but not used anywhere in this function.
local from_host, to_host = host_session.from_host, host_session.to_host;
-- Defaults: the ASCII form of the host name and port 5269. Both are upvalues
-- of the SRV callback below, which may overwrite them.
local connect_host, connect_port = to_host and idna_to_ascii(to_host), 5269;
if not connect_host then
return false;
end
if not err then -- This is our first attempt
log("debug", "First attempt to connect to %s, starting with SRV lookup...", to_host);
host_session.connecting = true;
-- NOTE: handle is only assigned and cleared; nothing in this function reads it.
local handle;
handle = adns.lookup(function (answer)
handle = nil;
host_session.connecting = nil;
if answer then
log("debug", to_host.." has SRV records, handling...");
-- Collect the SRV records on the session so later retries can walk them
local srv_hosts = {};
host_session.srv_hosts = srv_hosts;
for _, record in ipairs(answer) do
t_insert(srv_hosts, record.srv);
end
t_sort(srv_hosts, compare_srv_priorities);
local srv_choice = srv_hosts[1];
-- srv_choice on the session is the index of the record currently in use
host_session.srv_choice = 1;
if srv_choice then
connect_host, connect_port = srv_choice.target or to_host, srv_choice.port or connect_port;
log("debug", "Best record found, will connect to %s:%d", connect_host, connect_port);
end
else
log("debug", to_host.." has no SRV records, falling back to A");
end
-- Try with SRV, or just the plain hostname if no SRV
local ok, err = try_connect(host_session, connect_host, connect_port);
if not ok then
-- Re-enter with the error set, which selects the retry branches below
if not attempt_connection(host_session, err) then
-- No more attempts will be made
destroy_session(host_session, err);
end
end
end, "_xmpp-server._tcp."..connect_host..".", "SRV");
return true; -- Attempt in progress
elseif host_session.srv_hosts and #host_session.srv_hosts > host_session.srv_choice then -- Not our first attempt, and we also have SRV
-- Move on to the next SRV record in sorted order
host_session.srv_choice = host_session.srv_choice + 1;
local srv_choice = host_session.srv_hosts[host_session.srv_choice];
connect_host, connect_port = srv_choice.target or to_host, srv_choice.port or connect_port;
host_session.log("info", "Connection failed (%s). Attempt #%d: This time to %s:%d", tostring(err), host_session.srv_choice, connect_host, connect_port);
else
host_session.log("info", "Out of connection options, can't connect to %s", tostring(host_session.to_host));
-- We're out of options
return false;
end
-- Only the SRV-retry branch above reaches this point
if not (connect_host and connect_port) then
-- Likely we couldn't resolve DNS
log("warn", "Hmm, we're without a host (%s) and port (%s) to connect to for %s, giving up :(", tostring(connect_host), tostring(connect_port), tostring(to_host));
return false;
end
return try_connect(host_session, connect_host, connect_port);
end
-- Resolve connect_host to an address and, once resolved, open the connection.
--
-- host_session: the outgoing session; `connecting` is set while the lookup
--               is pending and cleared when the callback runs.
-- connect_host: host name to look up (A record).
-- connect_port: port passed on to make_connect().
--
-- Always returns true: the lookup has been started. Failures are handled in
-- the callback by asking attempt_connection() for the next candidate and
-- destroying the session when there is none.
function try_connect(host_session, connect_host, connect_port)
	host_session.connecting = true;
	adns.lookup(function (reply)
		host_session.connecting = nil;

		-- COMPAT: This is a compromise for all you CNAME-(ab)users :)
		if not (reply and reply[#reply] and reply[#reply].a) then
			-- No A record in the answer: follow CNAMEs using cached
			-- lookups, at most max_dns_depth steps.
			local count = max_dns_depth;
			reply = dns.peek(connect_host, "CNAME", "IN");
			while count > 0 and reply and reply[#reply] and not reply[#reply].a and reply[#reply].cname do
				log("debug", "Looking up %s (DNS depth is %d)", tostring(reply[#reply].cname), count);
				reply = dns.peek(reply[#reply].cname, "A", "IN") or dns.peek(reply[#reply].cname, "CNAME", "IN");
				count = count - 1;
			end
		end
		-- end of CNAME resolving

		if reply and reply[#reply] and reply[#reply].a then
			log("debug", "DNS reply for %s gives us %s", connect_host, reply[#reply].a);
			-- make_connect() reports failure as (false, err). That result
			-- used to be returned into the DNS callback and dropped, which
			-- left the session registered but never retried or destroyed.
			local ok, err = make_connect(host_session, reply[#reply].a, connect_port);
			if not ok then
				-- attempt_connection() treats a nil error as "first
				-- attempt", so always hand it a non-nil reason.
				if not attempt_connection(host_session, err or "closed") then
					err = err and (": "..err) or "";
					destroy_session(host_session, "Connection failed"..err);
				end
			end
		else
			log("debug", "DNS lookup failed to get a response for %s", connect_host);
			if not attempt_connection(host_session, "name resolution failed") then -- Retry if we can
				log("debug", "No other records to try for %s - destroying", host_session.to_host);
				destroy_session(host_session, "DNS resolution failed"); -- End of the line, we can't
			end
		end
	end, connect_host, "A", "IN");

	return true;
end
-- Open a non-blocking TCP connection for an outgoing session and send the
-- opening stream header on it.
--
-- host_session: the outgoing session; on success its conn and sends2s are
--               replaced with ones bound to the new connection.
-- connect_host: address to connect to.
-- connect_port: port to connect to.
--
-- Returns true once the connection attempt is under way, or false plus an
-- error string when the socket cannot be created or connect() fails with
-- anything other than "timeout". A timer is scheduled that closes the
-- session with "connection-timeout" if, after connect_timeout, it still
-- uses this connection, is not of type "s2sout" and is not connecting.
function make_connect(host_session, connect_host, connect_port)
	local session_log = host_session.log or log;
	session_log("info", "Beginning new connection attempt to %s (%s:%d)", host_session.to_host, connect_host, connect_port);
	-- Ok, we're going to try to connect

	local from_host, to_host = host_session.from_host, host_session.to_host;

	local conn, handler = socket.tcp()

	if not conn then
		log("warn", "Failed to create outgoing connection, system error: %s", handler);
		return false, handler;
	end

	-- Non-blocking: an in-progress connect is reported as "timeout"
	conn:settimeout(0);
	local success, err = conn:connect(connect_host, connect_port);
	if not success and err ~= "timeout" then
		log("warn", "s2s connect() to %s (%s:%d) failed: %s", host_session.to_host, connect_host, connect_port, err);
		-- Release the socket now instead of leaving it to the garbage
		-- collector; nothing else holds a reference to it.
		conn:close();
		return false, err;
	end

	local cl = connlisteners_get("xmppserver");
	conn = wrapclient(conn, connect_host, connect_port, cl, cl.default_mode or 1 );
	host_session.conn = conn;

	local filter = initialize_filters(host_session);
	-- From here on `log` is the session's own logger
	local w, log = conn.write, host_session.log;
	-- Sender bound to this connection: stanzas go through "stanzas/out",
	-- then everything goes through "bytes/out"; a filter returning a false
	-- value drops the data.
	host_session.sends2s = function (t)
		log("debug", "sending: %s", (t.top_tag and t:top_tag()) or t:match("^[^>]*>?"));
		if t.name then
			t = filter("stanzas/out", t);
		end
		if t then
			t = filter("bytes/out", tostring(t));
			if t then
				return w(conn, tostring(t));
			end
		end
	end

	-- Register this outgoing connection so that xmppserver_listener knows about it
	-- otherwise it will assume it is a new incoming connection
	cl.register_outgoing(conn, host_session);

	host_session:open_stream(from_host, to_host);

	log("debug", "Connection attempt in progress...");
	add_task(connect_timeout, function ()
		if host_session.conn ~= conn or
		   host_session.type == "s2sout" or
		   host_session.connecting then
			return; -- Ok, we're connect[ed|ing]
		end
		-- Not connected, need to close session and clean up
		local timeout_log = host_session.log or log;
		timeout_log("warn", "Destroying incomplete session %s->%s due to inactivity",
			host_session.from_host or "(unknown)", host_session.to_host or "(unknown)");
		host_session:close("connection-timeout");
	end);
	return true;
end
-- Send the opening <stream:stream> tag for a server-to-server stream.
--
-- session: session whose sends2s() receives the serialized opening tag.
-- from:    value for the header's 'from' attribute.
-- to:      value for the header's 'to' attribute.
function session_open_stream(session, from, to)
	local header_attr = {
		xmlns='jabber:server', ["xmlns:db"]='jabber:server:dialback',
		["xmlns:stream"]='http://etherx.jabber.org/streams',
		from=from, to=to, version='1.0', ["xml:lang"]='en',
	};
	local header = st.stanza("stream:stream", header_attr);
	-- Only the opening tag is sent; the element is never closed here
	session.sends2s(header:top_tag());
end
-- Handle the opening <stream:stream> tag received on an s2s session.
--
-- session: the session the header arrived on; its `direction` selects the
--          incoming or outgoing handling below.
-- attr:    attributes of the received stream header (version, to, from, id).
--
-- Incoming: records the peer's hosts, rejects unknown or s2s-disabled target
-- hosts, then replies with our own header and (for version >= 1.0) features.
-- Outgoing: records the stream id, flushes the pre-auth send buffer and, for
-- pre-1.0 peers, proceeds without waiting for features.
-- Raises an error if an outgoing stream's response carries no id.
function streamopened(session, attr)
local send = session.sends2s;
-- TODO: #29: SASL/TLS on s2s streams
-- A missing or non-numeric version attribute is treated as version 0
session.version = tonumber(attr.version) or 0;
-- NOTE(review): secure == false (as opposed to nil) is promoted to true here;
-- presumably false marks a TLS negotiation that has now completed -- confirm
-- against the code that sets session.secure.
if session.secure == false then
session.secure = true;
end
if session.direction == "incoming" then
-- Send a reply stream header
-- Either host is nil when the attribute is absent
session.to_host = attr.to and nameprep(attr.to);
session.from_host = attr.from and nameprep(attr.from);
session.streamid = uuid_gen();
(session.log or log)("debug", "incoming s2s received <stream:stream>");
if session.to_host then
-- Both rejections below return early, so session.notopen is left set
if not hosts[session.to_host] then
-- Attempting to connect to a host we don't serve
session:close({
condition = "host-unknown";
text = "This host does not serve "..session.to_host
});
return;
elseif hosts[session.to_host].disallow_s2s then
-- Attempting to connect to a host that disallows s2s
session:close({
condition = "policy-violation";
text = "Server-to-server communication is not allowed to this host";
});
return;
end
end
send("<?xml version='1.0'?>");
-- The version attribute is only included when the peer announced one above 0
send(stanza("stream:stream", { xmlns='jabber:server', ["xmlns:db"]='jabber:server:dialback',
["xmlns:stream"]='http://etherx.jabber.org/streams', id=session.streamid, from=session.to_host, to=session.from_host, version=(session.version > 0 and "1.0" or nil) }):top_tag());
if session.version >= 1.0 then
-- Listeners on the target host's "s2s-stream-features" event fill in the features
local features = st.stanza("stream:features");
if session.to_host then
hosts[session.to_host].events.fire_event("s2s-stream-features", { origin = session, features = features });
else
(session.log or log)("warn", "No 'to' on stream header from %s means we can't offer any features", session.from_host or "unknown host");
end
log("debug", "Sending stream features: %s", tostring(features));
send(features);
end
elseif session.direction == "outgoing" then
-- If we are just using the connection for verifying dialback keys, we won't try and auth it
if not attr.id then error("stream response did not give us a streamid!!!"); end
session.streamid = attr.id;
-- Send unauthed buffer
-- (stanzas which are fine to send before dialback)
-- Note that this is *not* the stanza queue (which
-- we can only send if auth succeeds) :)
local send_buffer = session.send_buffer;
if send_buffer and #send_buffer > 0 then
log("debug", "Sending s2s send_buffer now...");
for i, data in ipairs(send_buffer) do
session.sends2s(tostring(data));
send_buffer[i] = nil;
end
end
session.send_buffer = nil;
-- If server is pre-1.0, don't wait for features, just do dialback
if session.version < 1.0 then
if not session.dialback_verifying then
log("debug", "Initiating dialback...");
initiate_dialback(session);
else
-- Verification-only connection: no dialback of our own is started
mark_connected(session);
end
end
end
-- The stream is now open in both directions
session.notopen = nil;
end
-- Handle the peer's closing </stream:stream>: log it and close our side.
--
-- session: the session whose stream was closed by the remote end.
function streamclosed(session)
	local session_log = session.log or log;
	session_log("debug", "Received </stream:stream>");
	session:close();
end
-- Begin dialback on an outgoing session: generate the key, remember it on
-- the session as dialback_key, and send it to the peer in a <db:result/>.
--
-- session: outgoing session with streamid, from_host and to_host set.
function initiate_dialback(session)
	-- generate dialback key
	local key = generate_dialback(session.streamid, session.to_host, session.from_host);
	session.dialback_key = key;
	local db_result = format("<db:result from='%s' to='%s'>%s</db:result>", session.from_host, session.to_host, key);
	session.sends2s(db_result);
	session.log("info", "sent dialback key on outgoing s2s stream");
end
-- Compute the dialback key for a stream.
--
-- id:   stream id.
-- to:   receiving host name.
-- from: originating host name; must be a key of `hosts`, whose
--       dialback_secret is mixed into the key.
--
-- Returns sha256_hash() of the four values concatenated in that order; the
-- second argument `true` is passed through to the hash function.
function generate_dialback(id, to, from)
	local secret = hosts[from].dialback_secret;
	local material = id..to..from..secret;
	return sha256_hash(material, true);
end
-- Check a dialback key received for a stream.
--
-- id, to, from: the values the key should have been generated from.
-- key:          the key to check.
--
-- Returns true when key equals generate_dialback(id, to, from).
function verify_dialback(id, to, from, key)
	local expected = generate_dialback(id, to, from);
	return key == expected;
end
-- Mark a session (or, for incoming sessions, one remote host on it) as
-- authenticated and ready for stanzas.
--
-- session: the s2s session.
-- host:    optional remote host name to record as authenticated in
--          session.hosts (incoming sessions only).
--
-- Returns true when the session was marked authenticated. Returns false when
-- the session type does not allow it, or when the local host requires
-- encryption ("s2s_require_encryption") and the session is not secure, in
-- which case the session is closed with a policy-violation.
function make_authenticated(session, host)
	if not session.secure then
		local local_host = session.direction == "incoming" and session.to_host or session.from_host;
		if config.get(local_host, "core", "s2s_require_encryption") then
			session:close({
				condition = "policy-violation",
				text = "Encrypted server-to-server communication is required but was not "
					..((session.direction == "outgoing" and "offered") or "used")
			});
			-- Stop here. Falling through would only be safe if close()
			-- had already retired the session; do not rely on that to
			-- keep an unencrypted session from being authenticated.
			return false;
		end
	end
	if session.type == "s2sout_unauthed" then
		session.type = "s2sout";
	elseif session.type == "s2sin_unauthed" then
		session.type = "s2sin";
		if host then
			if not session.hosts[host] then session.hosts[host] = {}; end
			session.hosts[host].authed = true;
		end
	elseif session.type == "s2sin" and host then
		-- Already authenticated incoming session gaining another host
		if not session.hosts[host] then session.hosts[host] = {}; end
		session.hosts[host].authed = true;
	else
		return false;
	end
	session.log("debug", "connection %s->%s is now authenticated", session.from_host or "(unknown)", session.to_host or "(unknown)");

	mark_connected(session);

	return true;
end
-- Stream is authorised, and ready for normal stanzas.
--
-- Installs session.send (which routes replies via send_to_host from our side
-- back to the peer), fires the "s2sout-established" or "s2sin-established"
-- event globally and on the local host, and for outgoing sessions flushes
-- the stanza queue and drops the cached SRV records.
--
-- session: the session, with from_host, to_host, direction and type set.
function mark_connected(session)
	local sendq, send = session.sendq, session.sends2s;
	local from, to = session.from_host, session.to_host;

	session.log("info", session.direction.." s2s connection "..from.."->"..to.." complete");

	-- Captured once so session.send keeps using this routing function
	local route = send_to_host;
	session.send = function (data) route(to, from, data); end

	local event_data = { session = session };
	local is_outgoing_type = session.type == "s2sout";
	local event_name = is_outgoing_type and "s2sout-established" or "s2sin-established";
	prosody.events.fire_event(event_name, event_data);
	-- The local side is from_host for s2sout sessions and to_host otherwise
	local local_host;
	if is_outgoing_type then
		local_host = session.from_host;
	else
		local_host = session.to_host;
	end
	hosts[local_host].events.fire_event(event_name, event_data);

	if session.direction ~= "outgoing" then
		return;
	end

	if sendq then
		session.log("debug", "sending "..#sendq.." queued stanzas across new outgoing connection to "..session.to_host);
		for i, data in ipairs(sendq) do
			-- data[1] is the serialized stanza; data[2] (the prepared
			-- error reply) is no longer needed
			send(data[1]);
			sendq[i] = nil;
		end
		session.sendq = nil;
	end
	session.srv_hosts = nil;
end
-- Prototype for destroyed sessions ("resting, not dead"): through __index it
-- supplies the fields a retired session reports, plus harmless versions of
-- open_stream, close and filter.
local resting_session = {
	destroyed = true;
	type = "s2s_destroyed";
};

function resting_session.open_stream(session)
	session.log("debug", "Attempt to open stream on resting session");
end

function resting_session.close(session)
	session.log("debug", "Attempt to close already-closed session");
end

-- Pass-through filter: data is returned unchanged
function resting_session.filter(type, data)
	return data;
end

resting_session.__index = resting_session;

-- Fields that survive retirement
local preserved_fields = { trace = true, log = true, id = true };

-- Strip a session down to an inert shell.
--
-- Every field except trace, log and id is removed, send/data are replaced
-- with functions that only log what they discard, and the session is given
-- the resting_session prototype.
--
-- Returns the same session table.
function retire_session(session)
	local log = session.log or log;
	-- Clearing existing keys while traversing with pairs() is permitted
	for field in pairs(session) do
		if not preserved_fields[field] then
			session[field] = nil;
		end
	end

	session.send = function (data)
		log("debug", "Discarding data sent to resting session: %s", tostring(data));
	end
	session.data = function (data)
		log("debug", "Discarding data received from resting session: %s", tostring(data));
	end
	return setmetatable(session, resting_session);
end
-- Tear down an s2s session: unregister it, bounce anything still queued on
-- an outgoing session, fire the matching "-destroyed" events and retire the
-- session object.
--
-- session: the session to destroy; a session already marked destroyed is
--          ignored.
-- reason:  optional reason, passed to bounce_sendq() and included in the
--          event data.
function destroy_session(session, reason)
	if session.destroyed then return; end
	local session_log = session.log or log;
	session_log("info", "Destroying "..tostring(session.direction).." session "..tostring(session.from_host).."->"..tostring(session.to_host));

	if session.direction == "outgoing" then
		-- The local host may no longer be in `hosts`; the event firing
		-- below already allows for that, so do the same here rather than
		-- raising before the queue is bounced and the session retired.
		local local_host = hosts[session.from_host];
		if local_host then
			local_host.s2sout[session.to_host] = nil;
		end
		bounce_sendq(session, reason);
	elseif session.direction == "incoming" then
		incoming_s2s[session] = nil;
	end

	-- Events are only fired for sessions that had been authenticated
	-- (type "s2sout" / "s2sin")
	local event_data = { session = session, reason = reason };
	if session.type == "s2sout" then
		prosody.events.fire_event("s2sout-destroyed", event_data);
		if hosts[session.from_host] then
			hosts[session.from_host].events.fire_event("s2sout-destroyed", event_data);
		end
	elseif session.type == "s2sin" then
		prosody.events.fire_event("s2sin-destroyed", event_data);
		if hosts[session.to_host] then
			hosts[session.to_host].events.fire_event("s2sin-destroyed", event_data);
		end
	end

	retire_session(session); -- Clean session until it is GC'd
end
return _M;
|