aboutsummaryrefslogtreecommitdiffstats
path: root/net
diff options
context:
space:
mode:
Diffstat (limited to 'net')
-rw-r--r--net/adns.lua7
-rw-r--r--net/dns.lua44
-rw-r--r--net/multiplex_listener.lua4
-rw-r--r--net/server_event.lua37
-rw-r--r--net/server_select.lua32
-rw-r--r--net/xmppclient_listener.lua96
-rw-r--r--net/xmppcomponent_listener.lua26
-rw-r--r--net/xmppserver_listener.lua94
8 files changed, 210 insertions, 130 deletions
diff --git a/net/adns.lua b/net/adns.lua
index 88d4b4b3..d0151c8c 100644
--- a/net/adns.lua
+++ b/net/adns.lua
@@ -36,12 +36,9 @@ function lookup(handler, qname, qtype, qclass)
end)(dns.peek(qname, qtype, qclass));
end
-function cancel(handle, call_handler)
+function cancel(handle, call_handler, reason)
log("warn", "Cancelling DNS lookup for %s", tostring(handle[3]));
- dns.cancel(handle);
- if call_handler then
- coroutine.resume(handle[4]);
- end
+ dns.cancel(handle[1], handle[2], handle[3], handle[4], call_handler);
end
function new_async_socket(sock, resolver)
diff --git a/net/dns.lua b/net/dns.lua
index c0de97fd..fcc679e3 100644
--- a/net/dns.lua
+++ b/net/dns.lua
@@ -16,6 +16,8 @@
local socket = require "socket";
local ztact = require "util.ztact";
+local timer = require "util.timer";
+
local _, windows = pcall(require, "util.windows");
local is_windows = (_ and windows) or os.getenv("WINDIR");
@@ -27,6 +29,7 @@ local ipairs, next, pairs, print, setmetatable, tostring, assert, error, unpack
local get, set = ztact.get, ztact.set;
+local default_timeout = 15;
-------------------------------------------------- module dns
module('dns')
@@ -115,6 +118,7 @@ end
local resolver = {};
resolver.__index = resolver;
+resolver.timeout = default_timeout;
local SRV_tostring;
@@ -678,7 +682,28 @@ function resolver:query(qname, qtype, qclass) -- - - - - - - - - - -- query
--set(self.yielded, co, qclass, qtype, qname, true);
end
- self:getsocket (o.server):send (o.packet)
+ local conn = self:getsocket(o.server)
+ conn:send (o.packet)
+
+ if timer and self.timeout then
+ local num_servers = #self.server;
+ local i = 1;
+ timer.add_task(self.timeout, function ()
+ if get(self.wanted, qclass, qtype, qname, co) then
+ if i < num_servers then
+ i = i + 1;
+ self:servfail(conn);
+ o.server = self.best_server;
+ conn = self:getsocket(o.server);
+ conn:send(o.packet);
+ return self.timeout;
+ else
+ -- Tried everything, failed
+ self:cancel(qclass, qtype, qname, co, true);
+ end
+ end
+ end)
+ end
end
function resolver:servfail(sock)
@@ -720,6 +745,10 @@ function resolver:servfail(sock)
end
end
+function resolver:settimeout(seconds)
+ self.timeout = seconds;
+end
+
function resolver:receive(rset) -- - - - - - - - - - - - - - - - - receive
--print('receive'); print(self.socket);
self.time = socket.gettime();
@@ -806,10 +835,13 @@ function resolver:feed(sock, packet)
return response;
end
-function resolver:cancel(data)
- local cos = get(self.wanted, unpack(data, 1, 3));
+function resolver:cancel(qclass, qtype, qname, co, call_handler)
+ local cos = get(self.wanted, qclass, qtype, qname);
if cos then
- cos[data[4]] = nil;
+ if call_handler then
+ coroutine.resume(co);
+ end
+ cos[co] = nil;
end
end
@@ -961,6 +993,10 @@ function dns.cancel(...) -- - - - - - - - - - - - - - - - - - - - - - cancel
return _resolver:cancel(...);
end
+function dns.settimeout(...)
+ return _resolver:settimeout(...);
+end
+
function dns.socket_wrapper_set(...) -- - - - - - - - - socket_wrapper_set
return _resolver:socket_wrapper_set(...);
end
diff --git a/net/multiplex_listener.lua b/net/multiplex_listener.lua
index bf193ad8..b515ccce 100644
--- a/net/multiplex_listener.lua
+++ b/net/multiplex_listener.lua
@@ -19,6 +19,8 @@ function server.onincoming(conn, data)
if buf:match("^[a-zA-Z]") then
local listener = httpserver_listener;
conn:setlistener(listener);
+ local onconnect = listener.onconnect;
+ if onconnect then onconnect(conn) end
listener.onincoming(conn, buf);
elseif buf:match(">") then
local listener;
@@ -31,6 +33,8 @@ function server.onincoming(conn, data)
listener = xmppclient_listener;
end
conn:setlistener(listener);
+ local onconnect = listener.onconnect;
+ if onconnect then onconnect(conn) end
listener.onincoming(conn, buf);
elseif #buf > 1024 then
conn:close();
diff --git a/net/server_event.lua b/net/server_event.lua
index 0331e793..d2d40374 100644
--- a/net/server_event.lua
+++ b/net/server_event.lua
@@ -143,9 +143,9 @@ do
debug( "new connection failed. id:", self.id, "error:", self.fatalerror )
else
if plainssl and ssl then -- start ssl session
- self:starttls()
+ self:starttls(nil, true)
else -- normal connection
- self:_start_session( self.listener.onconnect )
+ self:_start_session(true)
end
debug( "new connection established. id:", self.id )
end
@@ -155,13 +155,18 @@ do
self.eventconnect = addevent( base, self.conn, EV_WRITE, callback, cfg.CONNECT_TIMEOUT )
return true
end
- function interface_mt:_start_session(onconnect) -- new session, for example after startssl
+ function interface_mt:_start_session(call_onconnect) -- new session, for example after startssl
if self.type == "client" then
local callback = function( )
self:_lock( false, false, false )
--vdebug( "start listening on client socket with id:", self.id )
self.eventread = addevent( base, self.conn, EV_READ, self.readcallback, cfg.READ_TIMEOUT ); -- register callback
- self:onconnect()
+ if call_onconnect then
+ debug("CALLING ONCONNECT")
+ self:onconnect()
+ else
+ debug("NOT CALLING ONCONNECT");
+ end
self.eventsession = nil
return -1
end
@@ -173,7 +178,7 @@ do
end
return true
end
- function interface_mt:_start_ssl(arg) -- old socket will be destroyed, therefore we have to close read/write events first
+ function interface_mt:_start_ssl(call_onconnect) -- old socket will be destroyed, therefore we have to close read/write events first
--vdebug( "starting ssl session with client id:", self.id )
local _
_ = self.eventread and self.eventread:close( ) -- close events; this must be called outside of the event callbacks!
@@ -184,7 +189,7 @@ do
if err then
self.fatalerror = err
self.conn = nil -- cannot be used anymore
- if "onconnect" == arg then
+ if call_onconnect then
self.ondisconnect = nil -- dont call this when client isnt really connected
end
self:_close()
@@ -211,14 +216,11 @@ do
self.send = self.conn.send -- caching table lookups with new client object
self.receive = self.conn.receive
local onsomething
- if "onconnect" == arg then -- trigger listener
- onsomething = self.onconnect
- else
- onsomething = self.onsslconnection
+ if not call_onconnect then -- trigger listener
+ self:onstatus("ssl-handshake-complete");
end
- self:_start_session( onsomething )
+ self:_start_session( call_onconnect )
debug( "ssl handshake done" )
- self:onstatus("ssl-handshake-complete");
self.eventhandshake = nil
return -1
end
@@ -232,7 +234,7 @@ do
end
end
if self.fatalerror then
- if "onconnect" == arg then
+ if call_onconnect then
self.ondisconnect = nil -- dont call this when client isnt really connected
end
self:_close()
@@ -414,7 +416,7 @@ do
-- No-op, we always use the underlying connection's send
end
- function interface_mt:starttls(sslctx)
+ function interface_mt:starttls(sslctx, call_onconnect)
debug( "try to start ssl at client id:", self.id )
local err
self._sslctx = sslctx;
@@ -428,7 +430,7 @@ do
self._usingssl = true
self.startsslcallback = function( ) -- we have to start the handshake outside of a read/write event
self.startsslcallback = nil
- self:_start_ssl();
+ self:_start_ssl(call_onconnect);
self.eventstarthandshake = nil
return -1
end
@@ -468,7 +470,6 @@ do
function interface_mt:ondrain()
end
function interface_mt:onstatus()
- debug("server.lua: Dummy onstatus()")
end
end
@@ -700,9 +701,9 @@ do
local clientinterface = handleclient( client, client_ip, client_port, interface, pattern, listener, nil, sslctx )
--vdebug( "client id:", clientinterface, "startssl:", startssl )
if ssl and sslctx then
- clientinterface:starttls(sslctx)
+ clientinterface:starttls(sslctx, true)
else
- clientinterface:_start_session( clientinterface.onconnect )
+ clientinterface:_start_session( true )
end
debug( "accepted incoming client connection from:", client_ip or "<unknown IP>", client_port or "<unknown port>", "to", port or "<unknown port>");
diff --git a/net/server_select.lua b/net/server_select.lua
index 298e560a..51ae4e66 100644
--- a/net/server_select.lua
+++ b/net/server_select.lua
@@ -167,7 +167,7 @@ wrapserver = function( listeners, socket, ip, serverport, pattern, sslctx, maxco
local connections = 0
- local dispatch, disconnect = listeners.onincoming, listeners.ondisconnect
+ local dispatch, disconnect = listeners.onconnect or listeners.onincoming, listeners.ondisconnect
local accept = socket.accept
@@ -483,7 +483,7 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport
if drain then
drain(handler)
end
- _ = needtls and handler:starttls(nil, true)
+ _ = needtls and handler:starttls(nil)
_ = toclose and handler:close( )
return true
elseif byte and ( err == "timeout" or err == "wantwrite" ) then -- want write
@@ -564,13 +564,13 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport
end
else
local sslctx;
- handler.starttls = function( self, _sslctx, now )
+ handler.starttls = function( self, _sslctx)
if _sslctx then
sslctx = _sslctx;
handler:set_sslctx(sslctx);
end
- if not now then
- out_put "server.lua: we need to do tls, but delaying until later"
+ if bufferqueuelen > 0 then
+ out_put "server.lua: we need to do tls, but delaying until send buffer empty"
needtls = true
return
end
@@ -623,16 +623,6 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport
_socketlist[ socket ] = handler
_readlistlen = addsocket(_readlist, socket, _readlistlen)
- if listeners.onconnect then
- _sendlistlen = addsocket(_sendlist, socket, _sendlistlen)
- handler.sendbuffer = function ()
- listeners.onconnect(handler);
- handler.sendbuffer = _sendbuffer;
- if bufferqueuelen > 0 then
- return _sendbuffer();
- end
- end
- end
return handler, socket
end
@@ -854,6 +844,18 @@ local wrapclient = function( socket, ip, serverport, listeners, pattern, sslctx
local handler = wrapconnection( nil, listeners, socket, ip, serverport, "clientport", pattern, sslctx )
_socketlist[ socket ] = handler
_sendlistlen = addsocket(_sendlist, socket, _sendlistlen)
+ if listeners.onconnect then
+ -- When socket is writeable, call onconnect
+ local _sendbuffer = handler.sendbuffer;
+ handler.sendbuffer = function ()
+ listeners.onconnect(handler);
+ handler.sendbuffer = _sendbuffer;
+ -- If there was data with the incoming packet, handle it now.
+ if #handler:bufferqueue() > 0 then
+ return _sendbuffer();
+ end
+ end
+ end
return handler, socket
end
diff --git a/net/xmppclient_listener.lua b/net/xmppclient_listener.lua
index 94daa2b2..75726972 100644
--- a/net/xmppclient_listener.lua
+++ b/net/xmppclient_listener.lua
@@ -11,7 +11,7 @@
local logger = require "logger";
local log = logger.init("xmppclient_listener");
local lxp = require "lxp"
-local init_xmlhandlers = require "core.xmlhandlers"
+local new_xmpp_stream = require "util.xmppstream".new;
local sm_new_session = require "core.sessionmanager".new_session;
local connlisteners_register = require "net.connlisteners".register;
@@ -63,8 +63,11 @@ function stream_callbacks.error(session, error, data)
end
local function handleerr(err) log("error", "Traceback[c2s]: %s: %s", tostring(err), debug.traceback()); end
-function stream_callbacks.handlestanza(a, b)
- xpcall(function () core_process_stanza(a, b) end, handleerr);
+function stream_callbacks.handlestanza(session, stanza)
+ stanza = session.filter("stanzas/in", stanza);
+ if stanza then
+ return xpcall(function () return core_process_stanza(session, stanza) end, handleerr);
+ end
end
local sessions = {};
@@ -72,23 +75,6 @@ local xmppclient = { default_port = 5222, default_mode = "*a" };
-- These are session methods --
-local function session_reset_stream(session)
- -- Reset stream
- local parser = lxp.new(init_xmlhandlers(session, stream_callbacks), "\1");
- session.parser = parser;
-
- session.notopen = true;
-
- function session.data(conn, data)
- local ok, err = parser:parse(data);
- if ok then return; end
- log("debug", "Received invalid XML (%s) %d bytes: %s", tostring(err), #data, data:sub(1, 300):gsub("[\r\n]+", " "):gsub("[%z\1-\31]", "_"));
- session:close("xml-not-well-formed");
- end
-
- return true;
-end
-
local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'};
local default_stream_attr = { ["xmlns:stream"] = "http://etherx.jabber.org/streams", xmlns = stream_callbacks.default_ns, version = "1.0", id = "" };
local function session_close(session, reason)
@@ -128,32 +114,54 @@ end
-- End of session methods --
-function xmppclient.onincoming(conn, data)
- local session = sessions[conn];
- if not session then
- session = sm_new_session(conn);
- sessions[conn] = session;
-
- session.log("info", "Client connected");
-
- -- Client is using legacy SSL (otherwise mod_tls sets this flag)
- if conn:ssl() then
- session.secure = true;
- end
-
- if opt_keepalives ~= nil then
- conn:setoption("keepalive", opt_keepalives);
+function xmppclient.onconnect(conn)
+ local session = sm_new_session(conn);
+ sessions[conn] = session;
+
+ session.log("info", "Client connected");
+
+ -- Client is using legacy SSL (otherwise mod_tls sets this flag)
+ if conn:ssl() then
+ session.secure = true;
+ end
+
+ if opt_keepalives ~= nil then
+ conn:setoption("keepalive", opt_keepalives);
+ end
+
+ session.close = session_close;
+
+ local stream = new_xmpp_stream(session, stream_callbacks);
+ session.stream = stream;
+
+ session.notopen = true;
+
+ function session.reset_stream()
+ session.notopen = true;
+ session.stream:reset();
+ end
+
+ local filter = session.filter;
+ function session.data(data)
+ data = filter("bytes/in", data);
+ if data then
+ local ok, err = stream:feed(data);
+ if ok then return; end
+ log("debug", "Received invalid XML (%s) %d bytes: %s", tostring(err), #data, data:sub(1, 300):gsub("[\r\n]+", " "):gsub("[%z\1-\31]", "_"));
+ session:close("xml-not-well-formed");
end
-
- session.reset_stream = session_reset_stream;
- session.close = session_close;
-
- session_reset_stream(session); -- Initialise, ready for use
-
- session.dispatch_stanza = stream_callbacks.handlestanza;
end
- if data then
- session.data(conn, data);
+
+ local handlestanza = stream_callbacks.handlestanza;
+ function session.dispatch_stanza(session, stanza)
+ return handlestanza(session, stanza);
+ end
+end
+
+function xmppclient.onincoming(conn, data)
+ local session = sessions[conn];
+ if session then
+ session.data(data);
end
end
diff --git a/net/xmppcomponent_listener.lua b/net/xmppcomponent_listener.lua
index b87f7c96..5532186b 100644
--- a/net/xmppcomponent_listener.lua
+++ b/net/xmppcomponent_listener.lua
@@ -18,6 +18,7 @@ local connlisteners = require "net.connlisteners";
local cm_register_component = require "core.componentmanager".register_component;
local cm_deregister_component = require "core.componentmanager".deregister_component;
local uuid_gen = require "util.uuid".generate;
+local jid_split = require "util.jid".split;
local sha1 = require "util.hashes".sha1;
local st = require "util.stanza";
local init_xmlhandlers = require "core.xmlhandlers";
@@ -99,6 +100,31 @@ function stream_callbacks.handlestanza(session, stanza)
if not stanza.attr.xmlns and stanza.name == "handshake" then
stanza.attr.xmlns = xmlns_component;
end
+ if not stanza.attr.xmlns or stanza.attr.xmlns == "jabber:client" then
+ local from = stanza.attr.from;
+ if from then
+ if session.component_validate_from then
+ local _, domain = jid_split(stanza.attr.from);
+ if domain ~= session.host then
+ -- Return error
+ session.log("warn", "Component sent stanza with missing or invalid 'from' address");
+ session:close{
+ condition = "invalid-from";
+ text = "Component tried to send from address <"..tostring(from)
+ .."> which is not in domain <"..tostring(session.host)..">";
+ };
+ return;
+ end
+ end
+ else
+ stanza.attr.from = session.host;
+ end
+ if not stanza.attr.to then
+ session.log("warn", "Rejecting stanza with no 'to' address");
+ session.send(st.error_reply(stanza, "modify", "bad-request", "Components MUST specify a 'to' address on stanzas"));
+ return;
+ end
+ end
return core_process_stanza(session, stanza);
end
diff --git a/net/xmppserver_listener.lua b/net/xmppserver_listener.lua
index d1272edb..05e14a0f 100644
--- a/net/xmppserver_listener.lua
+++ b/net/xmppserver_listener.lua
@@ -11,7 +11,7 @@
local logger = require "logger";
local log = logger.init("xmppserver_listener");
local lxp = require "lxp"
-local init_xmlhandlers = require "core.xmlhandlers"
+local new_xmpp_stream = require "util.xmppstream".new;
local s2s_new_incoming = require "core.s2smanager".new_incoming;
local s2s_streamopened = require "core.s2smanager".streamopened;
local s2s_streamclosed = require "core.s2smanager".streamclosed;
@@ -49,11 +49,14 @@ function stream_callbacks.error(session, error, data)
end
local function handleerr(err) log("error", "Traceback[s2s]: %s: %s", tostring(err), debug.traceback()); end
-function stream_callbacks.handlestanza(a, b)
- if b.attr.xmlns == "jabber:client" then --COMPAT: Prosody pre-0.6.2 may send jabber:client
- b.attr.xmlns = nil;
+function stream_callbacks.handlestanza(session, stanza)
+ if stanza.attr.xmlns == "jabber:client" then --COMPAT: Prosody pre-0.6.2 may send jabber:client
+ stanza.attr.xmlns = nil;
+ end
+ stanza = session.filter("stanzas/in", stanza);
+ if stanza then
+ return xpcall(function () return core_process_stanza(session, stanza) end, handleerr);
end
- xpcall(function () core_process_stanza(a, b) end, handleerr);
end
local connlisteners_register = require "net.connlisteners".register;
@@ -72,24 +75,6 @@ local xmppserver = { default_port = 5269, default_mode = "*a" };
-- These are session methods --
-local function session_reset_stream(session)
- -- Reset stream
- local parser = lxp.new(init_xmlhandlers(session, stream_callbacks), "\1");
- session.parser = parser;
-
- session.notopen = true;
-
- function session.data(conn, data)
- local ok, err = parser:parse(data);
- if ok then return; end
- (session.log or log)("warn", "Received invalid XML: %s", data);
- (session.log or log)("warn", "Problem was: %s", err);
- session:close("xml-not-well-formed");
- end
-
- return true;
-end
-
local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'};
local default_stream_attr = { ["xmlns:stream"] = "http://etherx.jabber.org/streams", xmlns = stream_callbacks.default_ns, version = "1.0", id = "" };
local function session_close(session, reason, remote_reason)
@@ -132,29 +117,55 @@ end
-- End of session methods --
-function xmppserver.onincoming(conn, data)
- local session = sessions[conn];
- if not session then
- session = s2s_new_incoming(conn);
- sessions[conn] = session;
+local function initialize_session(session)
+ local stream = new_xmpp_stream(session, stream_callbacks);
+ session.stream = stream;
+
+ session.notopen = true;
+
+ function session.reset_stream()
+ session.notopen = true;
+ session.stream:reset();
+ end
+
+ local filter = session.filter;
+ function session.data(data)
+ data = filter("bytes/in", data);
+ if data then
+ local ok, err = stream:feed(data);
+ if ok then return; end
+ (session.log or log)("warn", "Received invalid XML: %s", data);
+ (session.log or log)("warn", "Problem was: %s", err);
+ session:close("xml-not-well-formed");
+ end
+ end
- -- Logging functions --
+ session.close = session_close;
+ local handlestanza = stream_callbacks.handlestanza;
+ function session.dispatch_stanza(session, stanza)
+ return handlestanza(session, stanza);
+ end
+end
-
+function xmppserver.onconnect(conn)
+ if not sessions[conn] then -- May be an existing outgoing session
+ local session = s2s_new_incoming(conn);
+ sessions[conn] = session;
+
+ -- Logging functions --
local conn_name = "s2sin"..tostring(conn):match("[a-f0-9]+$");
session.log = logger.init(conn_name);
session.log("info", "Incoming s2s connection");
- session.reset_stream = session_reset_stream;
- session.close = session_close;
-
- session_reset_stream(session); -- Initialise, ready for use
-
- session.dispatch_stanza = stream_callbacks.handlestanza;
+ initialize_session(session);
end
- if data then
- session.data(conn, data);
+end
+
+function xmppserver.onincoming(conn, data)
+ local session = sessions[conn];
+ if session then
+ session.data(data);
end
end
@@ -190,12 +201,7 @@ function xmppserver.register_outgoing(conn, session)
session.direction = "outgoing";
sessions[conn] = session;
- session.reset_stream = session_reset_stream;
- session.close = session_close;
- session_reset_stream(session); -- Initialise, ready for use
-
- --local function handleerr(err) print("Traceback:", err, debug.traceback()); end
- --session.stanza_dispatch = function (stanza) return select(2, xpcall(function () return core_process_stanza(session, stanza); end, handleerr)); end
+ initialize_session(session);
end
connlisteners_register("xmppserver", xmppserver);