diff options
Diffstat (limited to 'core')
-rw-r--r-- | core/componentmanager.lua | 3 | ||||
-rw-r--r-- | core/offlinemanager.lua | 32 | ||||
-rw-r--r-- | core/rostermanager.lua | 8 | ||||
-rw-r--r-- | core/s2smanager.lua | 75 | ||||
-rw-r--r-- | core/stanza_router.lua | 36 |
5 files changed, 112 insertions, 42 deletions
diff --git a/core/componentmanager.lua b/core/componentmanager.lua index 38718882..5b655435 100644 --- a/core/componentmanager.lua +++ b/core/componentmanager.lua @@ -25,8 +25,9 @@ function register_component(host, component) if not hosts[host] then
-- TODO check for host well-formedness
components[host] = component;
- hosts[host] = {type = "component", host = host, connected = true};
+ hosts[host] = {type = "component", host = host, connected = true, s2sout = {} };
log("debug", "component added: "..host);
+ return hosts[host];
else
log("error", "Attempt to set component for existing host: "..host);
end
diff --git a/core/offlinemanager.lua b/core/offlinemanager.lua new file mode 100644 index 00000000..e71da446 --- /dev/null +++ b/core/offlinemanager.lua @@ -0,0 +1,32 @@ +
+local datamanager = require "util.datamanager";
+local st = require "util.stanza";
+local datetime = require "util.datetime";
+local ipairs = ipairs;
+
+module "offlinemanager"
+
+function store(node, host, stanza)
+ stanza.attr.stamp = datetime.datetime();
+ stanza.attr.stamp_legacy = datetime.legacy();
+ return datamanager.list_append(node, host, "offline", st.preserialize(stanza));
+end
+
+function load(node, host)
+ local data = datamanager.list_load(node, host, "offline");
+ if not data then return; end
+ for k, v in ipairs(data) do
+ local stanza = st.deserialize(v);
+ stanza:tag("delay", {xmlns = "urn:xmpp:delay", from = host, stamp = stanza.attr.stamp}):up(); -- XEP-0203
+ stanza:tag("x", {xmlns = "jabber:x:delay", from = host, stamp = stanza.attr.stamp_legacy}):up(); -- XEP-0091 (deprecated)
+ stanza.attr.stamp, stanza.attr.stamp_legacy = nil, nil;
+ data[k] = stanza;
+ end
+ return data;
+end
+
+function deleteAll(node, host)
+ return datamanager.list_store(node, host, "offline", nil);
+end
+
+return _M;
diff --git a/core/rostermanager.lua b/core/rostermanager.lua index 9a203337..a08f989d 100644 --- a/core/rostermanager.lua +++ b/core/rostermanager.lua @@ -1,8 +1,6 @@ -local mainlog = log; -local function log(type, message) - mainlog(type, "rostermanager", message); -end + +local log = require "util.logger".init("rostermanager"); local setmetatable = setmetatable; local format = string.format; @@ -234,7 +232,7 @@ function unsubscribed(username, host, jid) if item.subscription == "from" then item.subscription = "none"; changed = true; - elseif item.subscription == both then + elseif item.subscription == "both" then item.subscription = "to"; changed = true; end diff --git a/core/s2smanager.lua b/core/s2smanager.lua index aed10753..c3d9bdb4 100644 --- a/core/s2smanager.lua +++ b/core/s2smanager.lua @@ -4,12 +4,15 @@ local sessions = sessions; local socket = require "socket"; local format = string.format; local t_insert = table.insert; +local get_traceback = debug.traceback; local tostring, pairs, ipairs, getmetatable, print, newproxy, error, tonumber = tostring, pairs, ipairs, getmetatable, print, newproxy, error, tonumber; local connlisteners_get = require "net.connlisteners".get; local wraptlsclient = require "net.server".wraptlsclient; local modulemanager = require "core.modulemanager"; +local st = require "stanza"; +local stanza = st.stanza; local uuid_gen = require "util.uuid".generate; @@ -21,28 +24,37 @@ local md5_hash = require "util.hashes".md5; local dialback_secret = "This is very secret!!! Ha!"; -local srvmap = { ["gmail.com"] = "talk.google.com", ["identi.ca"] = "longlance.controlezvous.ca", ["cdr.se"] = "jabber.cdr.se" }; +local srvmap = { ["gmail.com"] = "talk.google.com", ["identi.ca"] = "hampton.controlezvous.ca", ["cdr.se"] = "jabber.cdr.se" }; module "s2smanager" -function connect_host(from_host, to_host) -end - function send_to_host(from_host, to_host, data) - local host = hosts[to_host]; + local host = hosts[from_host].s2sout[to_host]; if host then - -- Write to connection - if host.type == "s2sout_unauthed" and not host.notopen and not host.dialback_key then - log("debug", "trying to send over unauthed s2sout to "..to_host..", authing it now..."); - initiate_dialback(host); - if not host.sendq then host.sendq = { data }; - else t_insert(host.sendq, data); end + -- We have a connection to this host already + if host.type == "s2sout_unauthed" then + host.log("debug", "trying to send over unauthed s2sout to "..to_host..", authing it now..."); + if not host.notopen and not host.dialback_key then + host.log("debug", "dialback had not been initiated"); + initiate_dialback(host); + end + + -- Queue stanza until we are able to send it + if host.sendq then t_insert(host.sendq, data); + else host.sendq = { data }; end + elseif host.type == "local" or host.type == "component" then + log("error", "Trying to send a stanza to ourselves??") + log("error", "Traceback: %s", get_traceback()); + log("error", "Stanza: %s", tostring(data)); else - log("debug", "going to send stanza to "..to_host.." from "..from_host); + (host.log or log)("debug", "going to send stanza to "..to_host.." from "..from_host); -- FIXME - if hosts[to_host].from_host ~= from_host then log("error", "WARNING! This might, possibly, be a bug, but it might not..."); end - hosts[to_host].sends2s(data); - log("debug", "stanza sent over "..hosts[to_host].type); + if host.from_host ~= from_host then + log("error", "WARNING! This might, possibly, be a bug, but it might not..."); + log("error", "We are going to send from %s instead of %s", host.from_host, from_host); + end + host.sends2s(data); + host.log("debug", "stanza sent over "..host.type); end else log("debug", "opening a new outgoing connection for this stanza"); @@ -52,10 +64,6 @@ function send_to_host(from_host, to_host, data) end end -function disconnect_host(host) - -end - local open_sessions = 0; function new_incoming(conn) @@ -72,23 +80,27 @@ end function new_outgoing(from_host, to_host) local host_session = { to_host = to_host, from_host = from_host, notopen = true, type = "s2sout_unauthed", direction = "outgoing" }; - hosts[to_host] = host_session; + hosts[from_host].s2sout[to_host] = host_session; local cl = connlisteners_get("xmppserver"); local conn, handler = socket.tcp() - - -- Register this outgoing connection so that xmppserver_listener knows about it - -- otherwise it will assume it is a new incoming connection - cl.register_outgoing(conn, host_session); - --FIXME: Below parameters (ports/ip) are incorrect (use SRV) to_host = srvmap[to_host] or to_host; - conn:settimeout(0.1); - conn:connect(to_host, 5269); + + conn:settimeout(0); + local success, err = conn:connect(to_host, 5269); + if not success then + log("warn", "s2s connect() failed: %s", err); + end + conn = wraptlsclient(cl, conn, to_host, 5269, 0, 1, hosts[from_host].ssl_ctx ); host_session.conn = conn; + -- Register this outgoing connection so that xmppserver_listener knows about it + -- otherwise it will assume it is a new incoming connection + cl.register_outgoing(conn, host_session); + do local conn_name = "s2sout"..tostring(conn):match("[a-f0-9]*$"); host_session.log = logger_init(conn_name); @@ -103,7 +115,6 @@ function new_outgoing(from_host, to_host) end function streamopened(session, attr) - session.log("debug", "s2s stream opened"); local send = session.sends2s; session.version = tonumber(attr.version) or 0; @@ -124,7 +135,7 @@ function streamopened(session, attr) session.streamid = uuid_gen(); print(session, session.from_host, "incoming s2s stream opened"); send("<?xml version='1.0'?>"); - send(format("<stream:stream xmlns='jabber:server' xmlns:db='jabber:server:dialback' xmlns:stream='http://etherx.jabber.org/streams' id='%s' from='%s'>", session.streamid, session.to_host)); + send(stanza("stream:stream", { xmlns='jabber:server', ["xmlns:db"]='jabber:server:dialback', ["xmlns:stream"]='http://etherx.jabber.org/streams', id=session.streamid, from=session.to_host }):top_tag()); elseif session.direction == "outgoing" then -- If we are just using the connection for verifying dialback keys, we won't try and auth it if not attr.id then error("stream response did not give us a streamid!!!"); end @@ -147,7 +158,7 @@ function streamopened(session, attr) end send("</stream:features>");]] - log("info", "s2s stream opened successfully"); + session.notopen = nil; end @@ -194,7 +205,7 @@ function mark_connected(session) if session.direction == "outgoing" then if sendq then - session.log("debug", "sending queued stanzas across new outgoing connection to "..session.to_host); + session.log("debug", "sending "..#sendq.." queued stanzas across new outgoing connection to "..session.to_host); for i, data in ipairs(sendq) do send(data); sendq[i] = nil; @@ -207,7 +218,7 @@ end function destroy_session(session) (session.log or log)("info", "Destroying "..tostring(session.direction).." session "..tostring(session.from_host).."->"..tostring(session.to_host)); if session.direction == "outgoing" then - hosts[session.to_host] = nil; + hosts[session.from_host].s2sout[session.to_host] = nil; end session.conn = nil; session.disconnect = nil; diff --git a/core/stanza_router.lua b/core/stanza_router.lua index 9ae98f1c..c1819651 100644 --- a/core/stanza_router.lua +++ b/core/stanza_router.lua @@ -13,6 +13,7 @@ local user_exists = require "core.usermanager".user_exists; local rostermanager = require "core.rostermanager"; local sessionmanager = require "core.sessionmanager"; +local offlinemanager = require "core.offlinemanager"; local s2s_verify_dialback = require "core.s2smanager".verify_dialback; local s2s_make_authenticated = require "core.s2smanager".make_authenticated; @@ -31,7 +32,7 @@ local jid_split = require "util.jid".split; local print = print; function core_process_stanza(origin, stanza) - log("debug", "Received["..origin.type.."]: "..tostring(st.reply(st.reply(stanza)))) + log("debug", "Received[%s]: %s", origin.type, stanza:pretty_top_tag()) if not stanza.attr.xmlns then stanza.attr.xmlns = "jabber:client"; end -- FIXME Hack. This should be removed when we fix namespace handling. -- TODO verify validity of stanza (as well as JID validity) @@ -149,6 +150,10 @@ function core_handle_stanza(origin, stanza) core_route_stanza(origin, request); end end + for _, msg in ipairs(offlinemanager.load(node, host) or {}) do + origin.send(msg); -- FIXME do we need to modify to/from in any way? + end + offlinemanager.deleteAll(node, host); end origin.priority = 0; if stanza.attr.type == "unavailable" then @@ -168,11 +173,23 @@ function core_handle_stanza(origin, stanza) end stanza.attr.to = nil; -- reset it else - -- TODO error, bad type + log("warn", "Unhandled c2s presence: %s", tostring(stanza)); + if stanza.attr.type ~= "error" then + origin.send(st.error_reply(stanza, "cancel", "service-unavailable")); -- FIXME correct error? + end + end + else + log("warn", "Unhandled c2s stanza: %s", tostring(stanza)); + if stanza.attr.type ~= "error" and stanza.attr.type ~= "result" then + origin.send(st.error_reply(stanza, "cancel", "service-unavailable")); -- FIXME correct error? end end -- TODO handle other stanzas else - log("warn", "Unhandled origin: %s", origin.type); -- FIXME reply with error + log("warn", "Unhandled origin: %s", origin.type); + if stanza.attr.type ~= "error" and stanza.attr.type ~= "result" then + -- s2s stanzas can get here + (origin.sends2s or origin.send)(st.error_reply(stanza, "cancel", "service-unavailable")); -- FIXME correct error? + end end end @@ -328,8 +345,14 @@ function core_route_stanza(origin, stanza) t_insert(recipients, session); end end + local count = 0; for _, session in pairs(recipients) do session.send(stanza); + count = count + 1; + end + if count == 0 then + offlinemanager.store(node, host, stanza); + -- TODO deal with storage errors end else -- TODO send IQ error @@ -349,7 +372,12 @@ function core_route_stanza(origin, stanza) -- TODO send unavailable presence or unsubscribed end elseif stanza.name == "message" then - -- TODO send message error, or store offline messages + if stanza.attr.type == "chat" or stanza.attr.type == "normal" or not stanza.attr.type then + offlinemanager.store(node, host, stanza); + -- FIXME don't store messages with only chat state notifications + end + -- TODO allow configuration of offline storage + -- TODO send error if not storing offline elseif stanza.name == "iq" then -- TODO send IQ error end |