aboutsummaryrefslogtreecommitdiffstats
path: root/net
diff options
context:
space:
mode:
Diffstat (limited to 'net')
-rw-r--r--net/cqueues.lua65
-rw-r--r--net/http.lua17
-rw-r--r--net/server.lua106
-rw-r--r--net/server_event.lua96
-rw-r--r--net/server_select.lua101
5 files changed, 274 insertions, 111 deletions
diff --git a/net/cqueues.lua b/net/cqueues.lua
new file mode 100644
index 00000000..e82fe4ad
--- /dev/null
+++ b/net/cqueues.lua
@@ -0,0 +1,65 @@
+-- Prosody IM
+-- Copyright (C) 2014 Daurnimator
+--
+-- This project is MIT/X11 licensed. Please see the
+-- COPYING file in the source package for more information.
+--
+-- This module allows you to use cqueues with a net.server mainloop
+--
+
+local server = require "net.server";
+local cqueues = require "cqueues";
+
+-- Create a single top level cqueue
+local cq;
+
+if server.cq then -- server provides cqueues object
+ cq = server.cq;
+elseif server.get_backend() == "select" and server._addtimer then -- server_select
+ cq = cqueues.new();
+ local function step()
+ assert(cq:loop(0));
+ end
+
+ -- Use wrapclient (as wrapconnection isn't exported) to get server_select to watch cq fd
+ local handler = server.wrapclient({
+ getfd = function() return cq:pollfd(); end;
+ settimeout = function() end; -- Method just needs to exist
+ close = function() end; -- Need close method for 'closeall'
+ }, nil, nil, {});
+
+ -- Only need to listen for readable; cqueues handles everything under the hood
+ -- readbuffer is called when `select` notes an fd as readable
+ handler.readbuffer = step;
+
+ -- Use server_select low lever timer facility,
+ -- this callback gets called *every* time there is a timeout in the main loop
+ server._addtimer(function(current_time)
+ -- This may end up in extra step()'s, but cqueues handles it for us.
+ step();
+ return cq:timeout();
+ end);
+elseif server.event and server.base then -- server_event
+ cq = cqueues.new();
+ -- Only need to listen for readable; cqueues handles everything under the hood
+ local EV_READ = server.event.EV_READ;
+ server.base:addevent(cq:pollfd(), EV_READ, function(e)
+ assert(cq:loop(0));
+ -- Convert a cq timeout to an acceptable timeout for luaevent
+ local t = cq:timeout();
+ if t == 0 then -- if you give luaevent 0, it won't call this callback again
+ t = 0.000001; -- 1 microsecond is the smallest that works (goes into a `struct timeval`)
+ elseif t == nil then -- you always need to give a timeout, pick something big if we don't have one
+ t = 0x7FFFFFFF; -- largest 32bit int
+ end
+ return EV_READ, t;
+ end,
+ -- Schedule the callback to fire on first tick to ensure any cq:wrap calls that happen during start-up are serviced.
+ 0.000001);
+else
+ error "NYI"
+end
+
+return {
+ cq = cq;
+}
diff --git a/net/http.lua b/net/http.lua
index e6bf0018..357b7452 100644
--- a/net/http.lua
+++ b/net/http.lua
@@ -6,7 +6,6 @@
-- COPYING file in the source package for more information.
--
-local socket = require "socket"
local b64 = require "util.encodings".base64.encode;
local url = require "socket.url"
local httpstream_new = require "net.http.parser".new;
@@ -164,21 +163,17 @@ function request(u, ex, callback)
end
local port_number = port and tonumber(port) or (using_https and 443 or 80);
- -- Connect the socket, and wrap it with net.server
- local conn = socket.tcp();
- conn:settimeout(10);
- local ok, err = conn:connect(host, port_number);
- if not ok and err ~= "timeout" then
- callback(nil, 0, req);
- return nil, err;
- end
-
local sslctx = false;
if using_https then
sslctx = ex and ex.sslctx or { mode = "client", protocol = "sslv23", options = { "no_sslv2", "no_sslv3" } };
end
- req.handler, req.conn = assert(server.wrapclient(conn, host, port_number, listener, "*a", sslctx));
+ local handler, conn = server.addclient(host, port_number, listener, "*a", sslctx)
+ if not handler then
+ callback(nil, 0, req);
+ return nil, conn;
+ end
+ req.handler, req.conn = handler, conn
req.write = function (...) return req.handler:write(...); end
req.callback = function (content, code, request, response) log("debug", "Calling callback, status %s", code or "---"); return select(2, xpcall(function () return callback(content, code, request, response) end, handleerr)); end
diff --git a/net/server.lua b/net/server.lua
index 2a0b89ae..a753a19c 100644
--- a/net/server.lua
+++ b/net/server.lua
@@ -6,25 +6,69 @@
-- COPYING file in the source package for more information.
--
-local use_luaevent = prosody and require "core.configmanager".get("*", "use_libevent");
+local server_type = prosody and require "core.configmanager".get("*", "network_backend") or "select";
+if prosody and require "core.configmanager".get("*", "use_libevent") then
+ server_type = "event";
+end
-if use_luaevent then
- use_luaevent = pcall(require, "luaevent.core");
- if not use_luaevent then
+if server_type == "event" then
+ if not pcall(require, "luaevent.core") then
log("error", "libevent not found, falling back to select()");
+ server_type = "select"
end
end
local server;
-
-if use_luaevent then
+local set_config;
+if server_type == "event" then
server = require "net.server_event";
- -- Overwrite signal.signal() because we need to ask libevent to
- -- handle them instead
- local ok, signal = pcall(require, "util.signal");
- if ok and signal then
- local _signal_signal = signal.signal;
+ local defaults = {};
+ for k,v in pairs(server.cfg) do
+ defaults[k] = v;
+ end
+ function set_config(settings)
+ local event_settings = {
+ ACCEPT_DELAY = settings.event_accept_retry_interval;
+ ACCEPT_QUEUE = settings.tcp_backlog;
+ CLEAR_DELAY = settings.event_clear_interval;
+ CONNECT_TIMEOUT = settings.connect_timeout;
+ DEBUG = settings.debug;
+ HANDSHAKE_TIMEOUT = settings.ssl_handshake_timeout;
+ MAX_CONNECTIONS = settings.max_connections;
+ MAX_HANDSHAKE_ATTEMPTS = settings.max_ssl_handshake_roundtrips;
+ MAX_READ_LENGTH = settings.max_receive_buffer_size;
+ MAX_SEND_LENGTH = settings.max_send_buffer_size;
+ READ_TIMEOUT = settings.read_timeout;
+ WRITE_TIMEOUT = settings.send_timeout;
+ };
+
+ for k,default in pairs(defaults) do
+ server.cfg[k] = event_settings[k] or default;
+ end
+ end
+elseif server_type == "select" then
+ server = require "net.server_select";
+
+ local defaults = {};
+ for k,v in pairs(server.getsettings()) do
+ defaults[k] = v;
+ end
+ function set_config(settings)
+ local select_settings = {};
+ for k,default in pairs(defaults) do
+ select_settings[k] = settings[k] or default;
+ end
+ server.changesettings(select_settings);
+ end
+else
+ error("Unsupported server type")
+end
+
+-- If server.hook_signal exists, replace signal.signal()
+local has_signal, signal = pcall(require, "util.signal");
+if has_signal then
+ if server.hook_signal then
function signal.signal(signal_id, handler)
if type(signal_id) == "string" then
signal_id = signal[signal_id:upper()];
@@ -34,46 +78,22 @@ if use_luaevent then
end
return server.hook_signal(signal_id, handler);
end
+ else
+ server.hook_signal = signal.signal;
end
else
- use_luaevent = false;
- server = require "net.server_select";
+ if not server.hook_signal then
+ server.hook_signal = function()
+ return false, "signal hooking not supported"
+ end
+ end
end
if prosody then
local config_get = require "core.configmanager".get;
- local defaults = {};
- for k,v in pairs(server.cfg or server.getsettings()) do
- defaults[k] = v;
- end
local function load_config()
local settings = config_get("*", "network_settings") or {};
- if use_luaevent then
- local event_settings = {
- ACCEPT_DELAY = settings.event_accept_retry_interval;
- ACCEPT_QUEUE = settings.tcp_backlog;
- CLEAR_DELAY = settings.event_clear_interval;
- CONNECT_TIMEOUT = settings.connect_timeout;
- DEBUG = settings.debug;
- HANDSHAKE_TIMEOUT = settings.ssl_handshake_timeout;
- MAX_CONNECTIONS = settings.max_connections;
- MAX_HANDSHAKE_ATTEMPTS = settings.max_ssl_handshake_roundtrips;
- MAX_READ_LENGTH = settings.max_receive_buffer_size;
- MAX_SEND_LENGTH = settings.max_send_buffer_size;
- READ_TIMEOUT = settings.read_timeout;
- WRITE_TIMEOUT = settings.send_timeout;
- };
-
- for k,default in pairs(defaults) do
- server.cfg[k] = event_settings[k] or default;
- end
- else
- local select_settings = {};
- for k,default in pairs(defaults) do
- select_settings[k] = settings[k] or default;
- end
- server.changesettings(select_settings);
- end
+ return set_config(settings);
end
load_config();
prosody.events.add_handler("config-reloaded", load_config);
diff --git a/net/server_event.lua b/net/server_event.lua
index 756e9837..fa6dda19 100644
--- a/net/server_event.lua
+++ b/net/server_event.lua
@@ -44,8 +44,9 @@ local setmetatable = use "setmetatable"
local t_insert = table.insert
local t_concat = table.concat
-local ssl = use "ssl"
+local has_luasec, ssl = pcall ( require , "ssl" )
local socket = use "socket" or require "socket"
+local getaddrinfo = socket.dns.getaddrinfo
local log = require ("util.logger").init("socket")
@@ -128,7 +129,7 @@ do
return self:_destroy();
end
- function interface_mt:_start_connection(plainssl) -- should be called from addclient
+ function interface_mt:_start_connection(plainssl) -- called from wrapclient
local callback = function( event )
if EV_TIMEOUT == event then -- timeout during connection
self.fatalerror = "connection timeout"
@@ -136,7 +137,7 @@ do
self:_close()
debug( "new connection failed. id:", self.id, "error:", self.fatalerror )
else
- if plainssl and ssl then -- start ssl session
+ if plainssl and has_luasec then -- start ssl session
self:starttls(self._sslctx, true)
else -- normal connection
self:_start_session(true)
@@ -512,7 +513,7 @@ do
_sslctx = sslctx; -- parameters
_usingssl = false; -- client is using ssl;
}
- if not ssl then interface.starttls = false; end
+ if not has_luasec then interface.starttls = false; end
interface.id = tostring(interface):match("%x+$");
interface.writecallback = function( event ) -- called on write events
--vdebug( "new client write event, id/ip/port:", interface, ip, port )
@@ -695,7 +696,7 @@ do
interface._connections = interface._connections + 1 -- increase connection count
local clientinterface = handleclient( client, client_ip, client_port, interface, pattern, listener, sslctx )
--vdebug( "client id:", clientinterface, "startssl:", startssl )
- if ssl and sslctx then
+ if has_luasec and sslctx then
clientinterface:starttls(sslctx, true)
else
clientinterface:_start_session( true )
@@ -716,25 +717,17 @@ do
end
local addserver = ( function( )
- return function( addr, port, listener, pattern, sslcfg, startssl ) -- TODO: check arguments
- --vdebug( "creating new tcp server with following parameters:", addr or "nil", port or "nil", sslcfg or "nil", startssl or "nil")
+ return function( addr, port, listener, pattern, sslctx, startssl ) -- TODO: check arguments
+ --vdebug( "creating new tcp server with following parameters:", addr or "nil", port or "nil", sslctx or "nil", startssl or "nil")
+ if sslctx and not has_luasec then
+ debug "fatal error: luasec not found"
+ return nil, "luasec not found"
+ end
local server, err = socket.bind( addr, port, cfg.ACCEPT_QUEUE ) -- create server socket
if not server then
debug( "creating server socket on "..addr.." port "..port.." failed:", err )
return nil, err
end
- local sslctx
- if sslcfg then
- if not ssl then
- debug "fatal error: luasec not found"
- return nil, "luasec not found"
- end
- sslctx, err = sslcfg
- if err then
- debug( "error while creating new ssl context for server socket:", err )
- return nil, err
- end
- end
local interface = handleserver( server, addr, port, pattern, listener, sslctx, startssl ) -- new server handler
debug( "new server created with id:", tostring(interface))
return interface
@@ -750,37 +743,34 @@ do
--function handleclient( client, ip, port, server, pattern, listener, _, sslctx ) -- creates an client interface
end
- function addclient( addr, serverport, listener, pattern, localaddr, localport, sslcfg, startssl )
- local client, err = socket.tcp() -- creating new socket
+ function addclient( addr, serverport, listener, pattern, sslctx, typ )
+ if sslctx and not has_luasec then
+ debug "need luasec, but not available"
+ return nil, "luasec not found"
+ end
+ if getaddrinfo and not typ then
+ local addrinfo, err = getaddrinfo(addr)
+ if not addrinfo then return nil, err end
+ if addrinfo[1] and addrinfo[1].family == "inet6" then
+ typ = "tcp6"
+ end
+ end
+ local create = socket[typ or "tcp"]
+ if type( create ) ~= "function" then
+ return nil, "invalid socket type"
+ end
+ local client, err = create() -- creating new socket
if not client then
debug( "cannot create socket:", err )
return nil, err
end
client:settimeout( 0 ) -- set nonblocking
- if localaddr then
- local res, err = client:bind( localaddr, localport, -1 )
- if not res then
- debug( "cannot bind client:", err )
- return nil, err
- end
- end
- local sslctx
- if sslcfg then -- handle ssl/new context
- if not ssl then
- debug "need luasec, but not available"
- return nil, "luasec not found"
- end
- sslctx, err = sslcfg
- if err then
- debug( "cannot create new ssl context:", err )
- return nil, err
- end
- end
local res, err = client:connect( addr, serverport ) -- connect
- if res or ( err == "timeout" ) then
- local ip, port = client:getsockname( )
- local interface = wrapclient( client, ip, serverport, listener, pattern, sslctx, startssl )
- interface:_start_connection( startssl )
+ if res or ( err == "timeout" or err == "Operation already in progress" ) then
+ if client.getsockname then
+ addr = client:getsockname( )
+ end
+ local interface = wrapclient( client, addr, serverport, listener, pattern, sslctx )
debug( "new connection id:", interface.id )
return interface, err
else
@@ -858,6 +848,23 @@ local function link(sender, receiver, buffersize)
sender:set_mode("*a");
end
+local add_task do
+ local EVENT_LEAVE = (event.core and event.core.LEAVE) or -1;
+ local socket_gettime = socket.gettime
+ function add_task(delay, callback)
+ local event_handle;
+ event_handle = base:addevent(nil, 0, function ()
+ local ret = callback(socket_gettime());
+ if ret then
+ return 0, ret;
+ elseif event_handle then
+ return EVENT_LEAVE;
+ end
+ end
+ , delay);
+ end
+end
+
return {
cfg = cfg,
@@ -874,6 +881,7 @@ return {
closeall = closeallservers,
get_backend = get_backend,
hook_signal = hook_signal,
+ add_task = add_task,
__NAME = SCRIPT_NAME,
__DATE = LAST_MODIFIED,
diff --git a/net/server_select.lua b/net/server_select.lua
index 486e953b..d8404001 100644
--- a/net/server_select.lua
+++ b/net/server_select.lua
@@ -42,19 +42,21 @@ local os_difftime = os.difftime
local math_min = math.min
local math_huge = math.huge
local table_concat = table.concat
+local table_insert = table.insert
local string_sub = string.sub
local coroutine_wrap = coroutine.wrap
local coroutine_yield = coroutine.yield
--// extern libs //--
-local luasec = use "ssl"
+local has_luasec, luasec = pcall ( require , "ssl" )
local luasocket = use "socket" or require "socket"
local luasocket_gettime = luasocket.gettime
+local getaddrinfo = luasocket.dns.getaddrinfo
--// extern lib methods //--
-local ssl_wrap = ( luasec and luasec.wrap )
+local ssl_wrap = ( has_luasec and luasec.wrap )
local socket_bind = luasocket.bind
local socket_sleep = luasocket.sleep
local socket_select = luasocket.select
@@ -594,7 +596,7 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport
end
)
end
- if luasec then
+ if has_luasec then
handler.starttls = function( self, _sslctx)
if _sslctx then
handler:set_sslctx(_sslctx);
@@ -647,7 +649,7 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport
_socketlist[ socket ] = handler
_readlistlen = addsocket(_readlist, socket, _readlistlen)
- if sslctx and luasec then
+ if sslctx and has_luasec then
out_put "server.lua: auto-starting ssl negotiation..."
handler.autostart_ssl = true;
local ok, err = handler:starttls(sslctx);
@@ -723,22 +725,23 @@ end
----------------------------------// PUBLIC //--
addserver = function( addr, port, listeners, pattern, sslctx ) -- this function provides a way for other scripts to reg a server
+ addr = addr or "*"
local err
if type( listeners ) ~= "table" then
err = "invalid listener table"
- end
- if type( port ) ~= "number" or not ( port >= 0 and port <= 65535 ) then
+ elseif type ( addr ) ~= "string" then
+ err = "invalid address"
+ elseif type( port ) ~= "number" or not ( port >= 0 and port <= 65535 ) then
err = "invalid port"
elseif _server[ addr..":"..port ] then
err = "listeners on '[" .. addr .. "]:" .. port .. "' already exist"
- elseif sslctx and not luasec then
+ elseif sslctx and not has_luasec then
err = "luasec not found"
end
if err then
out_error( "server.lua, [", addr, "]:", port, ": ", err )
return nil, err
end
- addr = addr or "*"
local server, err = socket_bind( addr, port, _tcpbacklog )
if err then
out_error( "server.lua, [", addr, "]:", port, ": ", err )
@@ -830,6 +833,50 @@ addtimer = function( listener )
return true
end
+local add_task do
+ local data = {};
+ local new_data = {};
+
+ function add_task(delay, callback)
+ local current_time = luasocket_gettime();
+ delay = delay + current_time;
+ if delay >= current_time then
+ table_insert(new_data, {delay, callback});
+ else
+ local r = callback(current_time);
+ if r and type(r) == "number" then
+ return add_task(r, callback);
+ end
+ end
+ end
+
+ addtimer(function()
+ local current_time = luasocket_gettime();
+ if #new_data > 0 then
+ for _, d in pairs(new_data) do
+ table_insert(data, d);
+ end
+ new_data = {};
+ end
+
+ local next_time = math_huge;
+ for i, d in pairs(data) do
+ local t, callback = d[1], d[2];
+ if t <= current_time then
+ data[i] = nil;
+ local r = callback(current_time);
+ if type(r) == "number" then
+ add_task(r, callback);
+ next_time = math_min(next_time, r);
+ end
+ else
+ next_time = math_min(next_time, t - current_time);
+ end
+ end
+ return next_time;
+ end);
+end
+
stats = function( )
return _readtraffic, _sendtraffic, _readlistlen, _sendlistlen, _timerlistlen
end
@@ -941,17 +988,44 @@ local wrapclient = function( socket, ip, serverport, listeners, pattern, sslctx
return handler, socket
end
-local addclient = function( address, port, listeners, pattern, sslctx )
- local client, err = luasocket.tcp( )
+local addclient = function( address, port, listeners, pattern, sslctx, typ )
+ local err
+ if type( listeners ) ~= "table" then
+ err = "invalid listener table"
+ elseif type ( address ) ~= "string" then
+ err = "invalid address"
+ elseif type( port ) ~= "number" or not ( port >= 0 and port <= 65535 ) then
+ err = "invalid port"
+ elseif sslctx and not has_luasec then
+ err = "luasec not found"
+ end
+ if getaddrinfo and not typ then
+ local addrinfo, err = getaddrinfo(address)
+ if not addrinfo then return nil, err end
+ if addrinfo[1] and addrinfo[1].family == "inet6" then
+ typ = "tcp6"
+ end
+ end
+ local create = luasocket[typ or "tcp"]
+ if type( create ) ~= "function" then
+ err = "invalid socket type"
+ end
+
+ if err then
+ out_error( "server.lua, addclient: ", err )
+ return nil, err
+ end
+
+ local client, err = create( )
if err then
return nil, err
end
client:settimeout( 0 )
- _, err = client:connect( address, port )
- if err then -- try again
+ local ok, err = client:connect( address, port )
+ if ok or err == "timeout" or err == "Operation already in progress" then
return wrapclient( client, address, port, listeners, pattern, sslctx )
else
- return wrapconnection( nil, listeners, client, address, port, "clientport", pattern, sslctx )
+ return nil, err
end
end
@@ -978,6 +1052,7 @@ end
return {
_addtimer = addtimer,
+ add_task = add_task;
addclient = addclient,
wrapclient = wrapclient,