aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--net/server_epoll.lua77
1 files changed, 29 insertions, 48 deletions
diff --git a/net/server_epoll.lua b/net/server_epoll.lua
index c279c579..0c03ae15 100644
--- a/net/server_epoll.lua
+++ b/net/server_epoll.lua
@@ -6,9 +6,7 @@
--
-local t_sort = table.sort;
local t_insert = table.insert;
-local t_remove = table.remove;
local t_concat = table.concat;
local setmetatable = setmetatable;
local tostring = tostring;
@@ -20,6 +18,7 @@ local log = require "util.logger".init("server_epoll");
local socket = require "socket";
local luasec = require "ssl";
local gettime = require "util.time".now;
+local indexedbheap = require "util.indexedbheap";
local createtable = require "util.table".create;
local inet = require "util.net";
local inet_pton = inet.pton;
@@ -66,22 +65,24 @@ local fds = createtable(10, 0); -- FD -> conn
-- Timer and scheduling --
-local timers = {};
+local timers = indexedbheap.create();
local function noop() end
local function closetimer(t)
t[1] = 0;
t[2] = noop;
+ timers:remove(t.id);
end
--- Set to true when timers have changed
-local resort_timers = false;
+local function reschedule(t, time)
+ t[1] = time;
+ timers:reprioritize(t.id, time);
+end
-- Add absolute timer
local function at(time, f)
- local timer = { time, f, close = closetimer };
- t_insert(timers, timer);
- resort_timers = true;
+ local timer = { time, f, close = closetimer, reschedule = reschedule, id = nil };
+ timer.id = timers:insert(timer, time);
return timer;
end
@@ -94,50 +95,32 @@ end
-- Return time until next timeout
local function runtimers(next_delay, min_wait)
-- Any timers at all?
- if not timers[1] then
- return next_delay;
+ local now = gettime();
+ local peek = timers:peek();
+ while peek do
+
+ if peek > now then
+ next_delay = peek - now;
+ break;
end
- if resort_timers then
- -- Sort earliest timers to the end
- t_sort(timers, function (a, b) return a[1] > b[1]; end);
- resort_timers = false;
+ local _, timer, id = timers:pop();
+ local ok, ret = pcall(timer[2], now);
+ if ok and type(ret) == "number" then
+ local next_time = now+ret;
+ timer[1] = next_time;
+ timers:insert(timer, next_time);
end
- -- Iterate from the end and remove completed timers
- for i = #timers, 1, -1 do
- local timer = timers[i];
- local t, f = timer[1], timer[2];
- -- Get time for every iteration to increase accuracy
- local now = gettime();
- if t > now then
- -- This timer should not fire yet
- local diff = t - now;
- if diff < next_delay then
- next_delay = diff;
+ peek = timers:peek();
end
- break;
- end
- local new_timeout = f(now);
- if new_timeout then
- -- Schedule for 'delay' from the time actually scheduled,
- -- not from now, in order to prevent timer drift.
- timer[1] = t + new_timeout;
- resort_timers = true;
- else
- t_remove(timers, i);
- end
+ if peek == nil then
+ return next_delay;
end
- if resort_timers or next_delay < min_wait then
- -- Timers may be added from within a timer callback.
- -- Those would not be considered for next_delay,
- -- and we might sleep for too long, so instead
- -- we return a shorter timeout so we can
- -- properly sort all new timers.
- next_delay = min_wait;
+ if next_delay < min_wait then
+ return min_wait;
end
-
return next_delay;
end
@@ -243,8 +226,7 @@ function interface:setreadtimeout(t)
end
t = t or cfg.read_timeout;
if self._readtimeout then
- self._readtimeout[1] = gettime() + t;
- resort_timers = true;
+ self._readtimeout:reschedule(gettime() + t);
else
self._readtimeout = addtimer(t, function ()
if self:on("readtimeout") then
@@ -268,8 +250,7 @@ function interface:setwritetimeout(t)
end
t = t or cfg.send_timeout;
if self._writetimeout then
- self._writetimeout[1] = gettime() + t;
- resort_timers = true;
+ self._writetimeout:reschedule(gettime() + t);
else
self._writetimeout = addtimer(t, function ()
self:on("disconnect", "write timeout");