local queue = require "util.queue";

local record lib
	-- T would typically be util.stanza
	-- NOTE(review): `T` is used below as an element type, but no type
	-- parameter list is visible on `smqueue` or `new` in this view;
	-- presumably generic brackets were lost somewhere. Confirm against
	-- the canonical source before relying on these declarations.
	record smqueue
		-- Underlying util.queue instance that stores pushed items.
		_queue : queue.queue
		-- Incremented by one on every push().
		_head : integer
		-- Set by ack() to the most recently acknowledged counter value.
		_tail : integer

		-- Error codes returned as the second value of ack().
		enum ack_errors
			"tail"
			"head"
			"pop"
		end
		push : function (smqueue, T)
		ack : function (smqueue, integer) : { T }, ack_errors
		resumable : function (smqueue) : boolean
		type consume_iter = function (smqueue) : T
		consume : function (smqueue) : consume_iter

		table : function (smqueue) : { T }
	end
	new : function (integer) : smqueue
end

local type smqueue = lib.smqueue;

-- Append an item: bump the head counter, then push onto the backing queue.
-- Raises (via assert) if the backing queue refuses the push.
function smqueue:push(v)
	self._head = self._head + 1;
	-- Wraps instead of errors
	assert(self._queue:push(v));
end

-- Acknowledge everything up to counter value `h`.
--
-- Returns the list of items popped from the backing queue, or nil plus an
-- error code:
--   "tail"  h is lower than what has already been acknowledged
--   "head"  h is higher than the number of items ever pushed
--   "pop"   the backing queue returned a falsy value while draining
--
-- Note that `_tail` is updated before draining, so it stays advanced even
-- when "pop" is returned.
function smqueue:ack(h : integer) : { any }, smqueue.ack_errors
	if h < self._tail then
		return nil, "tail";
	elseif h > self._head then
		return nil, "head";
	end
	-- TODO optimize? cache table fields
	local acked = {};
	self._tail = h;
	-- Number of items that should remain stored after this ack.
	local expect = self._head - self._tail;
	while expect < self._queue:count() do
		local v = self._queue:pop();
		if not v then return nil, "pop"; end
		table.insert(acked, v);
	end
	return acked;
end

-- Number of pushed items that have not been acknowledged yet.
function smqueue:count_unacked() : integer
	return self._head - self._tail;
end

-- Counter value of the last acknowledgement.
function smqueue:count_acked() : integer
	return self._tail;
end

-- True when the backing queue still holds at least as many items as are
-- unacknowledged.
function smqueue:resumable() : boolean
	return self._queue:count() >= (self._head - self._tail);
end

-- Iterate over the stored items; forwards to the backing queue's items().
function smqueue:resume() : queue.queue.iterator, any, integer
	return self._queue:items();
end

-- Forwards to the backing queue's consume() and returns its iterator.
function smqueue:consume() : queue.queue.consume_iter
	return self._queue:consume()
end

-- Compatibility wrapper, meant to look like a plain ol' array
local record compat_mt
	_queue : smqueue
end

-- Indexed read used by the compatibility wrapper: yields nil for indices
-- below the acknowledged counter, otherwise reads directly from the backing
-- queue's internal `_items` storage at (i + tail) modulo the queue size.
function compat_mt:__index(i : integer) : any
	if i < self._queue._tail then return nil end
	return self._queue._queue._items[(i + self._queue._tail) % self._queue._queue.size];
end

-- Length of the compatibility wrapper is the number of unacked items.
function compat_mt:__len() : integer
	return self._queue:count_unacked()
end

-- Wrap this queue in a table whose metatable is compat_mt.
function smqueue:table() : { any }
	return setmetatable({ _queue = self }, compat_mt);
end

-- Snapshot of the two counters, exposed through the __freeze metafield.
local function freeze(q : smqueue) : { string:integer }
	return { head = q._head, tail = q._tail }
end

local queue_mt = {
	--
	-- NOTE(review): the collapsed source shows a bare `--` directly before
	-- this field; it is read here as an empty comment line followed by a
	-- live `__name` field rather than a commented-out field. Confirm.
	__name = "smqueue";
	__index = smqueue;
	__len = smqueue.count_unacked;
	__freeze = freeze;
}

-- Create a new queue backed by a util.queue of the given size.
-- `size` must be greater than zero. The second argument to queue.new is
-- passed as `true` (presumably enabling wrap-around, per the comment in
-- push(); verify against util.queue).
--
-- The return annotation is `smqueue`, matching the `new` declaration in the
-- `lib` record and the shape of the table built here.
function lib.new(size : integer) : smqueue
	assert(size>0);
	return setmetatable({ _head = 0; _tail = 0; _queue = queue.new(size, true) }, queue_mt);
end

return lib;