aboutsummaryrefslogtreecommitdiffstats
path: root/plugins/mod_bosh.lua
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/mod_bosh.lua')
-rw-r--r--plugins/mod_bosh.lua224
1 files changed, 105 insertions, 119 deletions
diff --git a/plugins/mod_bosh.lua b/plugins/mod_bosh.lua
index 8cda4a23..251ef740 100644
--- a/plugins/mod_bosh.lua
+++ b/plugins/mod_bosh.lua
@@ -6,9 +6,8 @@
-- COPYING file in the source package for more information.
--
-module:set_global(); -- Global module
+module:set_global();
-local hosts = _G.hosts;
local new_xmpp_stream = require "util.xmppstream".new;
local sm = require "core.sessionmanager";
local sm_destroy_session = sm.destroy_session;
@@ -16,12 +15,14 @@ local new_uuid = require "util.uuid".generate;
local core_process_stanza = prosody.core_process_stanza;
local st = require "util.stanza";
local logger = require "util.logger";
-local log = logger.init("mod_bosh");
+local log = module._log;
local initialize_filters = require "util.filters".initialize;
local math_min = math.min;
-local xpcall, tostring, type = xpcall, tostring, type;
+local tostring, type = tostring, type;
local traceback = debug.traceback;
+local runner = require"util.async".runner;
local nameprep = require "util.encodings".stringprep.nameprep;
+local cache = require "util.cache";
local xmlns_streams = "http://etherx.jabber.org/streams";
local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams";
@@ -48,33 +49,14 @@ local cross_domain = module:get_option("cross_domain_bosh", false);
if cross_domain == true then cross_domain = "*"; end
if type(cross_domain) == "table" then cross_domain = table.concat(cross_domain, ", "); end
-local trusted_proxies = module:get_option_set("trusted_proxies", { "127.0.0.1", "::1" })._items;
-
-local function get_ip_from_request(request)
- local ip = request.conn:ip();
- local forwarded_for = request.headers.x_forwarded_for;
- if forwarded_for then
- forwarded_for = forwarded_for..", "..ip;
- for forwarded_ip in forwarded_for:gmatch("[^%s,]+") do
- if not trusted_proxies[forwarded_ip] then
- ip = forwarded_ip;
- end
- end
- end
- return ip;
-end
-
local t_insert, t_remove, t_concat = table.insert, table.remove, table.concat;
-local os_time = os.time;
-- All sessions, and sessions that have no requests open
-local sessions, inactive_sessions = module:shared("sessions", "inactive_sessions");
+local sessions = module:shared("sessions");
-- Used to respond to idle sessions (those with waiting requests)
-local waiting_requests = module:shared("waiting_requests");
function on_destroy_request(request)
log("debug", "Request destroyed: %s", tostring(request));
- waiting_requests[request] = nil;
local session = sessions[request.context.sid];
if session then
local requests = session.requests;
@@ -88,9 +70,24 @@ function on_destroy_request(request)
-- If this session now has no requests open, mark it as inactive
local max_inactive = session.bosh_max_inactive;
if max_inactive and #requests == 0 then
- inactive_sessions[session] = os_time() + max_inactive;
+ if session.inactive_timer then
+ session.inactive_timer:stop();
+ end
+ session.inactive_timer = module:add_timer(max_inactive, check_inactive, session, request.context,
+ "BOSH client silent for over "..max_inactive.." seconds");
(session.log or log)("debug", "BOSH session marked as inactive (for %ds)", max_inactive);
end
+ if session.bosh_wait_timer then
+ session.bosh_wait_timer:stop();
+ session.bosh_wait_timer = nil;
+ end
+ end
+end
+
+function check_inactive(now, session, context, reason) -- luacheck: ignore 212/now
+ if not session.destroyed then
+ sessions[context.sid] = nil;
+ sm_destroy_session(session, reason);
end
end
@@ -124,7 +121,7 @@ function handle_POST(event)
local headers = response.headers;
headers.content_type = "text/xml; charset=utf-8";
- if cross_domain and event.request.headers.origin then
+ if cross_domain and request.headers.origin then
set_cross_domain_headers(response);
end
@@ -148,8 +145,14 @@ function handle_POST(event)
if session then
-- Session was marked as inactive, since we have
-- a request open now, unmark it
- if inactive_sessions[session] and #session.requests > 0 then
- inactive_sessions[session] = nil;
+ if session.inactive_timer and #session.requests > 0 then
+ session.inactive_timer:stop();
+ session.inactive_timer = nil;
+ end
+
+ if session.bosh_wait_timer then
+ session.bosh_wait_timer:stop();
+ session.bosh_wait_timer = nil;
end
local r = session.requests;
@@ -177,9 +180,6 @@ function handle_POST(event)
if not response.finished then
-- We're keeping this request open, to respond later
log("debug", "Have nothing to say, so leaving request unanswered for now");
- if session.bosh_wait then
- waiting_requests[response] = os_time() + session.bosh_wait;
- end
end
if session.bosh_terminate then
@@ -187,10 +187,22 @@ function handle_POST(event)
session:close();
return nil;
else
+ if session.bosh_wait and #session.requests > 0 then
+ session.bosh_wait_timer = module:add_timer(session.bosh_wait, after_bosh_wait, session.requests[1], session)
+ end
+
return true; -- Inform http server we shall reply later
end
- elseif response.finished then
- return; -- A response has been sent already
+ elseif response.finished or context.ignore_request then
+ if response.finished then
+ module:log("debug", "Response finished");
+ end
+ if context.ignore_request then
+ module:log("debug", "Ignoring this request");
+ end
+ -- A response has been sent already, or we're ignoring this request
+ -- (e.g. so a different instance of the module can handle it)
+ return;
end
module:log("warn", "Unable to associate request with a session (incomplete request?)");
local close_reply = st.stanza("body", { xmlns = xmlns_bosh, type = "terminate",
@@ -198,13 +210,17 @@ function handle_POST(event)
return tostring(close_reply) .. "\n";
end
+function after_bosh_wait(now, request, session) -- luacheck: ignore 212
+ if request.conn then
+ session.send("");
+ end
+end
local function bosh_reset_stream(session) session.notopen = true; end
local stream_xmlns_attr = { xmlns = "urn:ietf:params:xml:ns:xmpp-streams" };
-
local function bosh_close_stream(session, reason)
- (session.log or log)("info", "BOSH client disconnected");
+ (session.log or log)("info", "BOSH client disconnected: %s", tostring((reason and reason.condition or reason) or "session close"));
local close_reply = st.stanza("body", { xmlns = xmlns_bosh, type = "terminate",
["xmlns:stream"] = xmlns_streams });
@@ -237,21 +253,22 @@ local function bosh_close_stream(session, reason)
held_request:send(response_body);
end
sessions[session.sid] = nil;
- inactive_sessions[session] = nil;
sm_destroy_session(session);
end
+local runner_callbacks = { };
+
-- Handle the <body> tag in the request payload.
function stream_callbacks.streamopened(context, attr)
local request, response = context.request, context.response;
- local sid = attr.sid;
+ local sid, rid = attr.sid, tonumber(attr.rid);
log("debug", "BOSH body open (sid: %s)", sid or "<none>");
+ context.rid = rid;
if not sid then
-- New session request
context.notopen = nil; -- Signals that we accept this opening tag
local to_host = nameprep(attr.to);
- local rid = tonumber(attr.rid);
local wait = tonumber(attr.wait);
if not to_host then
log("debug", "BOSH client tried to connect to invalid host: %s", tostring(attr.to));
@@ -259,13 +276,6 @@ function stream_callbacks.streamopened(context, attr)
["xmlns:stream"] = xmlns_streams, condition = "improper-addressing" });
response:send(tostring(close_reply));
return;
- elseif not hosts[to_host] then
- -- Unknown host
- log("debug", "BOSH client tried to connect to unknown host: %s", tostring(attr.to));
- local close_reply = st.stanza("body", { xmlns = xmlns_bosh, type = "terminate",
- ["xmlns:stream"] = xmlns_streams, condition = "host-unknown" });
- response:send(tostring(close_reply));
- return;
end
if not rid or (not wait and attr.wait or wait < 0 or wait % 1 ~= 0) then
log("debug", "BOSH client sent invalid rid or wait attributes: rid=%s, wait=%s", tostring(attr.rid), tostring(attr.wait));
@@ -275,28 +285,32 @@ function stream_callbacks.streamopened(context, attr)
return;
end
- rid = rid - 1;
wait = math_min(wait, bosh_max_wait);
-- New session
sid = new_uuid();
local session = {
- type = "c2s_unauthed", conn = request.conn, sid = sid, rid = rid, host = to_host,
+ type = "c2s_unauthed", conn = request.conn, sid = sid, host = attr.to,
+ rid = rid - 1, -- Hack for initial session setup, "previous" rid was $current_request - 1
bosh_version = attr.ver, bosh_wait = wait, streamid = sid,
- bosh_max_inactive = bosh_max_inactivity,
+ bosh_max_inactive = bosh_max_inactivity, bosh_responses = cache.new(BOSH_HOLD+1):table();
requests = { }, send_buffer = {}, reset_stream = bosh_reset_stream,
close = bosh_close_stream, dispatch_stanza = core_process_stanza, notopen = true,
log = logger.init("bosh"..sid), secure = consider_bosh_secure or request.secure,
- ip = get_ip_from_request(request);
+ ip = request.ip;
};
sessions[sid] = session;
+ session.thread = runner(function (stanza)
+ session:dispatch_stanza(stanza);
+ end, runner_callbacks, session);
+
local filter = initialize_filters(session);
session.log("debug", "BOSH session created for request from %s", session.ip);
log("info", "New BOSH session, assigned it sid '%s'", sid);
- hosts[session.host].events.fire_event("bosh-session", { session = session, request = request });
+ module:fire_event("bosh-session", { session = session, request = request });
-- Send creation response
local creating_session = true;
@@ -335,8 +349,9 @@ function stream_callbacks.streamopened(context, attr)
body_attr["xmlns:xmpp"] = "urn:xmpp:xbosh";
body_attr["xmpp:version"] = "1.0";
end
- session.bosh_last_response = st.stanza("body", body_attr):top_tag()..t_concat(session.send_buffer).."</body>";
- oldest_request:send(session.bosh_last_response);
+ local response_xml = st.stanza("body", body_attr):top_tag()..t_concat(session.send_buffer).."</body>";
+ session.bosh_responses[oldest_request.context.rid] = response_xml;
+ oldest_request:send(response_xml);
session.send_buffer = {};
end
return true;
@@ -356,24 +371,31 @@ function stream_callbacks.streamopened(context, attr)
session.conn = request.conn;
if session.rid then
- local rid = tonumber(attr.rid);
local diff = rid - session.rid;
-- Diff should be 1 for a healthy request
+ session.log("debug", "rid: %d, sess: %s, diff: %d", rid, session.rid, diff)
if diff ~= 1 then
context.sid = sid;
context.notopen = nil;
- if diff == 2 then
+ if diff == 2 then -- Missed a request
-- Hold request, but don't process it (ouch!)
session.log("debug", "rid skipped: %d, deferring this request", rid-1)
context.defer = true;
session.bosh_deferred = { context = context, sid = sid, rid = rid, terminate = attr.type == "terminate" };
return;
end
+ -- Set a marker to indicate that stanzas in this request should NOT be processed
+ -- (these stanzas will already be in the XML parser's buffer)
context.ignore = true;
- if diff == 0 then
- -- Re-send previous response, ignore stanzas in this request
- session.log("debug", "rid repeated, ignoring: %s (diff %d)", session.rid, diff);
- response:send(session.bosh_last_response);
+ if session.bosh_responses[rid] then
+ -- Re-send past response, ignore stanzas in this request
+ session.log("debug", "rid repeated within window, replaying old response");
+ response:send(session.bosh_responses[rid]);
+ return;
+ elseif diff == 0 then
+ session.log("debug", "current rid repeated, ignoring stanzas");
+ t_insert(session.requests, response);
+ context.sid = sid;
return;
end
-- Session broken, destroy it
@@ -397,13 +419,18 @@ function stream_callbacks.streamopened(context, attr)
if session.notopen then
local features = st.stanza("stream:features");
- hosts[session.host].events.fire_event("stream-features", { origin = session, features = features });
+ module:context(session.host):fire_event("stream-features", { origin = session, features = features });
session.send(features);
session.notopen = nil;
end
end
local function handleerr(err) log("error", "Traceback[bosh]: %s", traceback(tostring(err), 2)); end
+
+function runner_callbacks:error(err) -- luacheck: ignore 212/self
+ return handleerr(err);
+end
+
function stream_callbacks.handlestanza(context, stanza)
if context.ignore then return; end
log("debug", "BOSH stanza received: %s\n", stanza:top_tag());
@@ -417,9 +444,7 @@ function stream_callbacks.handlestanza(context, stanza)
t_insert(session.bosh_deferred, stanza);
else
stanza = session.filter("stanzas/in", stanza);
- if stanza then
- return xpcall(function () return core_process_stanza(session, stanza) end, handleerr);
- end
+ session.thread:run(stanza);
end
else
log("debug", "No session for this stanza! (sid: %s)", context.sid or "none!");
@@ -432,13 +457,13 @@ function stream_callbacks.streamclosed(context)
if not context.defer and session.bosh_deferred then
-- Handle deferred stanzas now
local deferred_stanzas = session.bosh_deferred;
- local context = deferred_stanzas.context;
+ local deferred_context = deferred_stanzas.context;
session.bosh_deferred = nil;
log("debug", "Handling deferred stanzas from rid %d", deferred_stanzas.rid);
session.rid = deferred_stanzas.rid;
- t_insert(session.requests, context.response);
+ t_insert(session.requests, deferred_context.response);
for _, stanza in ipairs(deferred_stanzas) do
- stream_callbacks.handlestanza(context, stanza);
+ stream_callbacks.handlestanza(deferred_context, stanza);
end
if deferred_stanzas.terminate then
session.bosh_terminate = true;
@@ -452,8 +477,8 @@ function stream_callbacks.streamclosed(context)
end
function stream_callbacks.error(context, error)
- log("debug", "Error parsing BOSH request payload; %s", error);
if not context.sid then
+ log("debug", "Error parsing BOSH request payload; %s", error);
local response = context.response;
local close_reply = st.stanza("body", { xmlns = xmlns_bosh, type = "terminate",
["xmlns:stream"] = xmlns_streams, condition = "bad-request" });
@@ -462,6 +487,7 @@ function stream_callbacks.error(context, error)
end
local session = sessions[context.sid];
+ (session and session.log or log)("warn", "Error parsing BOSH request payload; %s", error);
if error == "stream-error" then -- Remote stream error, we close normally
session:close();
else
@@ -469,65 +495,25 @@ function stream_callbacks.error(context, error)
end
end
-local dead_sessions = module:shared("dead_sessions");
-function on_timer()
- -- log("debug", "Checking for requests soon to timeout...");
- -- Identify requests timing out within the next few seconds
- local now = os_time() + 3;
- for request, reply_before in pairs(waiting_requests) do
- if reply_before <= now then
- log("debug", "%s was soon to timeout (at %d, now %d), sending empty response", tostring(request), reply_before, now);
- -- Send empty response to let the
- -- client know we're still here
- if request.conn then
- sessions[request.context.sid].send("");
- end
- end
- end
-
- now = now - 3;
- local n_dead_sessions = 0;
- for session, close_after in pairs(inactive_sessions) do
- if close_after < now then
- (session.log or log)("debug", "BOSH client inactive too long, destroying session at %d", now);
- sessions[session.sid] = nil;
- inactive_sessions[session] = nil;
- n_dead_sessions = n_dead_sessions + 1;
- dead_sessions[n_dead_sessions] = session;
- end
- end
-
- for i=1,n_dead_sessions do
- local session = dead_sessions[i];
- dead_sessions[i] = nil;
- sm_destroy_session(session, "BOSH client silent for over "..session.bosh_max_inactive.." seconds");
- end
- return 1;
-end
-module:add_timer(1, on_timer);
-
-
local GET_response = {
headers = {
content_type = "text/html";
};
body = [[<html><body>
<p>It works! Now point your BOSH client to this URL to connect to Prosody.</p>
- <p>For more information see <a href="http://prosody.im/doc/setting_up_bosh">Prosody: Setting up BOSH</a>.</p>
+ <p>For more information see <a href="https://prosody.im/doc/setting_up_bosh">Prosody: Setting up BOSH</a>.</p>
</body></html>]];
};
-function module.add_host(module)
- module:depends("http");
- module:provides("http", {
- default_path = "/http-bind";
- route = {
- ["GET"] = GET_response;
- ["GET /"] = GET_response;
- ["OPTIONS"] = handle_OPTIONS;
- ["OPTIONS /"] = handle_OPTIONS;
- ["POST"] = handle_POST;
- ["POST /"] = handle_POST;
- };
- });
-end
+module:depends("http");
+module:provides("http", {
+ default_path = "/http-bind";
+ route = {
+ ["GET"] = GET_response;
+ ["GET /"] = GET_response;
+ ["OPTIONS"] = handle_OPTIONS;
+ ["OPTIONS /"] = handle_OPTIONS;
+ ["POST"] = handle_POST;
+ ["POST /"] = handle_POST;
+ };
+});