aboutsummaryrefslogtreecommitdiffstats
path: root/util/async.lua
diff options
context:
space:
mode:
authorMatthew Wild <mwild1@gmail.com>2018-03-16 14:59:41 +0000
committerMatthew Wild <mwild1@gmail.com>2018-03-16 14:59:41 +0000
commitf22ee7eef685f6f569173e8133994966329551dc (patch)
tree0cd85207f6220ee571548d3770525317e8001048 /util/async.lua
parent6d7cd57d44fced8c151a1ae010b5e24d9da02a89 (diff)
downloadprosody-f22ee7eef685f6f569173e8133994966329551dc.tar.gz
prosody-f22ee7eef685f6f569173e8133994966329551dc.zip
util.async: Add per-runner ids and add runner:log() method
Diffstat (limited to 'util/async.lua')
-rw-r--r--util/async.lua11
1 files changed, 10 insertions, 1 deletions
diff --git a/util/async.lua b/util/async.lua
index 992797b8..3934691b 100644
--- a/util/async.lua
+++ b/util/async.lua
@@ -1,4 +1,5 @@
local log = require "util.logger".init("util.async");
+local new_id = require "util.id".short;
local function checkthread()
local thread, main = coroutine.running();
@@ -99,13 +100,14 @@ end
local empty_watchers = {};
local function runner(func, watchers, data)
return setmetatable({ func = func, thread = false, state = "ready", notified_state = "ready",
- queue = {}, watchers = watchers or empty_watchers, data = data }
+ queue = {}, watchers = watchers or empty_watchers, data = data, id = new_id() }
, runner_mt);
end
-- Add a task item for the runner to process
function runner_mt:run(input)
if input ~= nil then
+ self:log("debug", "queued new work item, %d items queued", #self.queue);
table.insert(self.queue, input);
end
if self.state ~= "ready" then
@@ -116,6 +118,7 @@ function runner_mt:run(input)
local q, thread = self.queue, self.thread;
if not thread or coroutine.status(thread) == "dead" then
+ self:log("debug", "creating new coroutine");
-- Create a new coroutine for this runner
thread = runner_create_thread(self.func, self);
self.thread = thread;
@@ -124,6 +127,7 @@ function runner_mt:run(input)
-- Process task item(s) while the queue is not empty, and we're not blocked
local n, state, err = #q, self.state, nil;
self.state = "running";
+ self:log("debug", "running main loop");
while n > 0 and state == "ready" do
local consumed;
-- Loop through queue items, and attempt to run them
@@ -156,6 +160,7 @@ function runner_mt:run(input)
-- Runner processed all items it can, so save current runner state
self.state = state;
if err or state ~= self.notified_state then
+ self:log("debug", "changed state from %s to %s", self.notified_state, err and "error" or state);
if err then
state = "error"
else
@@ -172,4 +177,8 @@ function runner_mt:enqueue(input)
table.insert(self.queue, input);
end
+function runner_mt:log(level, fmt, ...)
+ return log(level, "[runner %s] "..fmt, self.id, ...);
+end
+
return { waiter = waiter, guarder = guarder, runner = runner };