From 4f0eecb472e8242eaeaf53e0dbc526609851a381 Mon Sep 17 00:00:00 2001 From: Kim Alvefur Date: Thu, 17 Mar 2016 12:46:52 +0100 Subject: Backed out BOSH use of util.async (changeset f0687c313cf1) --- plugins/mod_bosh.lua | 16 +++------------- 1 file changed, 3 insertions(+), 13 deletions(-) diff --git a/plugins/mod_bosh.lua b/plugins/mod_bosh.lua index fd33226c..840de725 100644 --- a/plugins/mod_bosh.lua +++ b/plugins/mod_bosh.lua @@ -22,7 +22,6 @@ local initialize_filters = require "util.filters".initialize; local math_min = math.min; local xpcall, tostring, type = xpcall, tostring, type; local traceback = debug.traceback; -local runner = require"util.async".runner; local xmlns_streams = "http://etherx.jabber.org/streams"; local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams"; @@ -229,8 +228,6 @@ local function bosh_close_stream(session, reason) sm_destroy_session(session); end -local runner_callbacks = { }; - -- Handle the tag in the request payload. function stream_callbacks.streamopened(context, attr) local request, response = context.request, context.response; @@ -263,10 +260,6 @@ function stream_callbacks.streamopened(context, attr) }; sessions[sid] = session; - session.thread = runner(function (stanza) - session:dispatch_stanza(stanza); - end, runner_callbacks, session); - local filter = initialize_filters(session); session.log("debug", "BOSH session created for request from %s", session.ip); @@ -364,11 +357,6 @@ function stream_callbacks.streamopened(context, attr) end local function handleerr(err) log("error", "Traceback[bosh]: %s", traceback(tostring(err), 2)); end - -function runner_callbacks:error(err) - return handleerr(err); -end - function stream_callbacks.handlestanza(context, stanza) if context.ignore then return; end log("debug", "BOSH stanza received: %s\n", stanza:top_tag()); @@ -378,7 +366,9 @@ function stream_callbacks.handlestanza(context, stanza) stanza.attr.xmlns = nil; end stanza = session.filter("stanzas/in", stanza); - session.thread:run(stanza); + if stanza then + return xpcall(function () return core_process_stanza(session, stanza) end, handleerr); + end end end -- cgit v1.2.3 From 13099c75ece5abcf46e0af3f691242e086cf1197 Mon Sep 17 00:00:00 2001 From: Kim Alvefur Date: Thu, 17 Mar 2016 12:47:24 +0100 Subject: mod_c2s: Remove use of util.async --- core/sessionmanager.lua | 1 - plugins/mod_c2s.lua | 25 +++++-------------------- 2 files changed, 5 insertions(+), 21 deletions(-) diff --git a/core/sessionmanager.lua b/core/sessionmanager.lua index 6aa0a4f0..c8856634 100644 --- a/core/sessionmanager.lua +++ b/core/sessionmanager.lua @@ -72,7 +72,6 @@ local function retire_session(session) function session.send(data) log("debug", "Discarding data sent to resting session: %s", tostring(data)); return false; end function session.data(data) log("debug", "Discarding data received from resting session: %s", tostring(data)); end - session.thread = { run = function (_, data) return session.data(data) end }; return setmetatable(session, resting_session); end diff --git a/plugins/mod_c2s.lua b/plugins/mod_c2s.lua index 42c35689..d950225d 100644 --- a/plugins/mod_c2s.lua +++ b/plugins/mod_c2s.lua @@ -15,10 +15,9 @@ local sessionmanager = require "core.sessionmanager"; local st = require "util.stanza"; local sm_new_session, sm_destroy_session = sessionmanager.new_session, sessionmanager.destroy_session; local uuid_generate = require "util.uuid".generate; -local runner = require "util.async".runner; local xpcall, tostring, type = xpcall, tostring, type; -local t_insert, t_remove = table.insert, table.remove; +local traceback = debug.traceback; local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams"; @@ -36,7 +35,6 @@ local hosts = prosody.hosts; local stream_callbacks = { default_ns = "jabber:client" }; local listener = {}; -local runner_callbacks = {}; --- Stream events handlers local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'}; @@ -123,9 +121,12 @@ function stream_callbacks.error(session, error, data) end end +local function handleerr(err) log("error", "Traceback[c2s]: %s", traceback(tostring(err), 2)); end function stream_callbacks.handlestanza(session, stanza) stanza = session.filter("stanzas/in", stanza); - session.thread:run(stanza); + if stanza then + return xpcall(function () return core_process_stanza(session, stanza) end, handleerr); + end end --- Session methods @@ -191,18 +192,6 @@ module:hook_global("user-deleted", function(event) end end, 200); -function runner_callbacks:ready() - self.data.conn:resume(); -end - -function runner_callbacks:waiting() - self.data.conn:pause(); -end - -function runner_callbacks:error(err) - (self.data.log or log)("error", "Traceback[c2s]: %s", err); -end - --- Port listener function listener.onconnect(conn) measure_connections(1); @@ -240,10 +229,6 @@ function listener.onconnect(conn) session.stream:reset(); end - session.thread = runner(function (stanza) - core_process_stanza(session, stanza); - end, runner_callbacks, session); - local filter = session.filter; function session.data(data) -- Parse the data, which will store stanzas in session.pending_stanzas -- cgit v1.2.3 From fd7f0f219381daca6b4cfb3ed648f81af343370f Mon Sep 17 00:00:00 2001 From: Kim Alvefur Date: Thu, 17 Mar 2016 18:00:35 +0100 Subject: util.async: Remove to prevent blocking a stable release --- util/async.lua | 158 --------------------------------------------------------- 1 file changed, 158 deletions(-) delete mode 100644 util/async.lua diff --git a/util/async.lua b/util/async.lua deleted file mode 100644 index 968ec804..00000000 --- a/util/async.lua +++ /dev/null @@ -1,158 +0,0 @@ -local log = require "util.logger".init("util.async"); - -local function runner_continue(thread) - -- ASSUMPTION: runner is in 'waiting' state (but we don't have the runner to know for sure) - if coroutine.status(thread) ~= "suspended" then -- This should suffice - return false; - end - local ok, state, runner = coroutine.resume(thread); - if not ok then - local level = 0; - while debug.getinfo(thread, level, "") do level = level + 1; end - ok, runner = debug.getlocal(thread, level-1, 1); - local error_handler = runner.watchers.error; - if error_handler then error_handler(runner, debug.traceback(thread, state)); end - elseif state == "ready" then - -- If state is 'ready', it is our responsibility to update runner.state from 'waiting'. - -- We also have to :run(), because the queue might have further items that will not be - -- processed otherwise. FIXME: It's probably best to do this in a nexttick (0 timer). - runner.state = "ready"; - runner:run(); - end - return true; -end - -local function waiter(num) - local thread = coroutine.running(); - if not thread then - error("Not running in an async context, see http://prosody.im/doc/developers/async"); - end - num = num or 1; - local waiting; - return function () - if num == 0 then return; end -- already done - waiting = true; - coroutine.yield("wait"); - end, function () - num = num - 1; - if num == 0 and waiting then - runner_continue(thread); - elseif num < 0 then - error("done() called too many times"); - end - end; -end - -local function guarder() - local guards = {}; - return function (id, func) - local thread = coroutine.running(); - if not thread then - error("Not running in an async context, see http://prosody.im/doc/developers/async"); - end - local guard = guards[id]; - if not guard then - guard = {}; - guards[id] = guard; - log("debug", "New guard!"); - else - table.insert(guard, thread); - log("debug", "Guarded. %d threads waiting.", #guard) - coroutine.yield("wait"); - end - local function exit() - local next_waiting = table.remove(guard, 1); - if next_waiting then - log("debug", "guard: Executing next waiting thread (%d left)", #guard) - runner_continue(next_waiting); - else - log("debug", "Guard off duty.") - guards[id] = nil; - end - end - if func then - func(); - exit(); - return; - end - return exit; - end; -end - -local runner_mt = {}; -runner_mt.__index = runner_mt; - -local function runner_create_thread(func, self) - local thread = coroutine.create(function (self) - while true do - func(coroutine.yield("ready", self)); - end - end); - assert(coroutine.resume(thread, self)); -- Start it up, it will return instantly to wait for the first input - return thread; -end - -local empty_watchers = {}; -local function runner(func, watchers, data) - return setmetatable({ func = func, thread = false, state = "ready", notified_state = "ready", - queue = {}, watchers = watchers or empty_watchers, data = data } - , runner_mt); -end - -function runner_mt:run(input) - if input ~= nil then - table.insert(self.queue, input); - end - if self.state ~= "ready" then - return true, self.state, #self.queue; - end - - local q, thread = self.queue, self.thread; - if not thread or coroutine.status(thread) == "dead" then - thread = runner_create_thread(self.func, self); - self.thread = thread; - end - - local n, state, err = #q, self.state, nil; - self.state = "running"; - while n > 0 and state == "ready" do - local consumed; - for i = 1,n do - local input = q[i]; - local ok, new_state = coroutine.resume(thread, input); - if not ok then - consumed, state, err = i, "ready", debug.traceback(thread, new_state); - self.thread = nil; - break; - elseif new_state == "wait" then - consumed, state = i, "waiting"; - break; - end - end - if not consumed then consumed = n; end - if q[n+1] ~= nil then - n = #q; - end - for i = 1, n do - q[i] = q[consumed+i]; - end - n = #q; - end - self.state = state; - if err or state ~= self.notified_state then - if err then - state = "error" - else - self.notified_state = state; - end - local handler = self.watchers[state]; - if handler then handler(self, err); end - end - return true, state, n; -end - -function runner_mt:enqueue(input) - table.insert(self.queue, input); -end - -return { waiter = waiter, guarder = guarder, runner = runner }; -- cgit v1.2.3