aboutsummaryrefslogtreecommitdiffstats
path: root/plugins/mod_c2s.lua
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/mod_c2s.lua')
-rw-r--r--plugins/mod_c2s.lua115
1 files changed, 98 insertions, 17 deletions
diff --git a/plugins/mod_c2s.lua b/plugins/mod_c2s.lua
index b2a81592..79a16e29 100644
--- a/plugins/mod_c2s.lua
+++ b/plugins/mod_c2s.lua
@@ -18,6 +18,8 @@ local uuid_generate = require "util.uuid".generate;
local xpcall, tostring, type = xpcall, tostring, type;
local traceback = debug.traceback;
+local t_insert, t_remove = table.insert, table.remove;
+local co_running, co_resume = coroutine.running, coroutine.resume;
local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams";
@@ -31,7 +33,7 @@ local sessions = module:shared("sessions");
local core_process_stanza = prosody.core_process_stanza;
local hosts = prosody.hosts;
-local stream_callbacks = { default_ns = "jabber:client", handlestanza = core_process_stanza };
+local stream_callbacks = { default_ns = "jabber:client" };
local listener = {};
--- Stream events handlers
@@ -50,7 +52,7 @@ function stream_callbacks.streamopened(session, attr)
session.streamid = uuid_generate();
(session.log or session)("debug", "Client sent opening <stream:stream> to %s", session.host);
- if not hosts[session.host] then
+ if not hosts[session.host] or not hosts[session.host].modules.c2s then
-- We don't serve this host...
session:close{ condition = "host-unknown", text = "This server does not serve "..tostring(session.host)};
return;
@@ -68,19 +70,20 @@ function stream_callbacks.streamopened(session, attr)
if session.secure == false then
session.secure = true;
- -- Check if TLS compression is used
local sock = session.conn:socket();
if sock.info then
- session.compressed = sock:info"compression";
- elseif sock.compression then
- session.compressed = sock:compression(); --COMPAT mw/luasec-hg
+ local info = sock:info();
+ (session.log or log)("info", "Stream encrypted (%s) with %s, authenticated with %s and exchanged keys with %s",
+ info.protocol, info.encryption, info.authentication, info.key);
+ session.compressed = info.compression;
+ else
+ (session.log or log)("info", "Stream encrypted");
+ session.compressed = sock.compression and sock:compression(); --COMPAT mw/luasec-hg
end
end
local features = st.stanza("stream:features");
hosts[session.host].events.fire_event("stream-features", { origin = session, features = features });
- module:fire_event("stream-features", session, features);
-
send(features);
end
@@ -119,9 +122,7 @@ end
local function handleerr(err) log("error", "Traceback[c2s]: %s", traceback(tostring(err), 2)); end
function stream_callbacks.handlestanza(session, stanza)
stanza = session.filter("stanzas/in", stanza);
- if stanza then
- return xpcall(function () return core_process_stanza(session, stanza) end, handleerr);
- end
+ t_insert(session.pending_stanzas, stanza);
end
--- Session methods
@@ -223,18 +224,65 @@ function listener.onconnect(conn)
session.stream:reset();
end
+ session.thread = coroutine.create(function (stanza)
+ while true do
+ core_process_stanza(session, stanza);
+ stanza = coroutine.yield("ready");
+ end
+ end);
+
+ session.pending_stanzas = {};
+
local filter = session.filter;
function session.data(data)
- data = filter("bytes/in", data);
+ -- Parse the data, which will store stanzas in session.pending_stanzas
if data then
- local ok, err = stream:feed(data);
- if ok then return; end
- log("debug", "Received invalid XML (%s) %d bytes: %s", tostring(err), #data, data:sub(1, 300):gsub("[\r\n]+", " "):gsub("[%z\1-\31]", "_"));
- session:close("not-well-formed");
+ data = filter("bytes/in", data);
+ if data then
+ local ok, err = stream:feed(data);
+ if not ok then
+ log("debug", "Received invalid XML (%s) %d bytes: %s", tostring(err), #data, data:sub(1, 300):gsub("[\r\n]+", " "):gsub("[%z\1-\31]", "_"));
+ session:close("not-well-formed");
+ end
+ end
+ end
+
+ if co_running() ~= session.thread and not session.paused then
+ if session.state == "wait" then
+ session.state = "ready";
+ local ok, state = co_resume(session.thread);
+ if not ok then
+ log("error", "Traceback[c2s]: %s", state);
+ elseif state == "wait" then
+ return;
+ end
+ end
+ -- We're not currently running, so start the thread to process pending stanzas
+ local s, thread = session.pending_stanzas, session.thread;
+ local n = #s;
+ while n > 0 and session.state ~= "wait" do
+ session.log("debug", "processing %d stanzas", n);
+ local consumed;
+ for i = 1,n do
+ local stanza = s[i];
+ local ok, state = co_resume(thread, stanza);
+ if not ok then
+ log("error", "Traceback[c2s]: %s", state);
+ elseif state == "wait" then
+ consumed = i;
+ session.state = "wait";
+ break;
+ end
+ end
+ if not consumed then consumed = n; end
+ for i = 1, #s do
+ s[i] = s[consumed+i];
+ end
+ n = #s;
+ end
end
end
-
if c2s_timeout then
add_task(c2s_timeout, function ()
if session.type == "c2s_unauthed" then
@@ -244,6 +292,22 @@ function listener.onconnect(conn)
end
session.dispatch_stanza = stream_callbacks.handlestanza;
+
+ function session:sleep(by)
+ session.log("debug", "Sleeping for %s", by);
+ session.paused = by or "?";
+ session.conn:pause();
+ if co_running() == session.thread then
+ coroutine.yield("wait");
+ end
+ end
+ function session:wake(by)
+ assert(session.paused == (by or "?"));
+ session.log("debug", "Waking for %s", by);
+ session.paused = nil;
+ session.conn:resume();
+ session.data(); --FIXME: next tick?
+ end
end
function listener.onincoming(conn, data)
@@ -262,10 +326,27 @@ function listener.ondisconnect(conn, err)
end
end
+function listener.onreadtimeout(conn)
+ local session = sessions[conn];
+ if session then
+ return (hosts[session.host] or prosody).events.fire_event("c2s-read-timeout", { session = session });
+ end
+end
+
+local function keepalive(event)
+ return event.session.send(' ');
+end
+
function listener.associate_session(conn, session)
sessions[conn] = session;
end
+function module.add_host(module)
+ module:hook("c2s-read-timeout", keepalive, -1);
+end
+
+module:hook("c2s-read-timeout", keepalive, -1);
+
module:hook("server-stopping", function(event)
local reason = event.reason;
for _, session in pairs(sessions) do