aboutsummaryrefslogtreecommitdiffstats
path: root/plugins/mod_bosh.lua
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/mod_bosh.lua')
-rw-r--r--plugins/mod_bosh.lua175
1 files changed, 73 insertions, 102 deletions
diff --git a/plugins/mod_bosh.lua b/plugins/mod_bosh.lua
index 9ef4a41e..880a8235 100644
--- a/plugins/mod_bosh.lua
+++ b/plugins/mod_bosh.lua
@@ -6,9 +6,6 @@
-- COPYING file in the source package for more information.
--
-module:set_global(); -- Global module
-
-local hosts = _G.hosts;
local new_xmpp_stream = require "util.xmppstream".new;
local sm = require "core.sessionmanager";
local sm_destroy_session = sm.destroy_session;
@@ -16,11 +13,12 @@ local new_uuid = require "util.uuid".generate;
local core_process_stanza = prosody.core_process_stanza;
local st = require "util.stanza";
local logger = require "util.logger";
-local log = logger.init("mod_bosh");
+local log = module._log;
local initialize_filters = require "util.filters".initialize;
local math_min = math.min;
-local xpcall, tostring, type = xpcall, tostring, type;
+local tostring, type = tostring, type;
local traceback = debug.traceback;
+local runner = require"util.async".runner;
local nameprep = require "util.encodings".stringprep.nameprep;
local xmlns_streams = "http://etherx.jabber.org/streams";
@@ -48,33 +46,14 @@ local cross_domain = module:get_option("cross_domain_bosh", false);
if cross_domain == true then cross_domain = "*"; end
if type(cross_domain) == "table" then cross_domain = table.concat(cross_domain, ", "); end
-local trusted_proxies = module:get_option_set("trusted_proxies", { "127.0.0.1", "::1" })._items;
-
-local function get_ip_from_request(request)
- local ip = request.conn:ip();
- local forwarded_for = request.headers.x_forwarded_for;
- if forwarded_for then
- forwarded_for = forwarded_for..", "..ip;
- for forwarded_ip in forwarded_for:gmatch("[^%s,]+") do
- if not trusted_proxies[forwarded_ip] then
- ip = forwarded_ip;
- end
- end
- end
- return ip;
-end
-
local t_insert, t_remove, t_concat = table.insert, table.remove, table.concat;
-local os_time = os.time;
-- All sessions, and sessions that have no requests open
-local sessions, inactive_sessions = module:shared("sessions", "inactive_sessions");
+local sessions = module:shared("sessions");
-- Used to respond to idle sessions (those with waiting requests)
-local waiting_requests = module:shared("waiting_requests");
function on_destroy_request(request)
log("debug", "Request destroyed: %s", tostring(request));
- waiting_requests[request] = nil;
local session = sessions[request.context.sid];
if session then
local requests = session.requests;
@@ -88,9 +67,24 @@ function on_destroy_request(request)
-- If this session now has no requests open, mark it as inactive
local max_inactive = session.bosh_max_inactive;
if max_inactive and #requests == 0 then
- inactive_sessions[session] = os_time() + max_inactive;
+ if session.inactive_timer then
+ session.inactive_timer:stop();
+ end
+ session.inactive_timer = module:add_timer(max_inactive, check_inactive, session, request.context,
+ "BOSH client silent for over "..max_inactive.." seconds");
(session.log or log)("debug", "BOSH session marked as inactive (for %ds)", max_inactive);
end
+ if session.bosh_wait_timer then
+ session.bosh_wait_timer:stop();
+ session.bosh_wait_timer = nil;
+ end
+ end
+end
+
+function check_inactive(now, session, context, reason) -- luacheck: ignore 212/now
+ if not session.destroyed then
+ sessions[context.sid] = nil;
+ sm_destroy_session(session, reason);
end
end
@@ -124,7 +118,7 @@ function handle_POST(event)
local headers = response.headers;
headers.content_type = "text/xml; charset=utf-8";
- if cross_domain and event.request.headers.origin then
+ if cross_domain and request.headers.origin then
set_cross_domain_headers(response);
end
@@ -148,8 +142,14 @@ function handle_POST(event)
if session then
-- Session was marked as inactive, since we have
-- a request open now, unmark it
- if inactive_sessions[session] and #session.requests > 0 then
- inactive_sessions[session] = nil;
+ if session.inactive_timer and #session.requests > 0 then
+ session.inactive_timer:stop();
+ session.inactive_timer = nil;
+ end
+
+ if session.bosh_wait_timer then
+ session.bosh_wait_timer:stop();
+ session.bosh_wait_timer = nil;
end
local r = session.requests;
@@ -177,9 +177,6 @@ function handle_POST(event)
if not response.finished then
-- We're keeping this request open, to respond later
log("debug", "Have nothing to say, so leaving request unanswered for now");
- if session.bosh_wait then
- waiting_requests[response] = os_time() + session.bosh_wait;
- end
end
if session.bosh_terminate then
@@ -187,6 +184,10 @@ function handle_POST(event)
session:close();
return nil;
else
+ if session.bosh_wait and #session.requests > 0 then
+ session.bosh_wait_timer = module:add_timer(session.bosh_wait, after_bosh_wait, session.requests[1], session)
+ end
+
return true; -- Inform http server we shall reply later
end
elseif response.finished then
@@ -198,6 +199,11 @@ function handle_POST(event)
return tostring(close_reply) .. "\n";
end
+function after_bosh_wait(now, request, session) -- luacheck: ignore 212
+ if request.conn then
+ session.send("");
+ end
+end
local function bosh_reset_stream(session) session.notopen = true; end
@@ -237,10 +243,11 @@ local function bosh_close_stream(session, reason)
held_request:send(response_body);
end
sessions[session.sid] = nil;
- inactive_sessions[session] = nil;
sm_destroy_session(session);
end
+local runner_callbacks = { };
+
-- Handle the <body> tag in the request payload.
function stream_callbacks.streamopened(context, attr)
local request, response = context.request, context.response;
@@ -259,13 +266,10 @@ function stream_callbacks.streamopened(context, attr)
["xmlns:stream"] = xmlns_streams, condition = "improper-addressing" });
response:send(tostring(close_reply));
return;
- elseif not hosts[to_host] then
- -- Unknown host
- log("debug", "BOSH client tried to connect to unknown host: %s", tostring(attr.to));
- local close_reply = st.stanza("body", { xmlns = xmlns_bosh, type = "terminate",
- ["xmlns:stream"] = xmlns_streams, condition = "host-unknown" });
- response:send(tostring(close_reply));
- return;
+ elseif to_host ~= module.host then
+ -- Could be meant for a different instance of the module
+ -- if multiple instances are loaded with the same URL then this can happen
+ return; --> 404
end
if not rid or (not wait and attr.wait or wait < 0 or wait % 1 ~= 0) then
log("debug", "BOSH client sent invalid rid or wait attributes: rid=%s, wait=%s", tostring(attr.rid), tostring(attr.wait));
@@ -287,16 +291,20 @@ function stream_callbacks.streamopened(context, attr)
requests = { }, send_buffer = {}, reset_stream = bosh_reset_stream,
close = bosh_close_stream, dispatch_stanza = core_process_stanza, notopen = true,
log = logger.init("bosh"..sid), secure = consider_bosh_secure or request.secure,
- ip = get_ip_from_request(request);
+ ip = request.ip;
};
sessions[sid] = session;
+ session.thread = runner(function (stanza)
+ session:dispatch_stanza(stanza);
+ end, runner_callbacks, session);
+
local filter = initialize_filters(session);
session.log("debug", "BOSH session created for request from %s", session.ip);
log("info", "New BOSH session, assigned it sid '%s'", sid);
- hosts[session.host].events.fire_event("bosh-session", { session = session, request = request });
+ module:fire_event("bosh-session", { session = session, request = request });
-- Send creation response
local creating_session = true;
@@ -397,13 +405,18 @@ function stream_callbacks.streamopened(context, attr)
if session.notopen then
local features = st.stanza("stream:features");
- hosts[session.host].events.fire_event("stream-features", { origin = session, features = features });
+ module:fire_event("stream-features", { origin = session, features = features });
session.send(features);
session.notopen = nil;
end
end
local function handleerr(err) log("error", "Traceback[bosh]: %s", traceback(tostring(err), 2)); end
+
+function runner_callbacks:error(err) -- luacheck: ignore 212/self
+ return handleerr(err);
+end
+
function stream_callbacks.handlestanza(context, stanza)
if context.ignore then return; end
log("debug", "BOSH stanza received: %s\n", stanza:top_tag());
@@ -417,9 +430,7 @@ function stream_callbacks.handlestanza(context, stanza)
t_insert(session.bosh_deferred, stanza);
else
stanza = session.filter("stanzas/in", stanza);
- if stanza then
- return xpcall(function () return core_process_stanza(session, stanza) end, handleerr);
- end
+ session.thread:run(stanza);
end
else
log("debug", "No session for this stanza! (sid: %s)", context.sid or "none!");
@@ -432,13 +443,13 @@ function stream_callbacks.streamclosed(context)
if not context.defer and session.bosh_deferred then
-- Handle deferred stanzas now
local deferred_stanzas = session.bosh_deferred;
- local context = deferred_stanzas.context;
+ local deferred_context = deferred_stanzas.context;
session.bosh_deferred = nil;
log("debug", "Handling deferred stanzas from rid %d", deferred_stanzas.rid);
session.rid = deferred_stanzas.rid;
- t_insert(session.requests, context.response);
+ t_insert(session.requests, deferred_context.response);
for _, stanza in ipairs(deferred_stanzas) do
- stream_callbacks.handlestanza(context, stanza);
+ stream_callbacks.handlestanza(deferred_context, stanza);
end
if deferred_stanzas.terminate then
session.bosh_terminate = true;
@@ -469,65 +480,25 @@ function stream_callbacks.error(context, error)
end
end
-local dead_sessions = module:shared("dead_sessions");
-function on_timer()
- -- log("debug", "Checking for requests soon to timeout...");
- -- Identify requests timing out within the next few seconds
- local now = os_time() + 3;
- for request, reply_before in pairs(waiting_requests) do
- if reply_before <= now then
- log("debug", "%s was soon to timeout (at %d, now %d), sending empty response", tostring(request), reply_before, now);
- -- Send empty response to let the
- -- client know we're still here
- if request.conn then
- sessions[request.context.sid].send("");
- end
- end
- end
-
- now = now - 3;
- local n_dead_sessions = 0;
- for session, close_after in pairs(inactive_sessions) do
- if close_after < now then
- (session.log or log)("debug", "BOSH client inactive too long, destroying session at %d", now);
- sessions[session.sid] = nil;
- inactive_sessions[session] = nil;
- n_dead_sessions = n_dead_sessions + 1;
- dead_sessions[n_dead_sessions] = session;
- end
- end
-
- for i=1,n_dead_sessions do
- local session = dead_sessions[i];
- dead_sessions[i] = nil;
- sm_destroy_session(session, "BOSH client silent for over "..session.bosh_max_inactive.." seconds");
- end
- return 1;
-end
-module:add_timer(1, on_timer);
-
-
local GET_response = {
headers = {
content_type = "text/html";
};
body = [[<html><body>
<p>It works! Now point your BOSH client to this URL to connect to Prosody.</p>
- <p>For more information see <a href="http://prosody.im/doc/setting_up_bosh">Prosody: Setting up BOSH</a>.</p>
+ <p>For more information see <a href="https://prosody.im/doc/setting_up_bosh">Prosody: Setting up BOSH</a>.</p>
</body></html>]];
};
-function module.add_host(module)
- module:depends("http");
- module:provides("http", {
- default_path = "/http-bind";
- route = {
- ["GET"] = GET_response;
- ["GET /"] = GET_response;
- ["OPTIONS"] = handle_OPTIONS;
- ["OPTIONS /"] = handle_OPTIONS;
- ["POST"] = handle_POST;
- ["POST /"] = handle_POST;
- };
- });
-end
+module:depends("http");
+module:provides("http", {
+ default_path = "/http-bind";
+ route = {
+ ["GET"] = GET_response;
+ ["GET /"] = GET_response;
+ ["OPTIONS"] = handle_OPTIONS;
+ ["OPTIONS /"] = handle_OPTIONS;
+ ["POST"] = handle_POST;
+ ["POST /"] = handle_POST;
+ };
+});