aboutsummaryrefslogtreecommitdiffstats
path: root/net
diff options
context:
space:
mode:
Diffstat (limited to 'net')
-rw-r--r--net/multiplex_listener.lua4
-rw-r--r--net/xmppclient_listener.lua90
-rw-r--r--net/xmppserver_listener.lua86
3 files changed, 99 insertions, 81 deletions
diff --git a/net/multiplex_listener.lua b/net/multiplex_listener.lua
index bf193ad8..b515ccce 100644
--- a/net/multiplex_listener.lua
+++ b/net/multiplex_listener.lua
@@ -19,6 +19,8 @@ function server.onincoming(conn, data)
if buf:match("^[a-zA-Z]") then
local listener = httpserver_listener;
conn:setlistener(listener);
+ local onconnect = listener.onconnect;
+ if onconnect then onconnect(conn) end
listener.onincoming(conn, buf);
elseif buf:match(">") then
local listener;
@@ -31,6 +33,8 @@ function server.onincoming(conn, data)
listener = xmppclient_listener;
end
conn:setlistener(listener);
+ local onconnect = listener.onconnect;
+ if onconnect then onconnect(conn) end
listener.onincoming(conn, buf);
elseif #buf > 1024 then
conn:close();
diff --git a/net/xmppclient_listener.lua b/net/xmppclient_listener.lua
index 94daa2b2..623a98c8 100644
--- a/net/xmppclient_listener.lua
+++ b/net/xmppclient_listener.lua
@@ -11,7 +11,7 @@
local logger = require "logger";
local log = logger.init("xmppclient_listener");
local lxp = require "lxp"
-local init_xmlhandlers = require "core.xmlhandlers"
+local new_xmpp_stream = require "util.xmppstream".new;
local sm_new_session = require "core.sessionmanager".new_session;
local connlisteners_register = require "net.connlisteners".register;
@@ -72,23 +72,6 @@ local xmppclient = { default_port = 5222, default_mode = "*a" };
-- These are session methods --
-local function session_reset_stream(session)
- -- Reset stream
- local parser = lxp.new(init_xmlhandlers(session, stream_callbacks), "\1");
- session.parser = parser;
-
- session.notopen = true;
-
- function session.data(conn, data)
- local ok, err = parser:parse(data);
- if ok then return; end
- log("debug", "Received invalid XML (%s) %d bytes: %s", tostring(err), #data, data:sub(1, 300):gsub("[\r\n]+", " "):gsub("[%z\1-\31]", "_"));
- session:close("xml-not-well-formed");
- end
-
- return true;
-end
-
local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'};
local default_stream_attr = { ["xmlns:stream"] = "http://etherx.jabber.org/streams", xmlns = stream_callbacks.default_ns, version = "1.0", id = "" };
local function session_close(session, reason)
@@ -128,32 +111,57 @@ end
-- End of session methods --
-function xmppclient.onincoming(conn, data)
- local session = sessions[conn];
- if not session then
- session = sm_new_session(conn);
- sessions[conn] = session;
-
- session.log("info", "Client connected");
-
- -- Client is using legacy SSL (otherwise mod_tls sets this flag)
- if conn:ssl() then
- session.secure = true;
+function xmppclient.onconnect(conn)
+ local session = sm_new_session(conn);
+ sessions[conn] = session;
+
+ session.log("info", "Client connected");
+
+ -- Client is using legacy SSL (otherwise mod_tls sets this flag)
+ if conn:ssl() then
+ session.secure = true;
+ end
+
+ if opt_keepalives ~= nil then
+ conn:setoption("keepalive", opt_keepalives);
+ end
+
+ session.close = session_close;
+
+ local stream = new_xmpp_stream(session, stream_callbacks);
+ session.stream = stream;
+
+ session.notopen = true;
+
+ function session.reset_stream()
+ session.notopen = true;
+ session.stream:reset();
+ end
+
+ local filter = session.filter;
+ function session.data(data)
+ data = filter("bytes/in", data);
+ if data then
+ local ok, err = stream:feed(data);
+ if ok then return; end
+ log("debug", "Received invalid XML (%s) %d bytes: %s", tostring(err), #data, data:sub(1, 300):gsub("[\r\n]+", " "):gsub("[%z\1-\31]", "_"));
+ session:close("xml-not-well-formed");
end
-
- if opt_keepalives ~= nil then
- conn:setoption("keepalive", opt_keepalives);
+ end
+
+ local handlestanza = stream_callbacks.handlestanza;
+ function session.dispatch_stanza(session, stanza)
+ stanza = filter("stanzas/in", stanza);
+ if stanza then
+ return handlestanza(session, stanza);
end
-
- session.reset_stream = session_reset_stream;
- session.close = session_close;
-
- session_reset_stream(session); -- Initialise, ready for use
-
- session.dispatch_stanza = stream_callbacks.handlestanza;
end
- if data then
- session.data(conn, data);
+end
+
+function xmppclient.onincoming(conn, data)
+ local session = sessions[conn];
+ if session then
+ session.data(data);
end
end
diff --git a/net/xmppserver_listener.lua b/net/xmppserver_listener.lua
index d1272edb..8700e6f2 100644
--- a/net/xmppserver_listener.lua
+++ b/net/xmppserver_listener.lua
@@ -11,7 +11,7 @@
local logger = require "logger";
local log = logger.init("xmppserver_listener");
local lxp = require "lxp"
-local init_xmlhandlers = require "core.xmlhandlers"
+local new_xmpp_stream = require "util.xmppstream".new;
local s2s_new_incoming = require "core.s2smanager".new_incoming;
local s2s_streamopened = require "core.s2smanager".streamopened;
local s2s_streamclosed = require "core.s2smanager".streamclosed;
@@ -72,24 +72,6 @@ local xmppserver = { default_port = 5269, default_mode = "*a" };
-- These are session methods --
-local function session_reset_stream(session)
- -- Reset stream
- local parser = lxp.new(init_xmlhandlers(session, stream_callbacks), "\1");
- session.parser = parser;
-
- session.notopen = true;
-
- function session.data(conn, data)
- local ok, err = parser:parse(data);
- if ok then return; end
- (session.log or log)("warn", "Received invalid XML: %s", data);
- (session.log or log)("warn", "Problem was: %s", err);
- session:close("xml-not-well-formed");
- end
-
- return true;
-end
-
local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'};
local default_stream_attr = { ["xmlns:stream"] = "http://etherx.jabber.org/streams", xmlns = stream_callbacks.default_ns, version = "1.0", id = "" };
local function session_close(session, reason, remote_reason)
@@ -132,29 +114,58 @@ end
-- End of session methods --
-function xmppserver.onincoming(conn, data)
- local session = sessions[conn];
- if not session then
- session = s2s_new_incoming(conn);
- sessions[conn] = session;
+local function initialize_session(session)
+ local stream = new_xmpp_stream(session, stream_callbacks);
+ session.stream = stream;
+
+ session.notopen = true;
+
+ function session.reset_stream()
+ session.notopen = true;
+ session.stream:reset();
+ end
+
+ local filter = session.filter;
+ function session.data(data)
+ data = filter("bytes/in", data);
+ if data then
+ local ok, err = stream:feed(data);
+ if ok then return; end
+ (session.log or log)("warn", "Received invalid XML: %s", data);
+ (session.log or log)("warn", "Problem was: %s", err);
+ session:close("xml-not-well-formed");
+ end
+ end
- -- Logging functions --
+ session.close = session_close;
+ local handlestanza = stream_callbacks.handlestanza;
+ function session.dispatch_stanza(session, stanza)
+ stanza = filters("stanzas/in", stanza);
+ if stanza then
+ return handlestanza(session, stanza);
+ end
+ end
+end
-
+function xmppserver.onconnect(conn)
+ if not sessions[conn] then -- May be an existing outgoing session
+ local session = s2s_new_incoming(conn);
+ sessions[conn] = session;
+
+ -- Logging functions --
local conn_name = "s2sin"..tostring(conn):match("[a-f0-9]+$");
session.log = logger.init(conn_name);
session.log("info", "Incoming s2s connection");
- session.reset_stream = session_reset_stream;
- session.close = session_close;
-
- session_reset_stream(session); -- Initialise, ready for use
-
- session.dispatch_stanza = stream_callbacks.handlestanza;
+ initialize_session(session);
end
- if data then
- session.data(conn, data);
+end
+
+function xmppserver.onincoming(conn, data)
+ local session = sessions[conn];
+ if session then
+ session.data(data);
end
end
@@ -190,12 +201,7 @@ function xmppserver.register_outgoing(conn, session)
session.direction = "outgoing";
sessions[conn] = session;
- session.reset_stream = session_reset_stream;
- session.close = session_close;
- session_reset_stream(session); -- Initialise, ready for use
-
- --local function handleerr(err) print("Traceback:", err, debug.traceback()); end
- --session.stanza_dispatch = function (stanza) return select(2, xpcall(function () return core_process_stanza(session, stanza); end, handleerr)); end
+ initialize_session(session);
end
connlisteners_register("xmppserver", xmppserver);