From 8d85647c37f3b1fd49e0d639d5646745672391bd Mon Sep 17 00:00:00 2001 From: Kim Alvefur Date: Thu, 30 May 2013 14:32:40 +0200 Subject: mod_c2s, mod_c2s: Send a whitespace on read timeout, to prod TCP into detecting if the connection died --- plugins/mod_c2s.lua | 7 +++++++ 1 file changed, 7 insertions(+) (limited to 'plugins/mod_c2s.lua') diff --git a/plugins/mod_c2s.lua b/plugins/mod_c2s.lua index 1d2dd6dd..f9a270c7 100644 --- a/plugins/mod_c2s.lua +++ b/plugins/mod_c2s.lua @@ -262,6 +262,13 @@ function listener.ondisconnect(conn, err) end end +function listener.onreadtimeout(conn) + local session = sessions[conn]; + if session then + return session.send(' '); + end +end + function listener.associate_session(conn, session) sessions[conn] = session; end -- cgit v1.2.3 From b6ecf01333bcb773beb784e2be93af305dd1cf01 Mon Sep 17 00:00:00 2001 From: Kim Alvefur Date: Tue, 11 Jun 2013 21:18:51 +0200 Subject: mod_c2s: Become a shared module and allow being disabled on some virtualhosts --- plugins/mod_c2s.lua | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'plugins/mod_c2s.lua') diff --git a/plugins/mod_c2s.lua b/plugins/mod_c2s.lua index f9a270c7..d87b3415 100644 --- a/plugins/mod_c2s.lua +++ b/plugins/mod_c2s.lua @@ -50,7 +50,7 @@ function stream_callbacks.streamopened(session, attr) session.streamid = uuid_generate(); (session.log or session)("debug", "Client sent opening to %s", session.host); - if not hosts[session.host] then + if not hosts[session.host] or not hosts[session.host].modules.c2s then -- We don't serve this host... session:close{ condition = "host-unknown", text = "This server does not serve "..tostring(session.host)}; return; @@ -273,6 +273,8 @@ function listener.associate_session(conn, session) sessions[conn] = session; end +function module.add_host() end + module:hook("server-stopping", function(event) local reason = event.reason; for _, session in pairs(sessions) do -- cgit v1.2.3 From 4c9866805e69426b720586141c73d94fc4a54cd5 Mon Sep 17 00:00:00 2001 From: Kim Alvefur Date: Tue, 11 Jun 2013 21:36:15 +0200 Subject: mod_c2s, mod_s2s: Fire an event on read timeouts --- plugins/mod_c2s.lua | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) (limited to 'plugins/mod_c2s.lua') diff --git a/plugins/mod_c2s.lua b/plugins/mod_c2s.lua index d87b3415..91bde574 100644 --- a/plugins/mod_c2s.lua +++ b/plugins/mod_c2s.lua @@ -265,15 +265,23 @@ end function listener.onreadtimeout(conn) local session = sessions[conn]; if session then - return session.send(' '); + return (hosts[session.host] or prosody).events.fire_event("c2s-read-timeout", { session = session }); end end +local function keepalive(event) + return event.session.send(' '); +end + function listener.associate_session(conn, session) sessions[conn] = session; end -function module.add_host() end +function module.add_host(module) + module:hook("c2s-read-timeout", keepalive, -1); +end + +module:hook("c2s-read-timeout", keepalive, -1); module:hook("server-stopping", function(event) local reason = event.reason; -- cgit v1.2.3 From cab180216b49e1c6f1f0e847357df58436551643 Mon Sep 17 00:00:00 2001 From: Matthew Wild Date: Mon, 15 Jul 2013 11:44:49 +0100 Subject: mod_bosh, mod_c2s: No longer fire stream-features globally (nobody uses it, and shared modules make it easy for global modules to hook per-host now) --- plugins/mod_c2s.lua | 2 -- 1 file changed, 2 deletions(-) (limited to 'plugins/mod_c2s.lua') diff --git a/plugins/mod_c2s.lua b/plugins/mod_c2s.lua index 91bde574..d667e781 100644 --- a/plugins/mod_c2s.lua +++ b/plugins/mod_c2s.lua @@ -79,8 +79,6 @@ function stream_callbacks.streamopened(session, attr) local features = st.stanza("stream:features"); hosts[session.host].events.fire_event("stream-features", { origin = session, features = features }); - module:fire_event("stream-features", session, features); - send(features); end -- cgit v1.2.3 From 0c6c0e9c294d9b3b757cf853facaf845087f17ae Mon Sep 17 00:00:00 2001 From: Kim Alvefur Date: Fri, 2 Aug 2013 15:12:24 +0200 Subject: mod_c2s, mod_s2s: Log a message that stream encryption has been enabled with some details --- plugins/mod_c2s.lua | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) (limited to 'plugins/mod_c2s.lua') diff --git a/plugins/mod_c2s.lua b/plugins/mod_c2s.lua index 3eb9e975..c26c5510 100644 --- a/plugins/mod_c2s.lua +++ b/plugins/mod_c2s.lua @@ -68,12 +68,15 @@ function stream_callbacks.streamopened(session, attr) if session.secure == false then session.secure = true; - -- Check if TLS compression is used local sock = session.conn:socket(); if sock.info then - session.compressed = sock:info"compression"; - elseif sock.compression then - session.compressed = sock:compression(); --COMPAT mw/luasec-hg + local info = sock:info(); + (session.log or log)("info", "Stream encrypted (%s) with %s, authenticated with %s and exchanged keys with %s", + info.protocol, info.encryption, info.authentication, info.key); + session.compressed = info.compression; + else + (session.log or log)("info", "Stream encrypted"); + session.compressed = sock.compression and sock:compression(); --COMPAT mw/luasec-hg end end -- cgit v1.2.3 From 53836024291aa7c72bee8e2437648d3eb114f5cc Mon Sep 17 00:00:00 2001 From: Matthew Wild Date: Fri, 9 Aug 2013 11:10:22 +0100 Subject: mod_c2s: Add session:sleep() and session:wake() to pause a session (e.g. while waiting for an external event). Needs a gallon or two of testing. --- plugins/mod_c2s.lua | 83 ++++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 73 insertions(+), 10 deletions(-) (limited to 'plugins/mod_c2s.lua') diff --git a/plugins/mod_c2s.lua b/plugins/mod_c2s.lua index c26c5510..79a16e29 100644 --- a/plugins/mod_c2s.lua +++ b/plugins/mod_c2s.lua @@ -18,6 +18,8 @@ local uuid_generate = require "util.uuid".generate; local xpcall, tostring, type = xpcall, tostring, type; local traceback = debug.traceback; +local t_insert, t_remove = table.insert, table.remove; +local co_running, co_resume = coroutine.running, coroutine.resume; local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams"; @@ -31,7 +33,7 @@ local sessions = module:shared("sessions"); local core_process_stanza = prosody.core_process_stanza; local hosts = prosody.hosts; -local stream_callbacks = { default_ns = "jabber:client", handlestanza = core_process_stanza }; +local stream_callbacks = { default_ns = "jabber:client" }; local listener = {}; --- Stream events handlers @@ -120,9 +122,7 @@ end local function handleerr(err) log("error", "Traceback[c2s]: %s", traceback(tostring(err), 2)); end function stream_callbacks.handlestanza(session, stanza) stanza = session.filter("stanzas/in", stanza); - if stanza then - return xpcall(function () return core_process_stanza(session, stanza) end, handleerr); - end + t_insert(session.pending_stanzas, stanza); end --- Session methods @@ -224,18 +224,65 @@ function listener.onconnect(conn) session.stream:reset(); end + session.thread = coroutine.create(function (stanza) + while true do + core_process_stanza(session, stanza); + stanza = coroutine.yield("ready"); + end + end); + + session.pending_stanzas = {}; + local filter = session.filter; function session.data(data) - data = filter("bytes/in", data); + -- Parse the data, which will store stanzas in session.pending_stanzas if data then - local ok, err = stream:feed(data); - if ok then return; end - log("debug", "Received invalid XML (%s) %d bytes: %s", tostring(err), #data, data:sub(1, 300):gsub("[\r\n]+", " "):gsub("[%z\1-\31]", "_")); - session:close("not-well-formed"); + data = filter("bytes/in", data); + if data then + local ok, err = stream:feed(data); + if not ok then + log("debug", "Received invalid XML (%s) %d bytes: %s", tostring(err), #data, data:sub(1, 300):gsub("[\r\n]+", " "):gsub("[%z\1-\31]", "_")); + session:close("not-well-formed"); + end + end + end + + if co_running() ~= session.thread and not session.paused then + if session.state == "wait" then + session.state = "ready"; + local ok, state = co_resume(session.thread); + if not ok then + log("error", "Traceback[c2s]: %s", state); + elseif state == "wait" then + return; + end + end + -- We're not currently running, so start the thread to process pending stanzas + local s, thread = session.pending_stanzas, session.thread; + local n = #s; + while n > 0 and session.state ~= "wait" do + session.log("debug", "processing %d stanzas", n); + local consumed; + for i = 1,n do + local stanza = s[i]; + local ok, state = co_resume(thread, stanza); + if not ok then + log("error", "Traceback[c2s]: %s", state); + elseif state == "wait" then + consumed = i; + session.state = "wait"; + break; + end + end + if not consumed then consumed = n; end + for i = 1, #s do + s[i] = s[consumed+i]; + end + n = #s; + end end end - if c2s_timeout then add_task(c2s_timeout, function () if session.type == "c2s_unauthed" then @@ -245,6 +292,22 @@ function listener.onconnect(conn) end session.dispatch_stanza = stream_callbacks.handlestanza; + + function session:sleep(by) + session.log("debug", "Sleeping for %s", by); + session.paused = by or "?"; + session.conn:pause(); + if co_running() == session.thread then + coroutine.yield("wait"); + end + end + function session:wake(by) + assert(session.paused == (by or "?")); + session.log("debug", "Waking for %s", by); + session.paused = nil; + session.conn:resume(); + session.data(); --FIXME: next tick? + end end function listener.onincoming(conn, data) -- cgit v1.2.3 From 1d833bb80779ed9c9e1d7ec6c7fab231ebf48182 Mon Sep 17 00:00:00 2001 From: Florian Zeitz Date: Fri, 9 Aug 2013 17:48:21 +0200 Subject: Remove all trailing whitespace --- plugins/mod_c2s.lua | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) (limited to 'plugins/mod_c2s.lua') diff --git a/plugins/mod_c2s.lua b/plugins/mod_c2s.lua index 79a16e29..fbac12cd 100644 --- a/plugins/mod_c2s.lua +++ b/plugins/mod_c2s.lua @@ -1,7 +1,7 @@ -- Prosody IM -- Copyright (C) 2008-2010 Matthew Wild -- Copyright (C) 2008-2010 Waqas Hussain --- +-- -- This project is MIT/X11 licensed. Please see the -- COPYING file in the source package for more information. -- @@ -154,10 +154,10 @@ local function session_close(session, reason) log("debug", "Disconnecting client, is: %s", stream_error); session.send(stream_error); end - + session.send(""); function session.send() return false; end - + local reason = (reason and (reason.name or reason.text or reason.condition)) or reason; session.log("info", "c2s stream for %s closed: %s", session.full_jid or ("<"..session.ip..">"), reason or "session closed"); @@ -193,9 +193,9 @@ end, 200); function listener.onconnect(conn) local session = sm_new_session(conn); sessions[conn] = session; - + session.log("info", "Client connected"); - + -- Client is using legacy SSL (otherwise mod_tls sets this flag) if conn:ssl() then session.secure = true; @@ -208,22 +208,22 @@ function listener.onconnect(conn) session.compressed = sock:compression(); --COMPAT mw/luasec-hg end end - + if opt_keepalives then conn:setoption("keepalive", opt_keepalives); end - + session.close = session_close; - + local stream = new_xmpp_stream(session, stream_callbacks); session.stream = stream; session.notopen = true; - + function session.reset_stream() session.notopen = true; session.stream:reset(); end - + session.thread = coroutine.create(function (stanza) while true do core_process_stanza(session, stanza); -- cgit v1.2.3 From e0216573ab30d11b54056a9fe4b43293ebf0a910 Mon Sep 17 00:00:00 2001 From: Matthew Wild Date: Sun, 11 Aug 2013 14:46:27 +0100 Subject: mod_c2s: Port coroutine code to util.async --- plugins/mod_c2s.lua | 81 ++++++++++++----------------------------------------- 1 file changed, 18 insertions(+), 63 deletions(-) (limited to 'plugins/mod_c2s.lua') diff --git a/plugins/mod_c2s.lua b/plugins/mod_c2s.lua index fbac12cd..8a652a33 100644 --- a/plugins/mod_c2s.lua +++ b/plugins/mod_c2s.lua @@ -15,11 +15,10 @@ local sessionmanager = require "core.sessionmanager"; local st = require "util.stanza"; local sm_new_session, sm_destroy_session = sessionmanager.new_session, sessionmanager.destroy_session; local uuid_generate = require "util.uuid".generate; +local runner = require "util.async".runner; local xpcall, tostring, type = xpcall, tostring, type; -local traceback = debug.traceback; local t_insert, t_remove = table.insert, table.remove; -local co_running, co_resume = coroutine.running, coroutine.resume; local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams"; @@ -35,6 +34,7 @@ local hosts = prosody.hosts; local stream_callbacks = { default_ns = "jabber:client" }; local listener = {}; +local runner_callbacks = {}; --- Stream events handlers local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'}; @@ -119,10 +119,9 @@ function stream_callbacks.error(session, error, data) end end -local function handleerr(err) log("error", "Traceback[c2s]: %s", traceback(tostring(err), 2)); end function stream_callbacks.handlestanza(session, stanza) stanza = session.filter("stanzas/in", stanza); - t_insert(session.pending_stanzas, stanza); + session.thread:run(stanza); end --- Session methods @@ -189,6 +188,18 @@ module:hook_global("user-deleted", function(event) end end, 200); +function runner_callbacks:ready() + self.data.conn:resume(); +end + +function runner_callbacks:waiting() + self.data.conn:pause(); +end + +function runner_callbacks:error(err) + (self.data.log or log)("error", "Traceback[c2s]: %s", err); +end + --- Port listener function listener.onconnect(conn) local session = sm_new_session(conn); @@ -224,14 +235,9 @@ function listener.onconnect(conn) session.stream:reset(); end - session.thread = coroutine.create(function (stanza) - while true do - core_process_stanza(session, stanza); - stanza = coroutine.yield("ready"); - end - end); - - session.pending_stanzas = {}; + session.thread = runner(function (stanza) + core_process_stanza(session, stanza); + end, runner_callbacks, session); local filter = session.filter; function session.data(data) @@ -246,41 +252,6 @@ function listener.onconnect(conn) end end end - - if co_running() ~= session.thread and not session.paused then - if session.state == "wait" then - session.state = "ready"; - local ok, state = co_resume(session.thread); - if not ok then - log("error", "Traceback[c2s]: %s", state); - elseif state == "wait" then - return; - end - end - -- We're not currently running, so start the thread to process pending stanzas - local s, thread = session.pending_stanzas, session.thread; - local n = #s; - while n > 0 and session.state ~= "wait" do - session.log("debug", "processing %d stanzas", n); - local consumed; - for i = 1,n do - local stanza = s[i]; - local ok, state = co_resume(thread, stanza); - if not ok then - log("error", "Traceback[c2s]: %s", state); - elseif state == "wait" then - consumed = i; - session.state = "wait"; - break; - end - end - if not consumed then consumed = n; end - for i = 1, #s do - s[i] = s[consumed+i]; - end - n = #s; - end - end end if c2s_timeout then @@ -292,22 +263,6 @@ function listener.onconnect(conn) end session.dispatch_stanza = stream_callbacks.handlestanza; - - function session:sleep(by) - session.log("debug", "Sleeping for %s", by); - session.paused = by or "?"; - session.conn:pause(); - if co_running() == session.thread then - coroutine.yield("wait"); - end - end - function session:wake(by) - assert(session.paused == (by or "?")); - session.log("debug", "Waking for %s", by); - session.paused = nil; - session.conn:resume(); - session.data(); --FIXME: next tick? - end end function listener.onincoming(conn, data) -- cgit v1.2.3 From 938d568ac476bb73e83b8ee44ba61c23f29557a8 Mon Sep 17 00:00:00 2001 From: Kim Alvefur Date: Wed, 14 Aug 2013 15:00:36 +0200 Subject: mod_c2s, mod_s2s: Log cipher and encryption info in a more compact and (hopefully) less confusing way --- plugins/mod_c2s.lua | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'plugins/mod_c2s.lua') diff --git a/plugins/mod_c2s.lua b/plugins/mod_c2s.lua index 8a652a33..1cf4b98d 100644 --- a/plugins/mod_c2s.lua +++ b/plugins/mod_c2s.lua @@ -73,8 +73,7 @@ function stream_callbacks.streamopened(session, attr) local sock = session.conn:socket(); if sock.info then local info = sock:info(); - (session.log or log)("info", "Stream encrypted (%s) with %s, authenticated with %s and exchanged keys with %s", - info.protocol, info.encryption, info.authentication, info.key); + (session.log or log)("info", "Stream encrypted (%s with %s)", info.protocol, info.cipher); session.compressed = info.compression; else (session.log or log)("info", "Stream encrypted"); -- cgit v1.2.3 From 0c39eb02b91bc1028f523dde1d6b5112cbff8a7f Mon Sep 17 00:00:00 2001 From: Kim Alvefur Date: Wed, 14 Aug 2013 15:38:56 +0200 Subject: mod_c2s: Move another log message to debug level --- plugins/mod_c2s.lua | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'plugins/mod_c2s.lua') diff --git a/plugins/mod_c2s.lua b/plugins/mod_c2s.lua index 1cf4b98d..1fb8dcf5 100644 --- a/plugins/mod_c2s.lua +++ b/plugins/mod_c2s.lua @@ -157,7 +157,7 @@ local function session_close(session, reason) function session.send() return false; end local reason = (reason and (reason.name or reason.text or reason.condition)) or reason; - session.log("info", "c2s stream for %s closed: %s", session.full_jid or ("<"..session.ip..">"), reason or "session closed"); + session.log("debug", "c2s stream for %s closed: %s", session.full_jid or ("<"..session.ip..">"), reason or "session closed"); -- Authenticated incoming stream may still be sending us stanzas, so wait for from remote local conn = session.conn; -- cgit v1.2.3 From 5178a1e79fed67890c26d0f9a052c70b6a36b5a6 Mon Sep 17 00:00:00 2001 From: Kim Alvefur Date: Mon, 7 Oct 2013 12:43:00 +0200 Subject: mod_c2s, mod_s2s: Set session.encrypted as session.secure does not allways mean encrypted (eg consider_bosh_secure) --- plugins/mod_c2s.lua | 2 ++ 1 file changed, 2 insertions(+) (limited to 'plugins/mod_c2s.lua') diff --git a/plugins/mod_c2s.lua b/plugins/mod_c2s.lua index 1fb8dcf5..3bdffc7d 100644 --- a/plugins/mod_c2s.lua +++ b/plugins/mod_c2s.lua @@ -69,6 +69,7 @@ function stream_callbacks.streamopened(session, attr) -- since we now have a new stream header, session is secured if session.secure == false then session.secure = true; + session.encrypted = true; local sock = session.conn:socket(); if sock.info then @@ -209,6 +210,7 @@ function listener.onconnect(conn) -- Client is using legacy SSL (otherwise mod_tls sets this flag) if conn:ssl() then session.secure = true; + session.encrypted = true; -- Check if TLS compression is used local sock = conn:socket(); -- cgit v1.2.3 From c5c7ef5b0f47d1e51117f756420176f1f0a2466d Mon Sep 17 00:00:00 2001 From: Florian Zeitz Date: Fri, 31 Jan 2014 12:01:12 +0100 Subject: mod_c2s: Break out stream opening into a separate function --- plugins/mod_c2s.lua | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) (limited to 'plugins/mod_c2s.lua') diff --git a/plugins/mod_c2s.lua b/plugins/mod_c2s.lua index 3bdffc7d..7a8af406 100644 --- a/plugins/mod_c2s.lua +++ b/plugins/mod_c2s.lua @@ -38,7 +38,6 @@ local runner_callbacks = {}; --- Stream events handlers local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'}; -local default_stream_attr = { ["xmlns:stream"] = "http://etherx.jabber.org/streams", xmlns = stream_callbacks.default_ns, version = "1.0", id = "" }; function stream_callbacks.streamopened(session, attr) local send = session.send; @@ -58,9 +57,7 @@ function stream_callbacks.streamopened(session, attr) return; end - send(""..st.stanza("stream:stream", { - xmlns = 'jabber:client', ["xmlns:stream"] = 'http://etherx.jabber.org/streams'; - id = session.streamid, from = session.host, version = '1.0', ["xml:lang"] = 'en' }):top_tag()); + session:open_stream(); (session.log or log)("debug", "Sent reply to client"); session.notopen = nil; @@ -129,8 +126,7 @@ local function session_close(session, reason) local log = session.log or log; if session.conn then if session.notopen then - session.send(""); - session.send(st.stanza("stream:stream", default_stream_attr):top_tag()); + session:open_stream(); end if reason then -- nil == no err, initiated by us, false == initiated by client local stream_error = st.stanza("stream:error"); @@ -178,6 +174,19 @@ local function session_close(session, reason) end end +local function session_open_stream(session) + local attr = { + ["xmlns:stream"] = 'http://etherx.jabber.org/streams', + xmlns = stream_callbacks.default_ns, + version = "1.0", + ["xml:lang"] = 'en', + id = session.streamid or "", + from = session.host + }; + session.send(""); + session.send(st.stanza("stream:stream", attr):top_tag()); +end + module:hook_global("user-deleted", function(event) local username, host = event.username, event.host; local user = hosts[host].sessions[username]; @@ -225,6 +234,7 @@ function listener.onconnect(conn) conn:setoption("keepalive", opt_keepalives); end + session.open_stream = session_open_stream; session.close = session_close; local stream = new_xmpp_stream(session, stream_callbacks); -- cgit v1.2.3