aboutsummaryrefslogtreecommitdiffstats
path: root/plugins/mod_bosh.lua
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/mod_bosh.lua')
-rw-r--r--plugins/mod_bosh.lua237
1 files changed, 153 insertions, 84 deletions
diff --git a/plugins/mod_bosh.lua b/plugins/mod_bosh.lua
index 1eb95e90..8cda4a23 100644
--- a/plugins/mod_bosh.lua
+++ b/plugins/mod_bosh.lua
@@ -1,7 +1,7 @@
-- Prosody IM
-- Copyright (C) 2008-2010 Matthew Wild
-- Copyright (C) 2008-2010 Waqas Hussain
---
+--
-- This project is MIT/X11 licensed. Please see the
-- COPYING file in the source package for more information.
--
@@ -13,7 +13,6 @@ local new_xmpp_stream = require "util.xmppstream".new;
local sm = require "core.sessionmanager";
local sm_destroy_session = sm.destroy_session;
local new_uuid = require "util.uuid".generate;
-local fire_event = prosody.events.fire_event;
local core_process_stanza = prosody.core_process_stanza;
local st = require "util.stanza";
local logger = require "util.logger";
@@ -22,6 +21,7 @@ local initialize_filters = require "util.filters".initialize;
local math_min = math.min;
local xpcall, tostring, type = xpcall, tostring, type;
local traceback = debug.traceback;
+local nameprep = require "util.encodings".stringprep.nameprep;
local xmlns_streams = "http://etherx.jabber.org/streams";
local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams";
@@ -30,33 +30,25 @@ local xmlns_bosh = "http://jabber.org/protocol/httpbind"; -- (hard-coded into a
local stream_callbacks = {
stream_ns = xmlns_bosh, stream_tag = "body", default_ns = "jabber:client" };
-local BOSH_DEFAULT_HOLD = module:get_option_number("bosh_default_hold", 1);
-local BOSH_DEFAULT_INACTIVITY = module:get_option_number("bosh_max_inactivity", 60);
-local BOSH_DEFAULT_POLLING = module:get_option_number("bosh_max_polling", 5);
-local BOSH_DEFAULT_REQUESTS = module:get_option_number("bosh_max_requests", 2);
+-- These constants are implicitly assumed within the code, and cannot be changed
+local BOSH_HOLD = 1;
+local BOSH_MAX_REQUESTS = 2;
+
+-- The number of seconds a BOSH session should remain open with no requests
+local bosh_max_inactivity = module:get_option_number("bosh_max_inactivity", 60);
+-- The minimum amount of time between requests with no payload
+local bosh_max_polling = module:get_option_number("bosh_max_polling", 5);
+-- The maximum amount of time that the server will hold onto a request before replying
+-- (the client can set this to a lower value when it connects, if it chooses)
local bosh_max_wait = module:get_option_number("bosh_max_wait", 120);
local consider_bosh_secure = module:get_option_boolean("consider_bosh_secure");
-
-local default_headers = { ["Content-Type"] = "text/xml; charset=utf-8" };
-
local cross_domain = module:get_option("cross_domain_bosh", false);
-if cross_domain then
- default_headers["Access-Control-Allow-Methods"] = "GET, POST, OPTIONS";
- default_headers["Access-Control-Allow-Headers"] = "Content-Type";
- default_headers["Access-Control-Max-Age"] = "7200";
-
- if cross_domain == true then
- default_headers["Access-Control-Allow-Origin"] = "*";
- elseif type(cross_domain) == "table" then
- cross_domain = table.concat(cross_domain, ", ");
- end
- if type(cross_domain) == "string" then
- default_headers["Access-Control-Allow-Origin"] = cross_domain;
- end
-end
-local trusted_proxies = module:get_option_set("trusted_proxies", {"127.0.0.1"})._items;
+if cross_domain == true then cross_domain = "*"; end
+if type(cross_domain) == "table" then cross_domain = table.concat(cross_domain, ", "); end
+
+local trusted_proxies = module:get_option_set("trusted_proxies", { "127.0.0.1", "::1" })._items;
local function get_ip_from_request(request)
local ip = request.conn:ip();
@@ -79,7 +71,7 @@ local os_time = os.time;
local sessions, inactive_sessions = module:shared("sessions", "inactive_sessions");
-- Used to respond to idle sessions (those with waiting requests)
-local waiting_requests = {};
+local waiting_requests = module:shared("waiting_requests");
function on_destroy_request(request)
log("debug", "Request destroyed: %s", tostring(request));
waiting_requests[request] = nil;
@@ -92,7 +84,7 @@ function on_destroy_request(request)
break;
end
end
-
+
-- If this session now has no requests open, mark it as inactive
local max_inactive = session.bosh_max_inactive;
if max_inactive and #requests == 0 then
@@ -102,11 +94,20 @@ function on_destroy_request(request)
end
end
-function handle_OPTIONS(request)
- local headers = {};
- for k,v in pairs(default_headers) do headers[k] = v; end
- headers["Content-Type"] = nil;
- return { headers = headers, body = "" };
+local function set_cross_domain_headers(response)
+ local headers = response.headers;
+ headers.access_control_allow_methods = "GET, POST, OPTIONS";
+ headers.access_control_allow_headers = "Content-Type";
+ headers.access_control_max_age = "7200";
+ headers.access_control_allow_origin = cross_domain;
+ return response;
+end
+
+function handle_OPTIONS(event)
+ if cross_domain and event.request.headers.origin then
+ set_cross_domain_headers(event.response);
+ end
+ return "";
end
function handle_POST(event)
@@ -119,14 +120,27 @@ function handle_POST(event)
local context = { request = request, response = response, notopen = true };
local stream = new_xmpp_stream(context, stream_callbacks);
response.context = context;
-
+
+ local headers = response.headers;
+ headers.content_type = "text/xml; charset=utf-8";
+
+ if cross_domain and event.request.headers.origin then
+ set_cross_domain_headers(response);
+ end
+
-- stream:feed() calls the stream_callbacks, so all stanzas in
-- the body are processed in this next line before it returns.
-- In particular, the streamopened() stream callback is where
-- much of the session logic happens, because it's where we first
-- get to see the 'sid' of this request.
- stream:feed(body);
-
+ local ok, err = stream:feed(body);
+ if not ok then
+ module:log("warn", "Error parsing BOSH payload; %s", err)
+ local close_reply = st.stanza("body", { xmlns = xmlns_bosh, type = "terminate",
+ ["xmlns:stream"] = xmlns_streams, condition = "bad-request" });
+ return tostring(close_reply);
+ end
+
-- Stanzas (if any) in the request have now been processed, and
-- we take care of the high-level BOSH logic here, including
-- giving a response or putting the request "on hold".
@@ -139,12 +153,9 @@ function handle_POST(event)
end
local r = session.requests;
- log("debug", "Session %s has %d out of %d requests open", context.sid, #r, session.bosh_hold);
+ log("debug", "Session %s has %d out of %d requests open", context.sid, #r, BOSH_HOLD);
log("debug", "and there are %d things in the send_buffer:", #session.send_buffer);
- for i, thing in ipairs(session.send_buffer) do
- log("debug", " %s", tostring(thing));
- end
- if #r > session.bosh_hold then
+ if #r > BOSH_HOLD then
-- We are holding too many requests, send what's in the buffer,
log("debug", "We are holding too many requests, so...");
if #session.send_buffer > 0 then
@@ -162,7 +173,7 @@ function handle_POST(event)
session.send_buffer = {};
session.send(resp);
end
-
+
if not response.finished then
-- We're keeping this request open, to respond later
log("debug", "Have nothing to say, so leaving request unanswered for now");
@@ -170,7 +181,7 @@ function handle_POST(event)
waiting_requests[response] = os_time() + session.bosh_wait;
end
end
-
+
if session.bosh_terminate then
session.log("debug", "Closing session with %d requests open", #session.requests);
session:close();
@@ -178,7 +189,13 @@ function handle_POST(event)
else
return true; -- Inform http server we shall reply later
end
+ elseif response.finished then
+ return; -- A response has been sent already
end
+ module:log("warn", "Unable to associate request with a session (incomplete request?)");
+ local close_reply = st.stanza("body", { xmlns = xmlns_bosh, type = "terminate",
+ ["xmlns:stream"] = xmlns_streams, condition = "item-not-found" });
+ return tostring(close_reply) .. "\n";
end
@@ -188,10 +205,10 @@ local stream_xmlns_attr = { xmlns = "urn:ietf:params:xml:ns:xmpp-streams" };
local function bosh_close_stream(session, reason)
(session.log or log)("info", "BOSH client disconnected");
-
+
local close_reply = st.stanza("body", { xmlns = xmlns_bosh, type = "terminate",
["xmlns:stream"] = xmlns_streams });
-
+
if reason then
close_reply.attr.condition = "remote-stream-error";
@@ -217,10 +234,9 @@ local function bosh_close_stream(session, reason)
local response_body = tostring(close_reply);
for _, held_request in ipairs(session.requests) do
- held_request.headers = default_headers;
held_request:send(response_body);
end
- sessions[session.sid] = nil;
+ sessions[session.sid] = nil;
inactive_sessions[session] = nil;
sm_destroy_session(session);
end
@@ -233,9 +249,17 @@ function stream_callbacks.streamopened(context, attr)
if not sid then
-- New session request
context.notopen = nil; -- Signals that we accept this opening tag
-
- -- TODO: Sanity checks here (rid, to, known host, etc.)
- if not hosts[attr.to] then
+
+ local to_host = nameprep(attr.to);
+ local rid = tonumber(attr.rid);
+ local wait = tonumber(attr.wait);
+ if not to_host then
+ log("debug", "BOSH client tried to connect to invalid host: %s", tostring(attr.to));
+ local close_reply = st.stanza("body", { xmlns = xmlns_bosh, type = "terminate",
+ ["xmlns:stream"] = xmlns_streams, condition = "improper-addressing" });
+ response:send(tostring(close_reply));
+ return;
+ elseif not hosts[to_host] then
-- Unknown host
log("debug", "BOSH client tried to connect to unknown host: %s", tostring(attr.to));
local close_reply = st.stanza("body", { xmlns = xmlns_bosh, type = "terminate",
@@ -243,25 +267,37 @@ function stream_callbacks.streamopened(context, attr)
response:send(tostring(close_reply));
return;
end
-
+ if not rid or (not wait and attr.wait or wait < 0 or wait % 1 ~= 0) then
+ log("debug", "BOSH client sent invalid rid or wait attributes: rid=%s, wait=%s", tostring(attr.rid), tostring(attr.wait));
+ local close_reply = st.stanza("body", { xmlns = xmlns_bosh, type = "terminate",
+ ["xmlns:stream"] = xmlns_streams, condition = "bad-request" });
+ response:send(tostring(close_reply));
+ return;
+ end
+
+ rid = rid - 1;
+ wait = math_min(wait, bosh_max_wait);
+
-- New session
sid = new_uuid();
local session = {
- type = "c2s_unauthed", conn = request.conn, sid = sid, rid = tonumber(attr.rid)-1, host = attr.to,
- bosh_version = attr.ver, bosh_wait = math_min(attr.wait, bosh_max_wait), streamid = sid,
- bosh_hold = BOSH_DEFAULT_HOLD, bosh_max_inactive = BOSH_DEFAULT_INACTIVITY,
+ type = "c2s_unauthed", conn = request.conn, sid = sid, rid = rid, host = to_host,
+ bosh_version = attr.ver, bosh_wait = wait, streamid = sid,
+ bosh_max_inactive = bosh_max_inactivity,
requests = { }, send_buffer = {}, reset_stream = bosh_reset_stream,
close = bosh_close_stream, dispatch_stanza = core_process_stanza, notopen = true,
log = logger.init("bosh"..sid), secure = consider_bosh_secure or request.secure,
ip = get_ip_from_request(request);
};
sessions[sid] = session;
-
+
local filter = initialize_filters(session);
-
+
session.log("debug", "BOSH session created for request from %s", session.ip);
log("info", "New BOSH session, assigned it sid '%s'", sid);
+ hosts[session.host].events.fire_event("bosh-session", { session = session, request = request });
+
-- Send creation response
local creating_session = true;
@@ -274,12 +310,12 @@ function stream_callbacks.streamopened(context, attr)
end
s = filter("stanzas/out", s);
--log("debug", "Sending BOSH data: %s", tostring(s));
+ if not s then return true end
t_insert(session.send_buffer, tostring(s));
local oldest_request = r[1];
if oldest_request and not session.bosh_processing then
log("debug", "We have an open request, so sending on that");
- oldest_request.headers = default_headers;
local body_attr = { xmlns = "http://jabber.org/protocol/httpbind",
["xmlns:stream"] = "http://etherx.jabber.org/streams";
type = session.bosh_terminate and "terminate" or nil;
@@ -287,11 +323,11 @@ function stream_callbacks.streamopened(context, attr)
};
if creating_session then
creating_session = nil;
- body_attr.inactivity = tostring(BOSH_DEFAULT_INACTIVITY);
- body_attr.polling = tostring(BOSH_DEFAULT_POLLING);
- body_attr.requests = tostring(BOSH_DEFAULT_REQUESTS);
+ body_attr.requests = tostring(BOSH_MAX_REQUESTS);
+ body_attr.hold = tostring(BOSH_HOLD);
+ body_attr.inactivity = tostring(bosh_max_inactivity);
+ body_attr.polling = tostring(bosh_max_polling);
body_attr.wait = tostring(session.bosh_wait);
- body_attr.hold = tostring(session.bosh_hold);
body_attr.authid = sid;
body_attr.secure = "true";
body_attr.ver = '1.6';
@@ -299,43 +335,55 @@ function stream_callbacks.streamopened(context, attr)
body_attr["xmlns:xmpp"] = "urn:xmpp:xbosh";
body_attr["xmpp:version"] = "1.0";
end
- oldest_request:send(st.stanza("body", body_attr):top_tag()..t_concat(session.send_buffer).."</body>");
+ session.bosh_last_response = st.stanza("body", body_attr):top_tag()..t_concat(session.send_buffer).."</body>";
+ oldest_request:send(session.bosh_last_response);
session.send_buffer = {};
end
return true;
end
request.sid = sid;
end
-
+
local session = sessions[sid];
if not session then
-- Unknown sid
log("info", "Client tried to use sid '%s' which we don't know about", sid);
- response.headers = default_headers;
response:send(tostring(st.stanza("body", { xmlns = xmlns_bosh, type = "terminate", condition = "item-not-found" })));
context.notopen = nil;
return;
end
session.conn = request.conn;
-
+
if session.rid then
local rid = tonumber(attr.rid);
local diff = rid - session.rid;
- if diff > 1 then
- session.log("warn", "rid too large (means a request was lost). Last rid: %d New rid: %s", session.rid, attr.rid);
- elseif diff <= 0 then
- -- Repeated, ignore
- session.log("debug", "rid repeated, ignoring: %s (diff %d)", session.rid, diff);
+ -- Diff should be 1 for a healthy request
+ if diff ~= 1 then
+ context.sid = sid;
context.notopen = nil;
+ if diff == 2 then
+ -- Hold request, but don't process it (ouch!)
+ session.log("debug", "rid skipped: %d, deferring this request", rid-1)
+ context.defer = true;
+ session.bosh_deferred = { context = context, sid = sid, rid = rid, terminate = attr.type == "terminate" };
+ return;
+ end
context.ignore = true;
- context.sid = sid;
- t_insert(session.requests, response);
+ if diff == 0 then
+ -- Re-send previous response, ignore stanzas in this request
+ session.log("debug", "rid repeated, ignoring: %s (diff %d)", session.rid, diff);
+ response:send(session.bosh_last_response);
+ return;
+ end
+ -- Session broken, destroy it
+ session.log("debug", "rid out of range: %d (diff %d)", rid, diff);
+ response:send(tostring(st.stanza("body", { xmlns = xmlns_bosh, type = "terminate", condition = "item-not-found" })));
return;
end
session.rid = rid;
end
-
+
if attr.type == "terminate" then
-- Client wants to end this session, which we'll do
-- after processing any stanzas in this request
@@ -350,8 +398,7 @@ function stream_callbacks.streamopened(context, attr)
if session.notopen then
local features = st.stanza("stream:features");
hosts[session.host].events.fire_event("stream-features", { origin = session, features = features });
- fire_event("stream-features", session, features);
- session.send(tostring(features));
+ session.send(features);
session.notopen = nil;
end
end
@@ -365,16 +412,38 @@ function stream_callbacks.handlestanza(context, stanza)
if stanza.attr.xmlns == xmlns_bosh then
stanza.attr.xmlns = nil;
end
- stanza = session.filter("stanzas/in", stanza);
- if stanza then
- return xpcall(function () return core_process_stanza(session, stanza) end, handleerr);
+ if context.defer and session.bosh_deferred then
+ log("debug", "Deferring this stanza");
+ t_insert(session.bosh_deferred, stanza);
+ else
+ stanza = session.filter("stanzas/in", stanza);
+ if stanza then
+ return xpcall(function () return core_process_stanza(session, stanza) end, handleerr);
+ end
end
+ else
+ log("debug", "No session for this stanza! (sid: %s)", context.sid or "none!");
end
end
-function stream_callbacks.streamclosed(request)
- local session = sessions[request.sid];
+function stream_callbacks.streamclosed(context)
+ local session = sessions[context.sid];
if session then
+ if not context.defer and session.bosh_deferred then
+ -- Handle deferred stanzas now
+ local deferred_stanzas = session.bosh_deferred;
+ local context = deferred_stanzas.context;
+ session.bosh_deferred = nil;
+ log("debug", "Handling deferred stanzas from rid %d", deferred_stanzas.rid);
+ session.rid = deferred_stanzas.rid;
+ t_insert(session.requests, context.response);
+ for _, stanza in ipairs(deferred_stanzas) do
+ stream_callbacks.handlestanza(context, stanza);
+ end
+ if deferred_stanzas.terminate then
+ session.bosh_terminate = true;
+ end
+ end
session.bosh_processing = false;
if #session.send_buffer > 0 then
session.send("");
@@ -386,12 +455,12 @@ function stream_callbacks.error(context, error)
log("debug", "Error parsing BOSH request payload; %s", error);
if not context.sid then
local response = context.response;
- response.headers = default_headers;
- response.status_code = 400;
- response:send();
+ local close_reply = st.stanza("body", { xmlns = xmlns_bosh, type = "terminate",
+ ["xmlns:stream"] = xmlns_streams, condition = "bad-request" });
+ response:send(tostring(close_reply));
return;
end
-
+
local session = sessions[context.sid];
if error == "stream-error" then -- Remote stream error, we close normally
session:close();
@@ -400,7 +469,7 @@ function stream_callbacks.error(context, error)
end
end
-local dead_sessions = {};
+local dead_sessions = module:shared("dead_sessions");
function on_timer()
-- log("debug", "Checking for requests soon to timeout...");
-- Identify requests timing out within the next few seconds
@@ -415,7 +484,7 @@ function on_timer()
end
end
end
-
+
now = now - 3;
local n_dead_sessions = 0;
for session, close_after in pairs(inactive_sessions) do