diff options
Diffstat (limited to 'util/async.lua')
-rw-r--r-- | util/async.lua | 26 |
1 files changed, 21 insertions, 5 deletions
diff --git a/util/async.lua b/util/async.lua index 2830238f..9a3485e9 100644 --- a/util/async.lua +++ b/util/async.lua @@ -1,7 +1,8 @@ -local logger = require "util.logger"; +local logger = require "prosody.util.logger"; local log = logger.init("util.async"); -local new_id = require "util.id".short; -local xpcall = require "util.xpcall".xpcall; +local new_id = require "prosody.util.id".short; +local xpcall = require "prosody.util.xpcall".xpcall; +local time_now = require "prosody.util.time".now; local function checkthread() local thread, main = coroutine.running(); @@ -45,7 +46,9 @@ end local function runner_continue(thread) -- ASSUMPTION: runner is in 'waiting' state (but we don't have the runner to know for sure) if coroutine.status(thread) ~= "suspended" then -- This should suffice - log("error", "unexpected async state: thread not suspended"); + log("error", "unexpected async state: thread not suspended (%s, %s)", thread, coroutine.status(thread)); + -- Fetching the traceback is likely to *crash* if a C library is calling us while suspended + --log("error", "coroutine stack: %s", debug.traceback()); return false; end local ok, state, runner = coroutine.resume(thread); @@ -138,6 +141,8 @@ end local runner_mt = {}; runner_mt.__index = runner_mt; +local waiting_runners = {}; + local function runner_create_thread(func, self) local thread = coroutine.create(function (self) -- luacheck: ignore 432/self while true do @@ -195,6 +200,8 @@ function runner_mt:run(input) -- Loop through queue items, and attempt to run them for i = 1,n do local queued_input = q[i]; + self:log("Resuming thread with new item [%s]", thread); + self.current_item = queued_input; local ok, new_state = coroutine.resume(thread, queued_input); if not ok then -- There was an error running the coroutine, save the error, mark runner as ready to begin again @@ -221,8 +228,13 @@ function runner_mt:run(input) end -- Runner processed all items it can, so save current runner state self.state = state; + if state == "ready" and self.current_item then + self.current_item = nil; + end + if err or state ~= self.notified_state then - self:log("debug", "changed state from %s to %s", self.notified_state, err and ("error ("..state..")") or state); + self:log("debug", "changed state from %s to %s [%s %s]", self.notified_state, err and ("error (" .. state .. ")") or state, self.thread, + self.thread and coroutine.status(self.thread)); if err then state = "error" else @@ -234,6 +246,7 @@ function runner_mt:run(input) if n > 0 then return self:run(); end + waiting_runners[self] = state == "waiting" and time_now() or nil; return true, state, n; end @@ -293,4 +306,7 @@ return { set_nexttick = function(new_next_tick) next_tick = new_next_tick; end; set_schedule_function = function (new_schedule_function) schedule_task = new_schedule_function; end; + + waiting_runners = waiting_runners; + default_runner_func = default_func; }; |