aboutsummaryrefslogtreecommitdiffstats
path: root/util
diff options
context:
space:
mode:
Diffstat (limited to 'util')
-rw-r--r--util/indexedbheap.lua153
-rw-r--r--util/pubsub.lua9
-rw-r--r--util/timer.lua61
3 files changed, 221 insertions, 2 deletions
diff --git a/util/indexedbheap.lua b/util/indexedbheap.lua
new file mode 100644
index 00000000..3cb03037
--- /dev/null
+++ b/util/indexedbheap.lua
@@ -0,0 +1,153 @@
+
+local setmetatable = setmetatable;
+local math_floor = math.floor;
+local t_remove = table.remove;
+
+local function _heap_insert(self, item, sync, item2, index)
+ local pos = #self + 1;
+ while true do
+ local half_pos = math_floor(pos / 2);
+ if half_pos == 0 or item > self[half_pos] then break; end
+ self[pos] = self[half_pos];
+ sync[pos] = sync[half_pos];
+ index[sync[pos]] = pos;
+ pos = half_pos;
+ end
+ self[pos] = item;
+ sync[pos] = item2;
+ index[item2] = pos;
+end
+
+local function _percolate_up(self, k, sync, index)
+ local tmp = self[k];
+ local tmp_sync = sync[k];
+ while k ~= 1 do
+ local parent = math_floor(k/2);
+ if tmp < self[parent] then break; end
+ self[k] = self[parent];
+ sync[k] = sync[parent];
+ index[sync[k]] = k;
+ k = parent;
+ end
+ self[k] = tmp;
+ sync[k] = tmp_sync;
+ index[tmp_sync] = k;
+ return k;
+end
+
+local function _percolate_down(self, k, sync, index)
+ local tmp = self[k];
+ local tmp_sync = sync[k];
+ local size = #self;
+ local child = 2*k;
+ while 2*k <= size do
+ if child ~= size and self[child] > self[child + 1] then
+ child = child + 1;
+ end
+ if tmp > self[child] then
+ self[k] = self[child];
+ sync[k] = sync[child];
+ index[sync[k]] = k;
+ else
+ break;
+ end
+
+ k = child;
+ child = 2*k;
+ end
+ self[k] = tmp;
+ sync[k] = tmp_sync;
+ index[tmp_sync] = k;
+ return k;
+end
+
+local function _heap_pop(self, sync, index)
+ local size = #self;
+ if size == 0 then return nil; end
+
+ local result = self[1];
+ local result_sync = sync[1];
+ index[result_sync] = nil;
+ if size == 1 then
+ self[1] = nil;
+ sync[1] = nil;
+ return result, result_sync;
+ end
+ self[1] = t_remove(self);
+ sync[1] = t_remove(sync);
+ index[sync[1]] = 1;
+
+ _percolate_down(self, 1, sync, index);
+
+ return result, result_sync;
+end
+
+local indexed_heap = {};
+
+function indexed_heap:insert(item, priority, id)
+ if id == nil then
+ id = self.current_id;
+ self.current_id = id + 1;
+ end
+ self.items[id] = item;
+ _heap_insert(self.priorities, priority, self.ids, id, self.index);
+ return id;
+end
+function indexed_heap:pop()
+ local priority, id = _heap_pop(self.priorities, self.ids, self.index);
+ if id then
+ local item = self.items[id];
+ self.items[id] = nil;
+ return priority, item, id;
+ end
+end
+function indexed_heap:peek()
+ return self.priorities[1];
+end
+function indexed_heap:reprioritize(id, priority)
+ local k = self.index[id];
+ if k == nil then return; end
+ self.priorities[k] = priority;
+
+ k = _percolate_up(self.priorities, k, self.ids, self.index);
+ k = _percolate_down(self.priorities, k, self.ids, self.index);
+end
+function indexed_heap:remove_index(k)
+ local size = #self.priorities;
+
+ local result = self.priorities[k];
+ local result_sync = self.ids[k];
+ local item = self.items[result_sync];
+ if result == nil then return; end
+ self.index[result_sync] = nil;
+ self.items[result_sync] = nil;
+
+ self.priorities[k] = self.priorities[size];
+ self.ids[k] = self.ids[size];
+ self.index[self.ids[k]] = k;
+ t_remove(self.priorities);
+ t_remove(self.ids);
+
+ k = _percolate_up(self.priorities, k, self.ids, self.index);
+ k = _percolate_down(self.priorities, k, self.ids, self.index);
+
+ return result, item, result_sync;
+end
+function indexed_heap:remove(id)
+ return self:remove_index(self.index[id]);
+end
+
+local mt = { __index = indexed_heap };
+
+local _M = {
+ create = function()
+ return setmetatable({
+ ids = {}; -- heap of ids, sync'd with priorities
+ items = {}; -- map id->items
+ priorities = {}; -- heap of priorities
+ index = {}; -- map of id->index of id in ids
+ current_id = 1.5
+ }, mt);
+ end
+};
+return _M;
diff --git a/util/pubsub.lua b/util/pubsub.lua
index 0dfd196b..e0d428c0 100644
--- a/util/pubsub.lua
+++ b/util/pubsub.lua
@@ -258,6 +258,7 @@ function service:publish(node, actor, id, item)
end
node_obj = self.nodes[node];
end
+ node_obj.data[#node_obj.data + 1] = id;
node_obj.data[id] = item;
self.events.fire_event("item-published", { node = node, actor = actor, id = id, item = item });
self.config.broadcaster("items", node, node_obj.subscribers, item);
@@ -275,6 +276,12 @@ function service:retract(node, actor, id, retract)
return false, "item-not-found";
end
node_obj.data[id] = nil;
+ for i, _id in ipairs(node_obj.data) do
+ if id == _id then
+ table.remove(node_obj, i);
+ break;
+ end
+ end
if retract then
self.config.broadcaster("items", node, node_obj.subscribers, retract);
end
@@ -309,7 +316,7 @@ function service:get_items(node, actor, id)
return false, "item-not-found";
end
if id then -- Restrict results to a single specific item
- return true, { [id] = node_obj.data[id] };
+ return true, { id, [id] = node_obj.data[id] };
else
return true, node_obj.data;
end
diff --git a/util/timer.lua b/util/timer.lua
index 0e10e144..451e27d3 100644
--- a/util/timer.lua
+++ b/util/timer.lua
@@ -6,6 +6,8 @@
-- COPYING file in the source package for more information.
--
+local indexedbheap = require "util.indexedbheap";
+local log = require "util.logger".init("timer");
local server = require "net.server";
local math_min = math.min
local math_huge = math.huge
@@ -13,6 +15,9 @@ local get_time = require "socket".gettime;
local t_insert = table.insert;
local pairs = pairs;
local type = type;
+local debug_traceback = debug.traceback;
+local tostring = tostring;
+local xpcall = xpcall;
local data = {};
local new_data = {};
@@ -78,6 +83,60 @@ else
end
end
-add_task = _add_task;
+--add_task = _add_task;
+
+local h = indexedbheap.create();
+local params = {};
+local next_time = nil;
+local _id, _callback, _now, _param;
+local function _call() return _callback(_now, _id, _param); end
+local function _traceback_handler(err) log("error", "Traceback[timer]: %s", debug_traceback(tostring(err), 2)); end
+local function _on_timer(now)
+ local peek;
+ while true do
+ peek = h:peek();
+ if peek == nil or peek > now then break; end
+ local _;
+ _, _callback, _id = h:pop();
+ _now = now;
+ _param = params[_id];
+ params[_id] = nil;
+ --item(now, id, _param); -- FIXME pcall
+ local success, err = xpcall(_call, _traceback_handler);
+ if success and type(err) == "number" then
+ h:insert(_callback, err + now, _id); -- re-add
+ end
+ end
+ next_time = peek;
+ if peek ~= nil then
+ return peek - now;
+ end
+end
+function add_task(delay, callback, param)
+ local current_time = get_time();
+ local event_time = current_time + delay;
+
+ local id = h:insert(callback, event_time);
+ params[id] = param;
+ if next_time == nil or event_time < next_time then
+ next_time = event_time;
+ _add_task(next_time - current_time, _on_timer);
+ end
+ return id;
+end
+function stop(id)
+ params[id] = nil;
+ return h:remove(id);
+end
+function reschedule(id, delay)
+ local current_time = get_time();
+ local event_time = current_time + delay;
+ h:reprioritize(id, delay);
+ if next_time == nil or event_time < next_time then
+ next_time = event_time;
+ _add_task(next_time - current_time, _on_timer);
+ end
+ return id;
+end
return _M;