aboutsummaryrefslogtreecommitdiffstats
path: root/util/async.lua
diff options
context:
space:
mode:
Diffstat (limited to 'util/async.lua')
-rw-r--r--util/async.lua26
1 files changed, 21 insertions, 5 deletions
diff --git a/util/async.lua b/util/async.lua
index 2830238f..9a3485e9 100644
--- a/util/async.lua
+++ b/util/async.lua
@@ -1,7 +1,8 @@
-local logger = require "util.logger";
+local logger = require "prosody.util.logger";
local log = logger.init("util.async");
-local new_id = require "util.id".short;
-local xpcall = require "util.xpcall".xpcall;
+local new_id = require "prosody.util.id".short;
+local xpcall = require "prosody.util.xpcall".xpcall;
+local time_now = require "prosody.util.time".now;
local function checkthread()
local thread, main = coroutine.running();
@@ -45,7 +46,9 @@ end
local function runner_continue(thread)
-- ASSUMPTION: runner is in 'waiting' state (but we don't have the runner to know for sure)
if coroutine.status(thread) ~= "suspended" then -- This should suffice
- log("error", "unexpected async state: thread not suspended");
+ log("error", "unexpected async state: thread not suspended (%s, %s)", thread, coroutine.status(thread));
+ -- Fetching the traceback is likely to *crash* if a C library is calling us while suspended
+ --log("error", "coroutine stack: %s", debug.traceback());
return false;
end
local ok, state, runner = coroutine.resume(thread);
@@ -138,6 +141,8 @@ end
local runner_mt = {};
runner_mt.__index = runner_mt;
+local waiting_runners = {};
+
local function runner_create_thread(func, self)
local thread = coroutine.create(function (self) -- luacheck: ignore 432/self
while true do
@@ -195,6 +200,8 @@ function runner_mt:run(input)
-- Loop through queue items, and attempt to run them
for i = 1,n do
local queued_input = q[i];
+ self:log("Resuming thread with new item [%s]", thread);
+ self.current_item = queued_input;
local ok, new_state = coroutine.resume(thread, queued_input);
if not ok then
-- There was an error running the coroutine, save the error, mark runner as ready to begin again
@@ -221,8 +228,13 @@ function runner_mt:run(input)
end
-- Runner processed all items it can, so save current runner state
self.state = state;
+ if state == "ready" and self.current_item then
+ self.current_item = nil;
+ end
+
if err or state ~= self.notified_state then
- self:log("debug", "changed state from %s to %s", self.notified_state, err and ("error ("..state..")") or state);
+ self:log("debug", "changed state from %s to %s [%s %s]", self.notified_state, err and ("error (" .. state .. ")") or state, self.thread,
+ self.thread and coroutine.status(self.thread));
if err then
state = "error"
else
@@ -234,6 +246,7 @@ function runner_mt:run(input)
if n > 0 then
return self:run();
end
+ waiting_runners[self] = state == "waiting" and time_now() or nil;
return true, state, n;
end
@@ -293,4 +306,7 @@ return {
set_nexttick = function(new_next_tick) next_tick = new_next_tick; end;
set_schedule_function = function (new_schedule_function) schedule_task = new_schedule_function; end;
+
+ waiting_runners = waiting_runners;
+ default_runner_func = default_func;
};