diff options
Diffstat (limited to 'plugins/mod_c2s.lua')
-rw-r--r-- | plugins/mod_c2s.lua | 134 |
1 files changed, 105 insertions, 29 deletions
diff --git a/plugins/mod_c2s.lua b/plugins/mod_c2s.lua index f9c2e9fb..e97b3b3f 100644 --- a/plugins/mod_c2s.lua +++ b/plugins/mod_c2s.lua @@ -12,6 +12,7 @@ local add_task = require "util.timer".add_task; local new_xmpp_stream = require "util.xmppstream".new; local nameprep = require "util.encodings".stringprep.nameprep; local sessionmanager = require "core.sessionmanager"; +local statsmanager = require "core.statsmanager"; local st = require "util.stanza"; local sm_new_session, sm_destroy_session = sessionmanager.new_session, sessionmanager.destroy_session; local uuid_generate = require "util.uuid".generate; @@ -28,8 +29,7 @@ local stream_close_timeout = module:get_option_number("c2s_close_timeout", 5); local opt_keepalives = module:get_option_boolean("c2s_tcp_keepalives", module:get_option_boolean("tcp_keepalives", true)); local stanza_size_limit = module:get_option_number("c2s_stanza_size_limit", 1024*256); -local measure_connections = module:measure("connections", "amount"); -local measure_ipv6 = module:measure("ipv6", "amount"); +local measure_connections = module:metric("gauge", "connections", "", "Established c2s connections", {"host", "type", "ip_family"}); local sessions = module:shared("sessions"); local core_process_stanza = prosody.core_process_stanza; @@ -39,24 +39,46 @@ local stream_callbacks = { default_ns = "jabber:client" }; local listener = {}; local runner_callbacks = {}; +local m_tls_params = module:metric( + "counter", "encrypted", "", + "Encrypted connections", + {"protocol"; "cipher"} +); + module:hook("stats-update", function () - local count = 0; - local ipv6 = 0; + -- for push backends, avoid sending out updates for each increment of + -- the metric below. + statsmanager.cork() + measure_connections:clear() for _, session in pairs(sessions) do - count = count + 1; - if session.ip and session.ip:match(":") then - ipv6 = ipv6 + 1; - end + local host = session.host or "" + local type_ = session.type or "other" + + -- we want to expose both v4 and v6 counters in all cases to make + -- queries smoother + local is_ipv6 = session.ip and session.ip:match(":") and 1 or 0 + local is_ipv4 = 1 - is_ipv6 + measure_connections:with_labels(host, type_, "ipv4"):add(is_ipv4) + measure_connections:with_labels(host, type_, "ipv6"):add(is_ipv6) end - measure_connections(count); - measure_ipv6(ipv6); + statsmanager.uncork() end); --- Stream events handlers local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'}; function stream_callbacks.streamopened(session, attr) + -- run _streamopened in async context + session.thread:run({ stream = "opened", attr = attr }); +end + +function stream_callbacks._streamopened(session, attr) local send = session.send; + if not attr.to then + session:close{ condition = "improper-addressing", + text = "A 'to' attribute is required on stream headers" }; + return; + end local host = nameprep(attr.to); if not host then session:close{ condition = "improper-addressing", @@ -80,7 +102,10 @@ function stream_callbacks.streamopened(session, attr) return; end - session:open_stream(); + session:open_stream(host, attr.from); + + -- Opening the stream can cause the stream to be closed + if session.destroyed then return end (session.log or log)("debug", "Sent reply <stream:stream> to client"); session.notopen = nil; @@ -92,13 +117,13 @@ function stream_callbacks.streamopened(session, attr) session.encrypted = true; local sock = session.conn:socket(); - if sock.info then - local info = sock:info(); + local info = sock.info and sock:info(); + if type(info) == "table" then (session.log or log)("info", "Stream encrypted (%s with %s)", info.protocol, info.cipher); session.compressed = info.compression; + m_tls_params:with_labels(info.protocol, info.cipher):add(1) else (session.log or log)("info", "Stream encrypted"); - session.compressed = sock.compression and sock:compression(); --COMPAT mw/luasec-hg end end @@ -107,15 +132,23 @@ function stream_callbacks.streamopened(session, attr) if features.tags[1] or session.full_jid then send(features); else - (session.log or log)("warn", "No stream features to offer"); - session:close({ - condition = "undefined-condition"; - text = "No stream features to proceed with on "..(session.secure and "" or "in").."secure stream"; - }); + if session.secure then + -- Here SASL should be offered + (session.log or log)("warn", "No stream features to offer on secure session. Check authentication settings."); + else + -- Normally STARTTLS would be offered + (session.log or log)("warn", "No stream features to offer on insecure session. Check encryption and security settings."); + end + session:close{ condition = "undefined-condition", text = "No stream features to proceed with" }; end end -function stream_callbacks.streamclosed(session) +function stream_callbacks.streamclosed(session, attr) + -- run _streamclosed in async context + session.thread:run({ stream = "closed", attr = attr }); +end + +function stream_callbacks._streamclosed(session) session.log("debug", "Received </stream:stream>"); session:close(false); end @@ -125,7 +158,7 @@ function stream_callbacks.error(session, error, data) session.log("debug", "Invalid opening stream header (%s)", (data:gsub("^([^\1]+)\1", "{%1}"))); session:close("invalid-namespace"); elseif error == "parse-error" then - (session.log or log)("debug", "Client XML parse error: %s", tostring(data)); + (session.log or log)("debug", "Client XML parse error: %s", data); session:close("not-well-formed"); elseif error == "stream-error" then local condition, text = "undefined-condition"; @@ -254,17 +287,20 @@ function listener.onconnect(conn) session.log("info", "Client connected"); - -- Client is using legacy SSL (otherwise mod_tls sets this flag) + -- Client is using Direct TLS or legacy SSL (otherwise mod_tls sets this flag) if conn:ssl() then session.secure = true; session.encrypted = true; -- Check if TLS compression is used local sock = conn:socket(); - if sock.info then - session.compressed = sock:info"compression"; - elseif sock.compression then - session.compressed = sock:compression(); --COMPAT mw/luasec-hg + local info = sock.info and sock:info(); + if type(info) == "table" then + (session.log or log)("info", "Stream encrypted (%s with %s)", info.protocol, info.cipher); + session.compressed = info.compression; + m_tls_params:with_labels(info.protocol, info.cipher):add(1) + else + (session.log or log)("info", "Stream encrypted"); end end @@ -284,7 +320,13 @@ function listener.onconnect(conn) end session.thread = runner(function (stanza) - core_process_stanza(session, stanza); + if st.is_stanza(stanza) then + core_process_stanza(session, stanza); + elseif stanza.stream == "opened" then + stream_callbacks._streamopened(session, stanza.attr); + elseif stanza.stream == "closed" then + stream_callbacks._streamclosed(session, stanza.attr); + end end, runner_callbacks, session); local filter = session.filter; @@ -295,8 +337,16 @@ function listener.onconnect(conn) if data then local ok, err = stream:feed(data); if not ok then - log("debug", "Received invalid XML (%s) %d bytes: %s", tostring(err), #data, data:sub(1, 300):gsub("[\r\n]+", " "):gsub("[%z\1-\31]", "_")); - session:close("not-well-formed"); + log("debug", "Received invalid XML (%s) %d bytes: %q", err, #data, data:sub(1, 300)); + if err == "stanza-too-large" then + session:close({ + condition = "policy-violation", + text = "XML stanza is too big", + extra = st.stanza("stanza-too-big", { xmlns = 'urn:xmpp:errors' }), + }); + else + session:close("not-well-formed"); + end end end end @@ -305,6 +355,7 @@ function listener.onconnect(conn) if c2s_timeout then add_task(c2s_timeout, function () if session.type == "c2s_unauthed" then + (session.log or log)("debug", "Connection still not authenticated after c2s_timeout=%gs, closing it", c2s_timeout); session:close("connection-timeout"); end end); @@ -339,6 +390,20 @@ function listener.onreadtimeout(conn) end end +function listener.ondrain(conn) + local session = sessions[conn]; + if session then + return (hosts[session.host] or prosody).events.fire_event("c2s-ondrain", { session = session }); + end +end + +function listener.onpredrain(conn) + local session = sessions[conn]; + if session then + return (hosts[session.host] or prosody).events.fire_event("c2s-pre-ondrain", { session = session }); + end +end + local function keepalive(event) local session = event.session; if not session.notopen then @@ -371,10 +436,21 @@ module:provides("net", { default_port = 5222; encryption = "starttls"; multiplex = { + protocol = "xmpp-client"; + pattern = "^<.*:stream.*%sxmlns%s*=%s*(['\"])jabber:client%1.*>"; + }; +}); + +module:provides("net", { + name = "c2s_direct_tls"; + listener = listener; + encryption = "ssl"; + multiplex = { pattern = "^<.*:stream.*%sxmlns%s*=%s*(['\"])jabber:client%1.*>"; }; }); +-- COMPAT module:provides("net", { name = "legacy_ssl"; listener = listener; |