diff options
author | Matthew Wild <mwild1@gmail.com> | 2013-08-09 11:10:22 +0100 |
---|---|---|
committer | Matthew Wild <mwild1@gmail.com> | 2013-08-09 11:10:22 +0100 |
commit | 3a97810358162d7bf45920edb93f38863b96e6fb (patch) | |
tree | 11649850ede15ca02d316aaa177ec848f091aef9 /plugins/mod_c2s.lua | |
parent | 5b350ae8c023aa2c67edc12cb833fd650c74c689 (diff) | |
download | prosody-3a97810358162d7bf45920edb93f38863b96e6fb.tar.gz prosody-3a97810358162d7bf45920edb93f38863b96e6fb.zip |
mod_c2s: Add session:sleep() and session:wake() to pause a session (e.g. while waiting for an external event). Needs a gallon or two of testing.
Diffstat (limited to 'plugins/mod_c2s.lua')
-rw-r--r-- | plugins/mod_c2s.lua | 83 |
1 files changed, 73 insertions, 10 deletions
diff --git a/plugins/mod_c2s.lua b/plugins/mod_c2s.lua index c26c5510..79a16e29 100644 --- a/plugins/mod_c2s.lua +++ b/plugins/mod_c2s.lua @@ -18,6 +18,8 @@ local uuid_generate = require "util.uuid".generate; local xpcall, tostring, type = xpcall, tostring, type; local traceback = debug.traceback; +local t_insert, t_remove = table.insert, table.remove; +local co_running, co_resume = coroutine.running, coroutine.resume; local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams"; @@ -31,7 +33,7 @@ local sessions = module:shared("sessions"); local core_process_stanza = prosody.core_process_stanza; local hosts = prosody.hosts; -local stream_callbacks = { default_ns = "jabber:client", handlestanza = core_process_stanza }; +local stream_callbacks = { default_ns = "jabber:client" }; local listener = {}; --- Stream events handlers @@ -120,9 +122,7 @@ end local function handleerr(err) log("error", "Traceback[c2s]: %s", traceback(tostring(err), 2)); end function stream_callbacks.handlestanza(session, stanza) stanza = session.filter("stanzas/in", stanza); - if stanza then - return xpcall(function () return core_process_stanza(session, stanza) end, handleerr); - end + t_insert(session.pending_stanzas, stanza); end --- Session methods @@ -224,18 +224,65 @@ function listener.onconnect(conn) session.stream:reset(); end + session.thread = coroutine.create(function (stanza) + while true do + core_process_stanza(session, stanza); + stanza = coroutine.yield("ready"); + end + end); + + session.pending_stanzas = {}; + local filter = session.filter; function session.data(data) - data = filter("bytes/in", data); + -- Parse the data, which will store stanzas in session.pending_stanzas if data then - local ok, err = stream:feed(data); - if ok then return; end - log("debug", "Received invalid XML (%s) %d bytes: %s", tostring(err), #data, data:sub(1, 300):gsub("[\r\n]+", " "):gsub("[%z\1-\31]", "_")); - session:close("not-well-formed"); + data = filter("bytes/in", data); + if data then + local ok, err = stream:feed(data); + if not ok then + log("debug", "Received invalid XML (%s) %d bytes: %s", tostring(err), #data, data:sub(1, 300):gsub("[\r\n]+", " "):gsub("[%z\1-\31]", "_")); + session:close("not-well-formed"); + end + end + end + + if co_running() ~= session.thread and not session.paused then + if session.state == "wait" then + session.state = "ready"; + local ok, state = co_resume(session.thread); + if not ok then + log("error", "Traceback[c2s]: %s", state); + elseif state == "wait" then + return; + end + end + -- We're not currently running, so start the thread to process pending stanzas + local s, thread = session.pending_stanzas, session.thread; + local n = #s; + while n > 0 and session.state ~= "wait" do + session.log("debug", "processing %d stanzas", n); + local consumed; + for i = 1,n do + local stanza = s[i]; + local ok, state = co_resume(thread, stanza); + if not ok then + log("error", "Traceback[c2s]: %s", state); + elseif state == "wait" then + consumed = i; + session.state = "wait"; + break; + end + end + if not consumed then consumed = n; end + for i = 1, #s do + s[i] = s[consumed+i]; + end + n = #s; + end end end - if c2s_timeout then add_task(c2s_timeout, function () if session.type == "c2s_unauthed" then @@ -245,6 +292,22 @@ function listener.onconnect(conn) end session.dispatch_stanza = stream_callbacks.handlestanza; + + function session:sleep(by) + session.log("debug", "Sleeping for %s", by); + session.paused = by or "?"; + session.conn:pause(); + if co_running() == session.thread then + coroutine.yield("wait"); + end + end + function session:wake(by) + assert(session.paused == (by or "?")); + session.log("debug", "Waking for %s", by); + session.paused = nil; + session.conn:resume(); + session.data(); --FIXME: next tick? + end end function listener.onincoming(conn, data) |