From 95c8ddb01ceb9cdc4352ddb6af163b02e59b1303 Mon Sep 17 00:00:00 2001 From: Matthew Wild Date: Sun, 11 Aug 2013 14:46:07 +0100 Subject: util.async: New library to provide support around coroutine-based non-blocking functions --- util/async.lua | 115 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 115 insertions(+) create mode 100644 util/async.lua (limited to 'util/async.lua') diff --git a/util/async.lua b/util/async.lua new file mode 100644 index 00000000..9036a876 --- /dev/null +++ b/util/async.lua @@ -0,0 +1,115 @@ +local log = require "util.logger".init("util.async"); + +local function runner_continue(thread) + -- ASSUMPTION: runner is in 'waiting' state (but we don't have the runner to know for sure) + if coroutine.status(thread) ~= "suspended" then -- This should suffice + return false; + end + local ok, state, runner = coroutine.resume(thread); + if not ok then + local level = 0; + while debug.getinfo(thread, level, "") do level = level + 1; end + ok, runner = debug.getlocal(thread, level-1, 1); + local error_handler = runner.watchers.error; + if error_handler then error_handler(runner, debug.traceback(thread, state)); end + elseif state == "ready" then + -- If state is 'ready', it is our responsibility to update runner.state from 'waiting'. + -- We also have to :run(), because the queue might have further items that will not be + -- processed otherwise. FIXME: It's probably best to do this in a nexttick (0 timer). + runner.state = "ready"; + runner:run(); + end + return true; +end + +function waiter(num) + local thread = coroutine.running(); + if not thread then + error("Not running in an async context, see http://prosody.im/doc/developers/async"); + end + num = num or 1; + return function () + coroutine.yield("wait"); + end, function () + num = num - 1; + if num == 0 then + if not runner_continue(thread) then + error("done() called without wait()!"); + end + end + end; +end + +local runner_mt = {}; +runner_mt.__index = runner_mt; + +local function runner_create_thread(func, self) + local thread = coroutine.create(function (self) + while true do + func(coroutine.yield("ready", self)); + end + end); + assert(coroutine.resume(thread, self)); -- Start it up, it will return instantly to wait for the first input + return thread; +end + +local empty_watchers = {}; +function runner(func, watchers, data) + return setmetatable({ func = func, thread = false, state = "ready", notified_state = "ready", + queue = {}, watchers = watchers or empty_watchers, data = data } + , runner_mt); +end + +function runner_mt:run(input) + if input ~= nil then + table.insert(self.queue, input); + end + if self.state ~= "ready" then + return true, self.state, #self.queue; + end + + local q, thread = self.queue, self.thread; + if not thread or coroutine.status(thread) == "dead" then + thread = runner_create_thread(self.func, self); + self.thread = thread; + end + + local n, state, err = #q, self.state, nil; + self.state = "running"; + while n > 0 and state == "ready" do + local consumed; + for i = 1,n do + local input = q[i]; + local ok, new_state = coroutine.resume(thread, input); + if not ok then + consumed, state, err = i, "ready", debug.traceback(thread, new_state); + self.thread = nil; + break; + elseif state == "wait" then + consumed, state = i, "waiting"; + break; + end + end + if not consumed then consumed = n; end + if q[n+1] ~= nil then + n = #q; + end + for i = 1, n do + q[i] = q[consumed+i]; + end + n = #q; + end + self.state = state; + if state ~= self.notified_state then + self.notified_state = state; + local handler = self.watchers[state]; + if handler then handler(self, err); end + end + return true, state, n; +end + +function runner_mt:enqueue(input) + table.insert(self.queue, input); +end + +return { waiter = waiter, runner = runner }; -- cgit v1.2.3 From 056eec9953383a5a5418e1cbf8fb89c07fb0e903 Mon Sep 17 00:00:00 2001 From: Matthew Wild Date: Mon, 12 Aug 2013 10:27:08 +0100 Subject: util.async: Make functions local --- util/async.lua | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'util/async.lua') diff --git a/util/async.lua b/util/async.lua index 9036a876..59d3c6f2 100644 --- a/util/async.lua +++ b/util/async.lua @@ -22,7 +22,7 @@ local function runner_continue(thread) return true; end -function waiter(num) +local function waiter(num) local thread = coroutine.running(); if not thread then error("Not running in an async context, see http://prosody.im/doc/developers/async"); @@ -54,7 +54,7 @@ local function runner_create_thread(func, self) end local empty_watchers = {}; -function runner(func, watchers, data) +local function runner(func, watchers, data) return setmetatable({ func = func, thread = false, state = "ready", notified_state = "ready", queue = {}, watchers = watchers or empty_watchers, data = data } , runner_mt); -- cgit v1.2.3 From d135b8fef982f97c0912444d0dc79688a2670322 Mon Sep 17 00:00:00 2001 From: Matthew Wild Date: Mon, 12 Aug 2013 11:50:27 +0100 Subject: util.async: runner: Fix check for new state to recognise transition to 'waiting' --- util/async.lua | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'util/async.lua') diff --git a/util/async.lua b/util/async.lua index 59d3c6f2..afbaba5c 100644 --- a/util/async.lua +++ b/util/async.lua @@ -85,7 +85,7 @@ function runner_mt:run(input) consumed, state, err = i, "ready", debug.traceback(thread, new_state); self.thread = nil; break; - elseif state == "wait" then + elseif new_state == "wait" then consumed, state = i, "waiting"; break; end -- cgit v1.2.3 From 516179867e7ffbe56343375aa0d8abcf41dee83a Mon Sep 17 00:00:00 2001 From: Matthew Wild Date: Mon, 12 Aug 2013 12:08:51 +0100 Subject: util.async: waiter: Remove restriction about wait() being called before done() --- util/async.lua | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) (limited to 'util/async.lua') diff --git a/util/async.lua b/util/async.lua index afbaba5c..c81f8639 100644 --- a/util/async.lua +++ b/util/async.lua @@ -28,14 +28,15 @@ local function waiter(num) error("Not running in an async context, see http://prosody.im/doc/developers/async"); end num = num or 1; + local waiting; return function () + if num == 0 then return; end -- already done + waiting = true; coroutine.yield("wait"); end, function () num = num - 1; - if num == 0 then - if not runner_continue(thread) then - error("done() called without wait()!"); - end + if num == 0 and waiting then + runner_continue(thread); end end; end -- cgit v1.2.3 From 0e702abb1f191bbf83bf3b24a99536fa4dc3dd73 Mon Sep 17 00:00:00 2001 From: Kim Alvefur Date: Mon, 12 Aug 2013 13:22:27 +0200 Subject: util.async: waiter: Throw error if done() called too many times --- util/async.lua | 2 ++ 1 file changed, 2 insertions(+) (limited to 'util/async.lua') diff --git a/util/async.lua b/util/async.lua index c81f8639..d0bc6c3d 100644 --- a/util/async.lua +++ b/util/async.lua @@ -37,6 +37,8 @@ local function waiter(num) num = num - 1; if num == 0 and waiting then runner_continue(thread); + elseif num < 0 then + error("done() called too many times"); end end; end -- cgit v1.2.3 From 7a082cb3938619202436656daf96fbeb18a50c22 Mon Sep 17 00:00:00 2001 From: Matthew Wild Date: Tue, 13 Aug 2013 19:23:00 +0100 Subject: util.async: Fix logic bug that prevented error watcher being called for runners --- util/async.lua | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) (limited to 'util/async.lua') diff --git a/util/async.lua b/util/async.lua index d0bc6c3d..8af8730f 100644 --- a/util/async.lua +++ b/util/async.lua @@ -103,8 +103,12 @@ function runner_mt:run(input) n = #q; end self.state = state; - if state ~= self.notified_state then - self.notified_state = state; + if err or state ~= self.notified_state then + if err then + state = "error" + else + self.notified_state = state; + end local handler = self.watchers[state]; if handler then handler(self, err); end end -- cgit v1.2.3 From 14f85f2f4e06a46fd46bd33bbfb23ad642ec01fd Mon Sep 17 00:00:00 2001 From: Matthew Wild Date: Tue, 13 Aug 2013 21:26:53 +0100 Subject: util.async: Add guarder method, to create guards to ensure only a single runner can pass through a section of code at a time --- util/async.lua | 38 +++++++++++++++++++++++++++++++++++++- 1 file changed, 37 insertions(+), 1 deletion(-) (limited to 'util/async.lua') diff --git a/util/async.lua b/util/async.lua index 8af8730f..32ed0542 100644 --- a/util/async.lua +++ b/util/async.lua @@ -43,6 +43,42 @@ local function waiter(num) end; end +function guarder() + local guards = {}; + return function (id, func) + local thread = coroutine.running(); + if not thread then + error("Not running in an async context, see http://prosody.im/doc/developers/async"); + end + local guard = guards[id]; + if not guard then + guard = {}; + guards[id] = guard; + log("debug", "New guard!"); + else + table.insert(guard, thread); + log("debug", "Guarded. %d threads waiting.", #guard) + coroutine.yield("wait"); + end + local function exit() + local next_waiting = table.remove(guard, 1); + if next_waiting then + log("debug", "guard: Executing next waiting thread (%d left)", #guard) + runner_continue(next_waiting); + else + log("debug", "Guard off duty.") + guards[id] = nil; + end + end + if func then + func(); + exit(); + return; + end + return exit; + end; +end + local runner_mt = {}; runner_mt.__index = runner_mt; @@ -119,4 +155,4 @@ function runner_mt:enqueue(input) table.insert(self.queue, input); end -return { waiter = waiter, runner = runner }; +return { waiter = waiter, guarder = guarder, runner = runner }; -- cgit v1.2.3 From 7f367c1d6eee23dfef56634f59b72f77f9b3a33c Mon Sep 17 00:00:00 2001 From: Matthew Wild Date: Tue, 13 Aug 2013 23:38:50 +0100 Subject: util.async: Make guarder() local --- util/async.lua | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'util/async.lua') diff --git a/util/async.lua b/util/async.lua index 32ed0542..968ec804 100644 --- a/util/async.lua +++ b/util/async.lua @@ -43,7 +43,7 @@ local function waiter(num) end; end -function guarder() +local function guarder() local guards = {}; return function (id, func) local thread = coroutine.running(); -- cgit v1.2.3