diff options
author | Matthew Wild <mwild1@gmail.com> | 2018-03-16 14:59:41 +0000 |
---|---|---|
committer | Matthew Wild <mwild1@gmail.com> | 2018-03-16 14:59:41 +0000 |
commit | 46a9b421420a80e448723a480a3b38128686d8a9 (patch) | |
tree | 0cd85207f6220ee571548d3770525317e8001048 /util/async.lua | |
parent | d0a4392817106074a0470a6039f024baa593c748 (diff) | |
download | prosody-46a9b421420a80e448723a480a3b38128686d8a9.tar.gz prosody-46a9b421420a80e448723a480a3b38128686d8a9.zip |
util.async: Add per-runner ids and add runner:log() method
Diffstat (limited to 'util/async.lua')
-rw-r--r-- | util/async.lua | 11 |
1 files changed, 10 insertions, 1 deletions
diff --git a/util/async.lua b/util/async.lua index 992797b8..3934691b 100644 --- a/util/async.lua +++ b/util/async.lua @@ -1,4 +1,5 @@ local log = require "util.logger".init("util.async"); +local new_id = require "util.id".short; local function checkthread() local thread, main = coroutine.running(); @@ -99,13 +100,14 @@ end local empty_watchers = {}; local function runner(func, watchers, data) return setmetatable({ func = func, thread = false, state = "ready", notified_state = "ready", - queue = {}, watchers = watchers or empty_watchers, data = data } + queue = {}, watchers = watchers or empty_watchers, data = data, id = new_id() } , runner_mt); end -- Add a task item for the runner to process function runner_mt:run(input) if input ~= nil then + self:log("debug", "queued new work item, %d items queued", #self.queue); table.insert(self.queue, input); end if self.state ~= "ready" then @@ -116,6 +118,7 @@ function runner_mt:run(input) local q, thread = self.queue, self.thread; if not thread or coroutine.status(thread) == "dead" then + self:log("debug", "creating new coroutine"); -- Create a new coroutine for this runner thread = runner_create_thread(self.func, self); self.thread = thread; @@ -124,6 +127,7 @@ function runner_mt:run(input) -- Process task item(s) while the queue is not empty, and we're not blocked local n, state, err = #q, self.state, nil; self.state = "running"; + self:log("debug", "running main loop"); while n > 0 and state == "ready" do local consumed; -- Loop through queue items, and attempt to run them @@ -156,6 +160,7 @@ function runner_mt:run(input) -- Runner processed all items it can, so save current runner state self.state = state; if err or state ~= self.notified_state then + self:log("debug", "changed state from %s to %s", self.notified_state, err and "error" or state); if err then state = "error" else @@ -172,4 +177,8 @@ function runner_mt:enqueue(input) table.insert(self.queue, input); end +function runner_mt:log(level, fmt, ...) + return log(level, "[runner %s] "..fmt, self.id, ...); +end + return { waiter = waiter, guarder = guarder, runner = runner }; |