From 083fe317b7d5bd743f31b7b0a115f710105eda41 Mon Sep 17 00:00:00 2001 From: daurnimator Date: Tue, 18 Nov 2014 14:14:41 -0500 Subject: net.cqueues: Add module that allows use of cqueues while still using net.server as main loop --- net/cqueues.lua | 65 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 65 insertions(+) create mode 100644 net/cqueues.lua (limited to 'net/cqueues.lua') diff --git a/net/cqueues.lua b/net/cqueues.lua new file mode 100644 index 00000000..e82fe4ad --- /dev/null +++ b/net/cqueues.lua @@ -0,0 +1,65 @@ +-- Prosody IM +-- Copyright (C) 2014 Daurnimator +-- +-- This project is MIT/X11 licensed. Please see the +-- COPYING file in the source package for more information. +-- +-- This module allows you to use cqueues with a net.server mainloop +-- + +local server = require "net.server"; +local cqueues = require "cqueues"; + +-- Create a single top level cqueue +local cq; + +if server.cq then -- server provides cqueues object + cq = server.cq; +elseif server.get_backend() == "select" and server._addtimer then -- server_select + cq = cqueues.new(); + local function step() + assert(cq:loop(0)); + end + + -- Use wrapclient (as wrapconnection isn't exported) to get server_select to watch cq fd + local handler = server.wrapclient({ + getfd = function() return cq:pollfd(); end; + settimeout = function() end; -- Method just needs to exist + close = function() end; -- Need close method for 'closeall' + }, nil, nil, {}); + + -- Only need to listen for readable; cqueues handles everything under the hood + -- readbuffer is called when `select` notes an fd as readable + handler.readbuffer = step; + + -- Use server_select low lever timer facility, + -- this callback gets called *every* time there is a timeout in the main loop + server._addtimer(function(current_time) + -- This may end up in extra step()'s, but cqueues handles it for us. + step(); + return cq:timeout(); + end); +elseif server.event and server.base then -- server_event + cq = cqueues.new(); + -- Only need to listen for readable; cqueues handles everything under the hood + local EV_READ = server.event.EV_READ; + server.base:addevent(cq:pollfd(), EV_READ, function(e) + assert(cq:loop(0)); + -- Convert a cq timeout to an acceptable timeout for luaevent + local t = cq:timeout(); + if t == 0 then -- if you give luaevent 0, it won't call this callback again + t = 0.000001; -- 1 microsecond is the smallest that works (goes into a `struct timeval`) + elseif t == nil then -- you always need to give a timeout, pick something big if we don't have one + t = 0x7FFFFFFF; -- largest 32bit int + end + return EV_READ, t; + end, + -- Schedule the callback to fire on first tick to ensure any cq:wrap calls that happen during start-up are serviced. + 0.000001); +else + error "NYI" +end + +return { + cq = cq; +} -- cgit v1.2.3 From e6ab467ef9f3f98a3f041ab6a525321acd2707ee Mon Sep 17 00:00:00 2001 From: daurnimator Date: Tue, 6 Jan 2015 20:01:59 -0500 Subject: net.cqueues: Add workaround for luaevent callback getting collected --- net/cqueues.lua | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) (limited to 'net/cqueues.lua') diff --git a/net/cqueues.lua b/net/cqueues.lua index e82fe4ad..a67e405a 100644 --- a/net/cqueues.lua +++ b/net/cqueues.lua @@ -43,7 +43,11 @@ elseif server.event and server.base then -- server_event cq = cqueues.new(); -- Only need to listen for readable; cqueues handles everything under the hood local EV_READ = server.event.EV_READ; - server.base:addevent(cq:pollfd(), EV_READ, function(e) + local event_handle; + event_handle = server.base:addevent(cq:pollfd(), EV_READ, function(e) + -- Need to reference event_handle or this callback will get collected + -- This creates a circular reference that can only be broken if event_handle is manually :close()'d + local _ = event_handle; assert(cq:loop(0)); -- Convert a cq timeout to an acceptable timeout for luaevent local t = cq:timeout(); -- cgit v1.2.3 From 85adf1b4d6f8015ff8e7bc672afaeabf3d9f2e3f Mon Sep 17 00:00:00 2001 From: daurnimator Date: Tue, 13 Jan 2015 18:36:00 -0500 Subject: net.cqueues: Fixes hardcoded timeout for first iteration This was originally put in place as a fix for what ended up a cqueues bug: https://github.com/wahern/cqueues/issues/40 A check for a cqueues version with the bug fix is included. --- net/cqueues.lua | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) (limited to 'net/cqueues.lua') diff --git a/net/cqueues.lua b/net/cqueues.lua index a67e405a..f6bfd949 100644 --- a/net/cqueues.lua +++ b/net/cqueues.lua @@ -9,6 +9,7 @@ local server = require "net.server"; local cqueues = require "cqueues"; +assert(cqueues.VERSION >= 20150112, "cqueues newer than 20151013 required") -- Create a single top level cqueue local cq; @@ -43,23 +44,27 @@ elseif server.event and server.base then -- server_event cq = cqueues.new(); -- Only need to listen for readable; cqueues handles everything under the hood local EV_READ = server.event.EV_READ; + -- Convert a cqueues timeout to an acceptable timeout for luaevent + local function luaevent_safe_timeout(cq) + local t = cq:timeout(); + -- if you give luaevent 0 or nil, it re-uses the previous timeout. + if t == 0 then + t = 0.000001; -- 1 microsecond is the smallest that works (goes into a `struct timeval`) + elseif t == nil then -- pick something big if we don't have one + t = 0x7FFFFFFF; -- largest 32bit int + end + return t + end local event_handle; event_handle = server.base:addevent(cq:pollfd(), EV_READ, function(e) -- Need to reference event_handle or this callback will get collected -- This creates a circular reference that can only be broken if event_handle is manually :close()'d local _ = event_handle; + -- Run as many cqueues things as possible (with a timeout of 0) + -- If an error is thrown, it will break the libevent loop; but prosody resumes after logging a top level error assert(cq:loop(0)); - -- Convert a cq timeout to an acceptable timeout for luaevent - local t = cq:timeout(); - if t == 0 then -- if you give luaevent 0, it won't call this callback again - t = 0.000001; -- 1 microsecond is the smallest that works (goes into a `struct timeval`) - elseif t == nil then -- you always need to give a timeout, pick something big if we don't have one - t = 0x7FFFFFFF; -- largest 32bit int - end - return EV_READ, t; - end, - -- Schedule the callback to fire on first tick to ensure any cq:wrap calls that happen during start-up are serviced. - 0.000001); + return EV_READ, luaevent_safe_timeout(cq); + end, luaevent_safe_timeout(cq)); else error "NYI" end -- cgit v1.2.3 From 700bd13b7735b21f4e303637e2211c5d4450a510 Mon Sep 17 00:00:00 2001 From: daurnimator Date: Fri, 16 Jan 2015 12:06:42 -0500 Subject: net.cqueues: Fix incorrect version check --- net/cqueues.lua | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'net/cqueues.lua') diff --git a/net/cqueues.lua b/net/cqueues.lua index f6bfd949..8c4c756f 100644 --- a/net/cqueues.lua +++ b/net/cqueues.lua @@ -9,7 +9,7 @@ local server = require "net.server"; local cqueues = require "cqueues"; -assert(cqueues.VERSION >= 20150112, "cqueues newer than 20151013 required") +assert(cqueues.VERSION >= 20150113, "cqueues newer than 20150113 required") -- Create a single top level cqueue local cq; -- cgit v1.2.3