aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorKim Alvefur <zash@zash.se>2016-01-22 15:03:02 +0100
committerKim Alvefur <zash@zash.se>2016-01-22 15:03:02 +0100
commit931549f781d8a0cb98af05b98cf00880dca5fb9f (patch)
tree6826436cb243da87ede7abff06607216fa25fa4b
parentb0864f247d26d45d1a2e6f0dc4958254ca8bcb47 (diff)
parent6116ba8cfbeff29bd3c4cedd1b05babec47e1507 (diff)
downloadprosody-931549f781d8a0cb98af05b98cf00880dca5fb9f.tar.gz
prosody-931549f781d8a0cb98af05b98cf00880dca5fb9f.zip
Merge 0.10->trunk
-rw-r--r--net/dns.lua14
-rw-r--r--net/server.lua44
-rw-r--r--net/server_select.lua22
-rw-r--r--plugins/mod_s2s/s2sout.lib.lua46
4 files changed, 72 insertions, 54 deletions
diff --git a/net/dns.lua b/net/dns.lua
index d711af34..b047ec54 100644
--- a/net/dns.lua
+++ b/net/dns.lua
@@ -754,17 +754,17 @@ function resolver:query(qname, qtype, qclass) -- - - - - - - - - - -- query
self.active[id] = self.active[id] or {};
self.active[id][question] = o;
- -- remember which coroutine wants the answer
- if co then
- set(self.wanted, qclass, qtype, qname, co, true);
- end
-
local conn, err = self:getsocket(o.server)
if not conn then
return nil, err;
end
conn:send (o.packet)
+ -- remember which coroutine wants the answer
+ if co then
+ set(self.wanted, qclass, qtype, qname, co, true);
+ end
+
if timer and self.timeout then
local num_servers = #self.server;
local i = 1;
@@ -861,7 +861,7 @@ function resolver:receive(rset) -- - - - - - - - - - - - - - - - - receive
-- retire the query
local queries = self.active[response.header.id];
queries[response.question.raw] = nil;
-
+
if not next(queries) then self.active[response.header.id] = nil; end
if not next(self.active) then self:closeall(); end
@@ -875,7 +875,7 @@ function resolver:receive(rset) -- - - - - - - - - - - - - - - - - receive
set(self.wanted, q.class, q.type, q.name, nil);
end
end
-
+
end
end
end
diff --git a/net/server.lua b/net/server.lua
index a753a19c..0e13399d 100644
--- a/net/server.lua
+++ b/net/server.lua
@@ -28,24 +28,24 @@ if server_type == "event" then
defaults[k] = v;
end
function set_config(settings)
- local event_settings = {
- ACCEPT_DELAY = settings.event_accept_retry_interval;
- ACCEPT_QUEUE = settings.tcp_backlog;
- CLEAR_DELAY = settings.event_clear_interval;
- CONNECT_TIMEOUT = settings.connect_timeout;
- DEBUG = settings.debug;
- HANDSHAKE_TIMEOUT = settings.ssl_handshake_timeout;
- MAX_CONNECTIONS = settings.max_connections;
- MAX_HANDSHAKE_ATTEMPTS = settings.max_ssl_handshake_roundtrips;
- MAX_READ_LENGTH = settings.max_receive_buffer_size;
- MAX_SEND_LENGTH = settings.max_send_buffer_size;
- READ_TIMEOUT = settings.read_timeout;
- WRITE_TIMEOUT = settings.send_timeout;
- };
+ local event_settings = {
+ ACCEPT_DELAY = settings.accept_retry_interval;
+ ACCEPT_QUEUE = settings.tcp_backlog;
+ CLEAR_DELAY = settings.event_clear_interval;
+ CONNECT_TIMEOUT = settings.connect_timeout;
+ DEBUG = settings.debug;
+ HANDSHAKE_TIMEOUT = settings.ssl_handshake_timeout;
+ MAX_CONNECTIONS = settings.max_connections;
+ MAX_HANDSHAKE_ATTEMPTS = settings.max_ssl_handshake_roundtrips;
+ MAX_READ_LENGTH = settings.max_receive_buffer_size;
+ MAX_SEND_LENGTH = settings.max_send_buffer_size;
+ READ_TIMEOUT = settings.read_timeout;
+ WRITE_TIMEOUT = settings.send_timeout;
+ };
- for k,default in pairs(defaults) do
- server.cfg[k] = event_settings[k] or default;
- end
+ for k,default in pairs(defaults) do
+ server.cfg[k] = event_settings[k] or default;
+ end
end
elseif server_type == "select" then
server = require "net.server_select";
@@ -55,12 +55,12 @@ elseif server_type == "select" then
defaults[k] = v;
end
function set_config(settings)
- local select_settings = {};
- for k,default in pairs(defaults) do
- select_settings[k] = settings[k] or default;
+ local select_settings = {};
+ for k,default in pairs(defaults) do
+ select_settings[k] = settings[k] or default;
+ end
+ server.changesettings(select_settings);
end
- server.changesettings(select_settings);
- end
else
error("Unsupported server type")
end
diff --git a/net/server_select.lua b/net/server_select.lua
index c89747bb..37d57d29 100644
--- a/net/server_select.lua
+++ b/net/server_select.lua
@@ -87,6 +87,7 @@ local _socketlist
local _closelist
local _readtimes
local _writetimes
+local _fullservers
--// simple data types //--
@@ -100,6 +101,7 @@ local _readtraffic
local _selecttimeout
local _tcpbacklog
+local _accepretry
local _starttime
local _currenttime
@@ -126,6 +128,7 @@ _socketlist = { } -- key = socket, value = wrapped socket (handlers)
_readtimes = { } -- key = handler, value = timestamp of last data reading
_writetimes = { } -- key = handler, value = timestamp of last data writing/sending
_closelist = { } -- handlers to close
+_fullservers = { } -- servers in a paused state while there are too many clients
_readlistlen = 0 -- length of readlist
_sendlistlen = 0 -- length of sendlist
@@ -136,6 +139,7 @@ _readtraffic = 0
_selecttimeout = 1 -- timeout of socket.select
_tcpbacklog = 128 -- some kind of hint to the OS
+_accepretry = 10 -- seconds to wait until the next attempt of a full server to accept
_maxsendlen = 51000 * 1024 -- max len of send buffer
_maxreadlen = 25000 * 1024 -- max len of read buffer
@@ -204,6 +208,7 @@ wrapserver = function( listeners, socket, ip, serverport, pattern, sslctx ) -- t
socket = nil;
end
handler.paused = true;
+ out_put("server.lua: server [", ip, "]:", serverport, " paused")
end
end
handler.resume = function( )
@@ -214,7 +219,9 @@ wrapserver = function( listeners, socket, ip, serverport, pattern, sslctx ) -- t
end
_readlistlen = addsocket(_readlist, socket, _readlistlen)
_socketlist[ socket ] = handler
+ _fullservers[ handler ] = nil
handler.paused = false;
+ out_put("server.lua: server [", ip, "]:", serverport, " resumed")
end
end
handler.ip = function( )
@@ -229,6 +236,7 @@ wrapserver = function( listeners, socket, ip, serverport, pattern, sslctx ) -- t
handler.readbuffer = function( )
if _readlistlen >= _maxselectlen or _sendlistlen >= _maxselectlen then
handler.pause( )
+ _fullservers[ handler ] = _currenttime
out_put( "server.lua: refused new client connection: server full" )
return false
end
@@ -247,6 +255,8 @@ wrapserver = function( listeners, socket, ip, serverport, pattern, sslctx ) -- t
return;
elseif err then -- maybe timeout or something else
out_put( "server.lua: error with new client connection: ", tostring(err) )
+ handler.pause( )
+ _fullservers[ handler ] = _currenttime
return false
end
end
@@ -259,6 +269,7 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport
out_error("server.lua: Disallowed FD number: "..socket:getfd()) -- PROTIP: Switch to libevent
socket:close( ) -- Should we send some kind of error here?
if server then
+ _fullservers[ server ] = _currenttime
server.pause( )
end
return nil, nil, "fd-too-large"
@@ -796,6 +807,7 @@ getsettings = function( )
max_connections = _maxselectlen;
max_ssl_handshake_roundtrips = _maxsslhandshake;
highest_allowed_fd = _maxfd;
+ accept_retry_interval = _accepretry;
}
end
@@ -810,6 +822,7 @@ changesettings = function( new )
_tcpbacklog = tonumber( new.tcp_backlog ) or _tcpbacklog
_sendtimeout = tonumber( new.send_timeout ) or _sendtimeout
_readtimeout = tonumber( new.read_timeout ) or _readtimeout
+ _accepretry = tonumber( new.accept_retry_interval ) or _accepretry
_maxselectlen = new.max_connections or _maxselectlen
_maxsslhandshake = new.max_ssl_handshake_roundtrips or _maxsslhandshake
_maxfd = new.highest_allowed_fd or _maxfd
@@ -884,7 +897,7 @@ loop = function(once) -- this is the main loop of the program
_currenttime = luasocket_gettime( )
repeat
-- Fire timers
- local next_timer_time = math_huge;
+ local next_timer_time = math_huge;
for i = 1, _timerlistlen do
local t = _timerlist[ i ]( _currenttime ) -- fire timers
if t then next_timer_time = math_min(next_timer_time, t); end
@@ -936,6 +949,13 @@ loop = function(once) -- this is the main loop of the program
end
end
end
+
+ for server, paused_time in pairs( _fullservers ) do
+ if _currenttime - paused_time > _accepretry then
+ _fullservers[ server ] = nil;
+ server.resume();
+ end
+ end
until quitting;
if once and quitting == "once" then quitting = nil; return; end
closeall();
diff --git a/plugins/mod_s2s/s2sout.lib.lua b/plugins/mod_s2s/s2sout.lib.lua
index 395406cd..42413164 100644
--- a/plugins/mod_s2s/s2sout.lib.lua
+++ b/plugins/mod_s2s/s2sout.lib.lua
@@ -18,31 +18,13 @@ local socket = require "socket";
local adns = require "net.adns";
local dns = require "net.dns";
local t_insert, t_sort, ipairs = table.insert, table.sort, ipairs;
+local local_addresses = require "util.net".local_addresses;
local s2s_destroy_session = require "core.s2smanager".destroy_session;
local log = module._log;
-local anysource = { IPv4 = "0.0.0.0", IPv6 = "::" };
-local function get_sources(addrs)
- local sources = {};
- for _, IP in ipairs(addrs) do
- local sock;
- if IP.proto == "IPv4" then
- sock = socket.udp();
- elseif IP.proto == "IPv6" then
- sock = socket.udp6();
- end
- sock:setpeername(IP.addr, 9);
- local localaddr = sock:getsockname() or anysource[IP.proto];
- sock:close();
- if not sources[localaddr] then
- sources[localaddr] = true;
- t_insert(sources, new_ip(localaddr, IP.proto));
- end
- end
- return sources;
-end
+local sources = {};
local has_ipv4, has_ipv6;
local dns_timeout = module:get_option_number("dns_timeout", 15);
@@ -196,7 +178,7 @@ function s2sout.try_connect(host_session, connect_host, connect_port, err)
if have_other_result then
if #IPs > 0 then
- rfc6724_dest(host_session.ip_hosts, get_sources(host_session.ip_hosts));
+ rfc6724_dest(host_session.ip_hosts, sources);
for i = 1, #IPs do
IPs[i] = {ip = IPs[i], port = connect_port};
end
@@ -232,7 +214,7 @@ function s2sout.try_connect(host_session, connect_host, connect_port, err)
if have_other_result then
if #IPs > 0 then
- rfc6724_dest(host_session.ip_hosts, get_sources(host_session.ip_hosts));
+ rfc6724_dest(host_session.ip_hosts, sources);
for i = 1, #IPs do
IPs[i] = {ip = IPs[i], port = connect_port};
end
@@ -320,12 +302,28 @@ module:hook_global("service-added", function (event)
return;
end
for source, _ in pairs(s2s_sources) do
- if source:find(":") then
- has_ipv6 = true;
+ if source == "*" or source == "0.0.0.0" then
+ for _, addr in ipairs(local_addresses("ipv4", true)) do
+ sources[#sources + 1] = new_ip(addr, "IPv4");
+ end
+ elseif source == "::" then
+ for _, addr in ipairs(local_addresses("ipv6", true)) do
+ sources[#sources + 1] = new_ip(addr, "IPv6");
+ end
else
+ sources[#sources + 1] = new_ip(source, (source:find(":") and "IPv6") or "IPv4");
+ end
+ end
+ for i = 1,#sources do
+ if sources[i].proto == "IPv6" then
+ has_ipv6 = true;
+ elseif sources[i].proto == "IPv4" then
has_ipv4 = true;
end
end
+ if not (has_ipv4 or has_ipv6) then
+ module:log("warn", "No local IPv4 or IPv6 addresses detected, outgoing connections may fail");
+ end
end);
return s2sout;