diff options
Diffstat (limited to 'net')
-rw-r--r-- | net/server_epoll.lua | 34 | ||||
-rw-r--r-- | net/server_event.lua | 14 | ||||
-rw-r--r-- | net/server_select.lua | 65 | ||||
-rw-r--r-- | net/websocket/frames.lua | 2 |
4 files changed, 77 insertions, 38 deletions
diff --git a/net/server_epoll.lua b/net/server_epoll.lua index ecb72a00..13c8315a 100644 --- a/net/server_epoll.lua +++ b/net/server_epoll.lua @@ -120,9 +120,13 @@ local function runtimers(next_delay, min_wait) end local new_timeout = f(now); if new_timeout then - -- Schedule for 'delay' from the time actually scheduled, - -- not from now, in order to prevent timer drift. - timer[1] = t + new_timeout; + -- Schedule for 'delay' from the time actually scheduled, not from now, + -- in order to prevent timer drift, unless it already drifted way out of sync. + if (t + new_timeout) > ( now - new_timeout ) then + timer[1] = t + new_timeout; + else + timer[1] = now + new_timeout; + end resort_timers = true; else t_remove(timers, i); @@ -423,8 +427,10 @@ function interface:write(data) else self.writebuffer = { data }; end - self:setwritetimeout(); - self:set(nil, true); + if not self._write_lock then + self:setwritetimeout(); + self:set(nil, true); + end return #data; end interface.send = interface.write; @@ -604,11 +610,23 @@ function interface:pausefor(t) end); end +function interface:pause_writes() + self._write_lock = true; + self:setwritetimeout(false); + self:set(nil, false); +end + +function interface:resume_writes() + self._write_lock = nil; + if self.writebuffer[1] then + self:setwritetimeout(); + self:set(nil, true); + end +end + -- Connected! function interface:onconnect() - if self.conn and not self.peername and self.conn.getpeername then - self.peername, self.peerport = self.conn:getpeername(); - end + self:updatenames(); self.onconnect = noop; self:on("connect"); end diff --git a/net/server_event.lua b/net/server_event.lua index 11bd6a29..70757e03 100644 --- a/net/server_event.lua +++ b/net/server_event.lua @@ -253,6 +253,7 @@ end --TODO: Deprecate function interface_mt:lock_read(switch) + log("warn", ":lock_read is deprecated, use :pasue() and :resume()"); if switch then return self:pause(); else @@ -272,6 +273,19 @@ function interface_mt:resume() end end +function interface_mt:pause_writes() + return self:_lock(self.nointerface, self.noreading, true); +end + +function interface_mt:resume_writes() + self:_lock(self.nointerface, self.noreading, false); + if self.writecallback and not self.eventwrite then + self.eventwrite = addevent( base, self.conn, EV_WRITE, self.writecallback, cfg.WRITE_TIMEOUT ); -- register callback + return true; + end +end + + function interface_mt:counter(c) if c then self._connections = self._connections + c diff --git a/net/server_select.lua b/net/server_select.lua index 1a40a6d3..f616116e 100644 --- a/net/server_select.lua +++ b/net/server_select.lua @@ -424,9 +424,8 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport bufferlen = bufferlen + #data if bufferlen > maxsendlen then _closelist[ handler ] = "send buffer exceeded" -- cannot close the client at the moment, have to wait to the end of the cycle - handler.write = idfalse -- don't write anymore return false - elseif socket and not _sendlist[ socket ] then + elseif not nosend and socket and not _sendlist[ socket ] then _sendlistlen = addsocket(_sendlist, socket, _sendlistlen) end bufferqueuelen = bufferqueuelen + 1 @@ -456,49 +455,57 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport maxreadlen = readlen or maxreadlen return bufferlen, maxreadlen, maxsendlen end - --TODO: Deprecate handler.lock_read = function (self, switch) + out_error( "server.lua, lock_read() is deprecated, use pause() and resume()" ) if switch == true then - local tmp = _readlistlen - _readlistlen = removesocket( _readlist, socket, _readlistlen ) - _readtimes[ handler ] = nil - if _readlistlen ~= tmp then - noread = true - end + return self:pause() elseif switch == false then - if noread then - noread = false - _readlistlen = addsocket(_readlist, socket, _readlistlen) - _readtimes[ handler ] = _currenttime - end + return self:resume() end return noread end handler.pause = function (self) - return self:lock_read(true); + local tmp = _readlistlen + _readlistlen = removesocket( _readlist, socket, _readlistlen ) + _readtimes[ handler ] = nil + if _readlistlen ~= tmp then + noread = true + end + return noread; end handler.resume = function (self) - return self:lock_read(false); + if noread then + noread = false + _readlistlen = addsocket(_readlist, socket, _readlistlen) + _readtimes[ handler ] = _currenttime + end + return noread; end handler.lock = function( self, switch ) - handler.lock_read (switch) + out_error( "server.lua, lock() is deprecated" ) + handler.lock_read (self, switch) if switch == true then - handler.write = idfalse - local tmp = _sendlistlen - _sendlistlen = removesocket( _sendlist, socket, _sendlistlen ) - _writetimes[ handler ] = nil - if _sendlistlen ~= tmp then - nosend = true - end + handler.pause_writes (self) elseif switch == false then - handler.write = write - if nosend then - nosend = false - write( "" ) - end + handler.resume_writes (self) end return noread, nosend end + handler.pause_writes = function (self) + local tmp = _sendlistlen + _sendlistlen = removesocket( _sendlist, socket, _sendlistlen ) + _writetimes[ handler ] = nil + if _sendlistlen ~= tmp then + nosend = true + end + end + handler.resume_writes = function (self) + if nosend then + nosend = false + write( "" ) + end + end + local _readbuffer = function( ) -- this function reads data local buffer, err, part = receive( socket, pattern ) -- receive buffer with "pattern" if not err or (err == "wantread" or err == "timeout") then -- received something diff --git a/net/websocket/frames.lua b/net/websocket/frames.lua index ba25d261..b5aebb40 100644 --- a/net/websocket/frames.lua +++ b/net/websocket/frames.lua @@ -9,7 +9,7 @@ local softreq = require "util.dependencies".softreq; local random_bytes = require "util.random".bytes; -local bit = assert(softreq"bit" or softreq"bit32", +local bit = assert(softreq"bit32" or softreq"bit", "No bit module found. See https://prosody.im/doc/depends#bitop"); local band = bit.band; local bor = bit.bor; |