diff options
Diffstat (limited to 'plugins/mod_c2s.lua')
-rw-r--r-- | plugins/mod_c2s.lua | 105 |
1 files changed, 69 insertions, 36 deletions
diff --git a/plugins/mod_c2s.lua b/plugins/mod_c2s.lua index 5feb1f2c..f0cdd7fb 100644 --- a/plugins/mod_c2s.lua +++ b/plugins/mod_c2s.lua @@ -1,7 +1,7 @@ -- Prosody IM -- Copyright (C) 2008-2010 Matthew Wild -- Copyright (C) 2008-2010 Waqas Hussain --- +-- -- This project is MIT/X11 licensed. Please see the -- COPYING file in the source package for more information. -- @@ -15,9 +15,10 @@ local sessionmanager = require "core.sessionmanager"; local st = require "util.stanza"; local sm_new_session, sm_destroy_session = sessionmanager.new_session, sessionmanager.destroy_session; local uuid_generate = require "util.uuid".generate; +local runner = require "util.async".runner; local xpcall, tostring, type = xpcall, tostring, type; -local traceback = debug.traceback; +local t_insert, t_remove = table.insert, table.remove; local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams"; @@ -31,12 +32,12 @@ local sessions = module:shared("sessions"); local core_process_stanza = prosody.core_process_stanza; local hosts = prosody.hosts; -local stream_callbacks = { default_ns = "jabber:client", handlestanza = core_process_stanza }; +local stream_callbacks = { default_ns = "jabber:client" }; local listener = {}; +local runner_callbacks = {}; --- Stream events handlers local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'}; -local default_stream_attr = { ["xmlns:stream"] = "http://etherx.jabber.org/streams", xmlns = stream_callbacks.default_ns, version = "1.0", id = "" }; function stream_callbacks.streamopened(session, attr) local send = session.send; @@ -50,15 +51,13 @@ function stream_callbacks.streamopened(session, attr) session.streamid = uuid_generate(); (session.log or session)("debug", "Client sent opening <stream:stream> to %s", session.host); - if not hosts[session.host] or not hosts[session.host].users then + if not hosts[session.host] or not hosts[session.host].modules.c2s then -- We don't serve this host... session:close{ condition = "host-unknown", text = "This server does not serve "..tostring(session.host)}; return; end - send("<?xml version='1.0'?>"..st.stanza("stream:stream", { - xmlns = 'jabber:client', ["xmlns:stream"] = 'http://etherx.jabber.org/streams'; - id = session.streamid, from = session.host, version = '1.0', ["xml:lang"] = 'en' }):top_tag()); + session:open_stream(); (session.log or log)("debug", "Sent reply <stream:stream> to client"); session.notopen = nil; @@ -67,20 +66,21 @@ function stream_callbacks.streamopened(session, attr) -- since we now have a new stream header, session is secured if session.secure == false then session.secure = true; + session.encrypted = true; - -- Check if TLS compression is used local sock = session.conn:socket(); if sock.info then - session.compressed = sock:info"compression"; - elseif sock.compression then - session.compressed = sock:compression(); --COMPAT mw/luasec-hg + local info = sock:info(); + (session.log or log)("info", "Stream encrypted (%s with %s)", info.protocol, info.cipher); + session.compressed = info.compression; + else + (session.log or log)("info", "Stream encrypted"); + session.compressed = sock.compression and sock:compression(); --COMPAT mw/luasec-hg end end local features = st.stanza("stream:features"); hosts[session.host].events.fire_event("stream-features", { origin = session, features = features }); - module:fire_event("stream-features", session, features); - send(features); end @@ -116,12 +116,9 @@ function stream_callbacks.error(session, error, data) end end -local function handleerr(err) log("error", "Traceback[c2s]: %s", traceback(tostring(err), 2)); end function stream_callbacks.handlestanza(session, stanza) stanza = session.filter("stanzas/in", stanza); - if stanza then - return xpcall(function () return core_process_stanza(session, stanza) end, handleerr); - end + session.thread:run(stanza); end --- Session methods @@ -129,8 +126,7 @@ local function session_close(session, reason) local log = session.log or log; if session.conn then if session.notopen then - session.send("<?xml version='1.0'?>"); - session.send(st.stanza("stream:stream", default_stream_attr):top_tag()); + session:open_stream(); end if reason then -- nil == no err, initiated by us, false == initiated by client local stream_error = st.stanza("stream:error"); @@ -153,12 +149,12 @@ local function session_close(session, reason) log("debug", "Disconnecting client, <stream:error> is: %s", stream_error); session.send(stream_error); end - + session.send("</stream:stream>"); function session.send() return false; end - + local reason = (reason and (reason.name or reason.text or reason.condition)) or reason; - session.log("info", "c2s stream for %s closed: %s", session.full_jid or ("<"..session.ip..">"), reason or "session closed"); + session.log("debug", "c2s stream for %s closed: %s", session.full_jid or ("<"..session.ip..">"), reason or "session closed"); -- Authenticated incoming stream may still be sending us stanzas, so wait for </stream:stream> from remote local conn = session.conn; @@ -188,16 +184,29 @@ module:hook_global("user-deleted", function(event) end end, 200); +function runner_callbacks:ready() + self.data.conn:resume(); +end + +function runner_callbacks:waiting() + self.data.conn:pause(); +end + +function runner_callbacks:error(err) + (self.data.log or log)("error", "Traceback[c2s]: %s", err); +end + --- Port listener function listener.onconnect(conn) local session = sm_new_session(conn); sessions[conn] = session; - + session.log("info", "Client connected"); - + -- Client is using legacy SSL (otherwise mod_tls sets this flag) if conn:ssl() then session.secure = true; + session.encrypted = true; -- Check if TLS compression is used local sock = conn:socket(); @@ -207,34 +216,41 @@ function listener.onconnect(conn) session.compressed = sock:compression(); --COMPAT mw/luasec-hg end end - + if opt_keepalives then conn:setoption("keepalive", opt_keepalives); end - + session.close = session_close; - + local stream = new_xmpp_stream(session, stream_callbacks); session.stream = stream; session.notopen = true; - + function session.reset_stream() session.notopen = true; session.stream:reset(); end - + + session.thread = runner(function (stanza) + core_process_stanza(session, stanza); + end, runner_callbacks, session); + local filter = session.filter; function session.data(data) - data = filter("bytes/in", data); + -- Parse the data, which will store stanzas in session.pending_stanzas if data then - local ok, err = stream:feed(data); - if ok then return; end - log("debug", "Received invalid XML (%s) %d bytes: %s", tostring(err), #data, data:sub(1, 300):gsub("[\r\n]+", " "):gsub("[%z\1-\31]", "_")); - session:close("not-well-formed"); + data = filter("bytes/in", data); + if data then + local ok, err = stream:feed(data); + if not ok then + log("debug", "Received invalid XML (%s) %d bytes: %s", tostring(err), #data, data:sub(1, 300):gsub("[\r\n]+", " "):gsub("[%z\1-\31]", "_")); + session:close("not-well-formed"); + end + end end end - if c2s_timeout then add_task(c2s_timeout, function () if session.type == "c2s_unauthed" then @@ -262,10 +278,27 @@ function listener.ondisconnect(conn, err) end end +function listener.onreadtimeout(conn) + local session = sessions[conn]; + if session then + return (hosts[session.host] or prosody).events.fire_event("c2s-read-timeout", { session = session }); + end +end + +local function keepalive(event) + return event.session.send(' '); +end + function listener.associate_session(conn, session) sessions[conn] = session; end +function module.add_host(module) + module:hook("c2s-read-timeout", keepalive, -1); +end + +module:hook("c2s-read-timeout", keepalive, -1); + module:hook("server-stopping", function(event) local reason = event.reason; for _, session in pairs(sessions) do |