diff options
-rw-r--r-- | plugins/mod_c2s.lua | 81 |
1 files changed, 18 insertions, 63 deletions
diff --git a/plugins/mod_c2s.lua b/plugins/mod_c2s.lua index fbac12cd..8a652a33 100644 --- a/plugins/mod_c2s.lua +++ b/plugins/mod_c2s.lua @@ -15,11 +15,10 @@ local sessionmanager = require "core.sessionmanager"; local st = require "util.stanza"; local sm_new_session, sm_destroy_session = sessionmanager.new_session, sessionmanager.destroy_session; local uuid_generate = require "util.uuid".generate; +local runner = require "util.async".runner; local xpcall, tostring, type = xpcall, tostring, type; -local traceback = debug.traceback; local t_insert, t_remove = table.insert, table.remove; -local co_running, co_resume = coroutine.running, coroutine.resume; local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams"; @@ -35,6 +34,7 @@ local hosts = prosody.hosts; local stream_callbacks = { default_ns = "jabber:client" }; local listener = {}; +local runner_callbacks = {}; --- Stream events handlers local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'}; @@ -119,10 +119,9 @@ function stream_callbacks.error(session, error, data) end end -local function handleerr(err) log("error", "Traceback[c2s]: %s", traceback(tostring(err), 2)); end function stream_callbacks.handlestanza(session, stanza) stanza = session.filter("stanzas/in", stanza); - t_insert(session.pending_stanzas, stanza); + session.thread:run(stanza); end --- Session methods @@ -189,6 +188,18 @@ module:hook_global("user-deleted", function(event) end end, 200); +function runner_callbacks:ready() + self.data.conn:resume(); +end + +function runner_callbacks:waiting() + self.data.conn:pause(); +end + +function runner_callbacks:error(err) + (self.data.log or log)("error", "Traceback[c2s]: %s", err); +end + --- Port listener function listener.onconnect(conn) local session = sm_new_session(conn); @@ -224,14 +235,9 @@ function listener.onconnect(conn) session.stream:reset(); end - session.thread = coroutine.create(function (stanza) - while true do - core_process_stanza(session, stanza); - stanza = coroutine.yield("ready"); - end - end); - - session.pending_stanzas = {}; + session.thread = runner(function (stanza) + core_process_stanza(session, stanza); + end, runner_callbacks, session); local filter = session.filter; function session.data(data) @@ -246,41 +252,6 @@ function listener.onconnect(conn) end end end - - if co_running() ~= session.thread and not session.paused then - if session.state == "wait" then - session.state = "ready"; - local ok, state = co_resume(session.thread); - if not ok then - log("error", "Traceback[c2s]: %s", state); - elseif state == "wait" then - return; - end - end - -- We're not currently running, so start the thread to process pending stanzas - local s, thread = session.pending_stanzas, session.thread; - local n = #s; - while n > 0 and session.state ~= "wait" do - session.log("debug", "processing %d stanzas", n); - local consumed; - for i = 1,n do - local stanza = s[i]; - local ok, state = co_resume(thread, stanza); - if not ok then - log("error", "Traceback[c2s]: %s", state); - elseif state == "wait" then - consumed = i; - session.state = "wait"; - break; - end - end - if not consumed then consumed = n; end - for i = 1, #s do - s[i] = s[consumed+i]; - end - n = #s; - end - end end if c2s_timeout then @@ -292,22 +263,6 @@ function listener.onconnect(conn) end session.dispatch_stanza = stream_callbacks.handlestanza; - - function session:sleep(by) - session.log("debug", "Sleeping for %s", by); - session.paused = by or "?"; - session.conn:pause(); - if co_running() == session.thread then - coroutine.yield("wait"); - end - end - function session:wake(by) - assert(session.paused == (by or "?")); - session.log("debug", "Waking for %s", by); - session.paused = nil; - session.conn:resume(); - session.data(); --FIXME: next tick? - end end function listener.onincoming(conn, data) |