aboutsummaryrefslogtreecommitdiffstats
path: root/plugins/mod_bosh.lua
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/mod_bosh.lua')
-rw-r--r--plugins/mod_bosh.lua117
1 files changed, 56 insertions, 61 deletions
diff --git a/plugins/mod_bosh.lua b/plugins/mod_bosh.lua
index 9ef4a41e..2525d975 100644
--- a/plugins/mod_bosh.lua
+++ b/plugins/mod_bosh.lua
@@ -16,11 +16,12 @@ local new_uuid = require "util.uuid".generate;
local core_process_stanza = prosody.core_process_stanza;
local st = require "util.stanza";
local logger = require "util.logger";
-local log = logger.init("mod_bosh");
+local log = module._log;
local initialize_filters = require "util.filters".initialize;
local math_min = math.min;
-local xpcall, tostring, type = xpcall, tostring, type;
+local tostring, type = tostring, type;
local traceback = debug.traceback;
+local runner = require"util.async".runner;
local nameprep = require "util.encodings".stringprep.nameprep;
local xmlns_streams = "http://etherx.jabber.org/streams";
@@ -65,16 +66,13 @@ local function get_ip_from_request(request)
end
local t_insert, t_remove, t_concat = table.insert, table.remove, table.concat;
-local os_time = os.time;
-- All sessions, and sessions that have no requests open
-local sessions, inactive_sessions = module:shared("sessions", "inactive_sessions");
+local sessions = module:shared("sessions");
-- Used to respond to idle sessions (those with waiting requests)
-local waiting_requests = module:shared("waiting_requests");
function on_destroy_request(request)
log("debug", "Request destroyed: %s", tostring(request));
- waiting_requests[request] = nil;
local session = sessions[request.context.sid];
if session then
local requests = session.requests;
@@ -88,9 +86,24 @@ function on_destroy_request(request)
-- If this session now has no requests open, mark it as inactive
local max_inactive = session.bosh_max_inactive;
if max_inactive and #requests == 0 then
- inactive_sessions[session] = os_time() + max_inactive;
+ if session.inactive_timer then
+ session.inactive_timer:stop();
+ end
+ session.inactive_timer = module:add_timer(max_inactive, check_inactive, session, request.context,
+ "BOSH client silent for over "..max_inactive.." seconds");
(session.log or log)("debug", "BOSH session marked as inactive (for %ds)", max_inactive);
end
+ if session.bosh_wait_timer then
+ session.bosh_wait_timer:stop();
+ session.bosh_wait_timer = nil;
+ end
+ end
+end
+
+function check_inactive(now, session, context, reason) -- luacheck: ignore 212/now
+ if not session.destroyed then
+ sessions[context.sid] = nil;
+ sm_destroy_session(session, reason);
end
end
@@ -124,7 +137,7 @@ function handle_POST(event)
local headers = response.headers;
headers.content_type = "text/xml; charset=utf-8";
- if cross_domain and event.request.headers.origin then
+ if cross_domain and request.headers.origin then
set_cross_domain_headers(response);
end
@@ -148,8 +161,14 @@ function handle_POST(event)
if session then
-- Session was marked as inactive, since we have
-- a request open now, unmark it
- if inactive_sessions[session] and #session.requests > 0 then
- inactive_sessions[session] = nil;
+ if session.inactive_timer and #session.requests > 0 then
+ session.inactive_timer:stop();
+ session.inactive_timer = nil;
+ end
+
+ if session.bosh_wait_timer then
+ session.bosh_wait_timer:stop();
+ session.bosh_wait_timer = nil;
end
local r = session.requests;
@@ -177,9 +196,6 @@ function handle_POST(event)
if not response.finished then
-- We're keeping this request open, to respond later
log("debug", "Have nothing to say, so leaving request unanswered for now");
- if session.bosh_wait then
- waiting_requests[response] = os_time() + session.bosh_wait;
- end
end
if session.bosh_terminate then
@@ -187,6 +203,10 @@ function handle_POST(event)
session:close();
return nil;
else
+ if session.bosh_wait and #session.requests > 0 then
+ session.bosh_wait_timer = module:add_timer(session.bosh_wait, after_bosh_wait, session.requests[1], session)
+ end
+
return true; -- Inform http server we shall reply later
end
elseif response.finished then
@@ -198,6 +218,11 @@ function handle_POST(event)
return tostring(close_reply) .. "\n";
end
+function after_bosh_wait(now, request, session) -- luacheck: ignore 212
+ if request.conn then
+ session.send("");
+ end
+end
local function bosh_reset_stream(session) session.notopen = true; end
@@ -237,10 +262,11 @@ local function bosh_close_stream(session, reason)
held_request:send(response_body);
end
sessions[session.sid] = nil;
- inactive_sessions[session] = nil;
sm_destroy_session(session);
end
+local runner_callbacks = { };
+
-- Handle the <body> tag in the request payload.
function stream_callbacks.streamopened(context, attr)
local request, response = context.request, context.response;
@@ -291,12 +317,16 @@ function stream_callbacks.streamopened(context, attr)
};
sessions[sid] = session;
+ session.thread = runner(function (stanza)
+ session:dispatch_stanza(stanza);
+ end, runner_callbacks, session);
+
local filter = initialize_filters(session);
session.log("debug", "BOSH session created for request from %s", session.ip);
log("info", "New BOSH session, assigned it sid '%s'", sid);
- hosts[session.host].events.fire_event("bosh-session", { session = session, request = request });
+ module:fire_event("bosh-session", { session = session, request = request });
-- Send creation response
local creating_session = true;
@@ -397,13 +427,18 @@ function stream_callbacks.streamopened(context, attr)
if session.notopen then
local features = st.stanza("stream:features");
- hosts[session.host].events.fire_event("stream-features", { origin = session, features = features });
+ module:fire_event("stream-features", { origin = session, features = features });
session.send(features);
session.notopen = nil;
end
end
local function handleerr(err) log("error", "Traceback[bosh]: %s", traceback(tostring(err), 2)); end
+
+function runner_callbacks:error(err) -- luacheck: ignore 212/self
+ return handleerr(err);
+end
+
function stream_callbacks.handlestanza(context, stanza)
if context.ignore then return; end
log("debug", "BOSH stanza received: %s\n", stanza:top_tag());
@@ -417,9 +452,7 @@ function stream_callbacks.handlestanza(context, stanza)
t_insert(session.bosh_deferred, stanza);
else
stanza = session.filter("stanzas/in", stanza);
- if stanza then
- return xpcall(function () return core_process_stanza(session, stanza) end, handleerr);
- end
+ session.thread:run(stanza);
end
else
log("debug", "No session for this stanza! (sid: %s)", context.sid or "none!");
@@ -432,13 +465,13 @@ function stream_callbacks.streamclosed(context)
if not context.defer and session.bosh_deferred then
-- Handle deferred stanzas now
local deferred_stanzas = session.bosh_deferred;
- local context = deferred_stanzas.context;
+ local deferred_context = deferred_stanzas.context;
session.bosh_deferred = nil;
log("debug", "Handling deferred stanzas from rid %d", deferred_stanzas.rid);
session.rid = deferred_stanzas.rid;
- t_insert(session.requests, context.response);
+ t_insert(session.requests, deferred_context.response);
for _, stanza in ipairs(deferred_stanzas) do
- stream_callbacks.handlestanza(context, stanza);
+ stream_callbacks.handlestanza(deferred_context, stanza);
end
if deferred_stanzas.terminate then
session.bosh_terminate = true;
@@ -469,51 +502,13 @@ function stream_callbacks.error(context, error)
end
end
-local dead_sessions = module:shared("dead_sessions");
-function on_timer()
- -- log("debug", "Checking for requests soon to timeout...");
- -- Identify requests timing out within the next few seconds
- local now = os_time() + 3;
- for request, reply_before in pairs(waiting_requests) do
- if reply_before <= now then
- log("debug", "%s was soon to timeout (at %d, now %d), sending empty response", tostring(request), reply_before, now);
- -- Send empty response to let the
- -- client know we're still here
- if request.conn then
- sessions[request.context.sid].send("");
- end
- end
- end
-
- now = now - 3;
- local n_dead_sessions = 0;
- for session, close_after in pairs(inactive_sessions) do
- if close_after < now then
- (session.log or log)("debug", "BOSH client inactive too long, destroying session at %d", now);
- sessions[session.sid] = nil;
- inactive_sessions[session] = nil;
- n_dead_sessions = n_dead_sessions + 1;
- dead_sessions[n_dead_sessions] = session;
- end
- end
-
- for i=1,n_dead_sessions do
- local session = dead_sessions[i];
- dead_sessions[i] = nil;
- sm_destroy_session(session, "BOSH client silent for over "..session.bosh_max_inactive.." seconds");
- end
- return 1;
-end
-module:add_timer(1, on_timer);
-
-
local GET_response = {
headers = {
content_type = "text/html";
};
body = [[<html><body>
<p>It works! Now point your BOSH client to this URL to connect to Prosody.</p>
- <p>For more information see <a href="http://prosody.im/doc/setting_up_bosh">Prosody: Setting up BOSH</a>.</p>
+ <p>For more information see <a href="https://prosody.im/doc/setting_up_bosh">Prosody: Setting up BOSH</a>.</p>
</body></html>]];
};