aboutsummaryrefslogtreecommitdiffstats
path: root/plugins/mod_c2s.lua
diff options
context:
space:
mode:
authorMatthew Wild <mwild1@gmail.com>2013-08-09 11:10:22 +0100
committerMatthew Wild <mwild1@gmail.com>2013-08-09 11:10:22 +0100
commit53836024291aa7c72bee8e2437648d3eb114f5cc (patch)
tree11649850ede15ca02d316aaa177ec848f091aef9 /plugins/mod_c2s.lua
parent96466999c175612fd341819695f473f4b019ea0c (diff)
downloadprosody-53836024291aa7c72bee8e2437648d3eb114f5cc.tar.gz
prosody-53836024291aa7c72bee8e2437648d3eb114f5cc.zip
mod_c2s: Add session:sleep() and session:wake() to pause a session (e.g. while waiting for an external event). Needs a gallon or two of testing.
Diffstat (limited to 'plugins/mod_c2s.lua')
-rw-r--r--plugins/mod_c2s.lua83
1 files changed, 73 insertions, 10 deletions
diff --git a/plugins/mod_c2s.lua b/plugins/mod_c2s.lua
index c26c5510..79a16e29 100644
--- a/plugins/mod_c2s.lua
+++ b/plugins/mod_c2s.lua
@@ -18,6 +18,8 @@ local uuid_generate = require "util.uuid".generate;
local xpcall, tostring, type = xpcall, tostring, type;
local traceback = debug.traceback;
+local t_insert, t_remove = table.insert, table.remove;
+local co_running, co_resume = coroutine.running, coroutine.resume;
local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams";
@@ -31,7 +33,7 @@ local sessions = module:shared("sessions");
local core_process_stanza = prosody.core_process_stanza;
local hosts = prosody.hosts;
-local stream_callbacks = { default_ns = "jabber:client", handlestanza = core_process_stanza };
+local stream_callbacks = { default_ns = "jabber:client" };
local listener = {};
--- Stream events handlers
@@ -120,9 +122,7 @@ end
local function handleerr(err) log("error", "Traceback[c2s]: %s", traceback(tostring(err), 2)); end
function stream_callbacks.handlestanza(session, stanza)
stanza = session.filter("stanzas/in", stanza);
- if stanza then
- return xpcall(function () return core_process_stanza(session, stanza) end, handleerr);
- end
+ t_insert(session.pending_stanzas, stanza);
end
--- Session methods
@@ -224,18 +224,65 @@ function listener.onconnect(conn)
session.stream:reset();
end
+ session.thread = coroutine.create(function (stanza)
+ while true do
+ core_process_stanza(session, stanza);
+ stanza = coroutine.yield("ready");
+ end
+ end);
+
+ session.pending_stanzas = {};
+
local filter = session.filter;
function session.data(data)
- data = filter("bytes/in", data);
+ -- Parse the data, which will store stanzas in session.pending_stanzas
if data then
- local ok, err = stream:feed(data);
- if ok then return; end
- log("debug", "Received invalid XML (%s) %d bytes: %s", tostring(err), #data, data:sub(1, 300):gsub("[\r\n]+", " "):gsub("[%z\1-\31]", "_"));
- session:close("not-well-formed");
+ data = filter("bytes/in", data);
+ if data then
+ local ok, err = stream:feed(data);
+ if not ok then
+ log("debug", "Received invalid XML (%s) %d bytes: %s", tostring(err), #data, data:sub(1, 300):gsub("[\r\n]+", " "):gsub("[%z\1-\31]", "_"));
+ session:close("not-well-formed");
+ end
+ end
+ end
+
+ if co_running() ~= session.thread and not session.paused then
+ if session.state == "wait" then
+ session.state = "ready";
+ local ok, state = co_resume(session.thread);
+ if not ok then
+ log("error", "Traceback[c2s]: %s", state);
+ elseif state == "wait" then
+ return;
+ end
+ end
+ -- We're not currently running, so start the thread to process pending stanzas
+ local s, thread = session.pending_stanzas, session.thread;
+ local n = #s;
+ while n > 0 and session.state ~= "wait" do
+ session.log("debug", "processing %d stanzas", n);
+ local consumed;
+ for i = 1,n do
+ local stanza = s[i];
+ local ok, state = co_resume(thread, stanza);
+ if not ok then
+ log("error", "Traceback[c2s]: %s", state);
+ elseif state == "wait" then
+ consumed = i;
+ session.state = "wait";
+ break;
+ end
+ end
+ if not consumed then consumed = n; end
+ for i = 1, #s do
+ s[i] = s[consumed+i];
+ end
+ n = #s;
+ end
end
end
-
if c2s_timeout then
add_task(c2s_timeout, function ()
if session.type == "c2s_unauthed" then
@@ -245,6 +292,22 @@ function listener.onconnect(conn)
end
session.dispatch_stanza = stream_callbacks.handlestanza;
+
+ function session:sleep(by)
+ session.log("debug", "Sleeping for %s", by);
+ session.paused = by or "?";
+ session.conn:pause();
+ if co_running() == session.thread then
+ coroutine.yield("wait");
+ end
+ end
+ function session:wake(by)
+ assert(session.paused == (by or "?"));
+ session.log("debug", "Waking for %s", by);
+ session.paused = nil;
+ session.conn:resume();
+ session.data(); --FIXME: next tick?
+ end
end
function listener.onincoming(conn, data)