aboutsummaryrefslogtreecommitdiffstats
path: root/util/async.lua
diff options
context:
space:
mode:
authorMatthew Wild <mwild1@gmail.com>2016-05-22 20:06:12 +0100
committerMatthew Wild <mwild1@gmail.com>2016-05-22 20:06:12 +0100
commite34fadb697fe8ab8e9b1542e0c794cb99bd5dbe8 (patch)
treea417d6541e236f6064f8e1467bed6135fd2494af /util/async.lua
parent0ea8db94300bd49be5cd9528302f28609791858f (diff)
downloadprosody-e34fadb697fe8ab8e9b1542e0c794cb99bd5dbe8.tar.gz
prosody-e34fadb697fe8ab8e9b1542e0c794cb99bd5dbe8.zip
util.async: Add some more comments for clarity
Diffstat (limited to 'util/async.lua')
-rw-r--r--util/async.lua15
1 files changed, 15 insertions, 0 deletions
diff --git a/util/async.lua b/util/async.lua
index 7b2eae54..76da45fa 100644
--- a/util/async.lua
+++ b/util/async.lua
@@ -7,6 +7,8 @@ local function runner_continue(thread)
end
local ok, state, runner = coroutine.resume(thread);
if not ok then
+ -- Running the coroutine failed, which means we have to find the runner manually,
+ -- in order to inform the error handler
local level = 0;
while debug.getinfo(thread, level, "") do level = level + 1; end
ok, runner = debug.getlocal(thread, level-1, 1);
@@ -99,37 +101,48 @@ local function runner(func, watchers, data)
, runner_mt);
end
+-- Add a task item for the runner to process
function runner_mt:run(input)
if input ~= nil then
table.insert(self.queue, input);
end
if self.state ~= "ready" then
+ -- The runner is busy. Indicate that the task item has been
+ -- queued, and return information about the current runner state
return true, self.state, #self.queue;
end
local q, thread = self.queue, self.thread;
if not thread or coroutine.status(thread) == "dead" then
+ -- Create a new coroutine for this runner
thread = runner_create_thread(self.func, self);
self.thread = thread;
end
+ -- Process task item(s) while the queue is not empty, and we're not blocked
local n, state, err = #q, self.state, nil;
self.state = "running";
while n > 0 and state == "ready" do
local consumed;
+ -- Loop through queue items, and attempt to run them
for i = 1,n do
local input = q[i];
local ok, new_state = coroutine.resume(thread, input);
if not ok then
+ -- There was an error running the coroutine, save the error, mark runner as ready to begin again
consumed, state, err = i, "ready", debug.traceback(thread, new_state);
self.thread = nil;
break;
elseif new_state == "wait" then
+ -- Runner is blocked on waiting for a task item to complete
consumed, state = i, "waiting";
break;
end
end
+ -- Loop ended - either queue empty because all tasks passed without blocking (consumed == nil)
+ -- or runner is blocked/errored, and consumed will contain the number of tasks processed so far
if not consumed then consumed = n; end
+ -- Remove consumed items from the queue array
if q[n+1] ~= nil then
n = #q;
end
@@ -138,6 +151,7 @@ function runner_mt:run(input)
end
n = #q;
end
+ -- Runner processed all items it can, so save current runner state
self.state = state;
if err or state ~= self.notified_state then
if err then
@@ -151,6 +165,7 @@ function runner_mt:run(input)
return true, state, n;
end
+-- Add a task item to the queue without invoking the runner, even if it is idle
function runner_mt:enqueue(input)
table.insert(self.queue, input);
end