aboutsummaryrefslogtreecommitdiffstats
path: root/plugins/mod_bosh.lua
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/mod_bosh.lua')
-rw-r--r--plugins/mod_bosh.lua150
1 files changed, 89 insertions, 61 deletions
diff --git a/plugins/mod_bosh.lua b/plugins/mod_bosh.lua
index d9c8defd..34613cbb 100644
--- a/plugins/mod_bosh.lua
+++ b/plugins/mod_bosh.lua
@@ -1,7 +1,7 @@
-- Prosody IM
-- Copyright (C) 2008-2010 Matthew Wild
-- Copyright (C) 2008-2010 Waqas Hussain
---
+--
-- This project is MIT/X11 licensed. Please see the
-- COPYING file in the source package for more information.
--
@@ -13,7 +13,6 @@ local new_xmpp_stream = require "util.xmppstream".new;
local sm = require "core.sessionmanager";
local sm_destroy_session = sm.destroy_session;
local new_uuid = require "util.uuid".generate;
-local fire_event = prosody.events.fire_event;
local core_process_stanza = prosody.core_process_stanza;
local st = require "util.stanza";
local logger = require "util.logger";
@@ -22,6 +21,7 @@ local initialize_filters = require "util.filters".initialize;
local math_min = math.min;
local xpcall, tostring, type = xpcall, tostring, type;
local traceback = debug.traceback;
+local nameprep = require "util.encodings".stringprep.nameprep;
local xmlns_streams = "http://etherx.jabber.org/streams";
local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams";
@@ -37,24 +37,10 @@ local BOSH_DEFAULT_REQUESTS = module:get_option_number("bosh_max_requests", 2);
local bosh_max_wait = module:get_option_number("bosh_max_wait", 120);
local consider_bosh_secure = module:get_option_boolean("consider_bosh_secure");
-
-local default_headers = { ["Content-Type"] = "text/xml; charset=utf-8" };
-
local cross_domain = module:get_option("cross_domain_bosh", false);
-if cross_domain then
- default_headers["Access-Control-Allow-Methods"] = "GET, POST, OPTIONS";
- default_headers["Access-Control-Allow-Headers"] = "Content-Type";
- default_headers["Access-Control-Max-Age"] = "7200";
-
- if cross_domain == true then
- default_headers["Access-Control-Allow-Origin"] = "*";
- elseif type(cross_domain) == "table" then
- cross_domain = table.concat(cross_domain, ", ");
- end
- if type(cross_domain) == "string" then
- default_headers["Access-Control-Allow-Origin"] = cross_domain;
- end
-end
+
+if cross_domain == true then cross_domain = "*"; end
+if type(cross_domain) == "table" then cross_domain = table.concat(cross_domain, ", "); end
local trusted_proxies = module:get_option_set("trusted_proxies", {"127.0.0.1"})._items;
@@ -79,7 +65,7 @@ local os_time = os.time;
local sessions, inactive_sessions = module:shared("sessions", "inactive_sessions");
-- Used to respond to idle sessions (those with waiting requests)
-local waiting_requests = {};
+local waiting_requests = module:shared("waiting_requests");
function on_destroy_request(request)
log("debug", "Request destroyed: %s", tostring(request));
waiting_requests[request] = nil;
@@ -92,7 +78,7 @@ function on_destroy_request(request)
break;
end
end
-
+
-- If this session now has no requests open, mark it as inactive
local max_inactive = session.bosh_max_inactive;
if max_inactive and #requests == 0 then
@@ -102,11 +88,20 @@ function on_destroy_request(request)
end
end
-function handle_OPTIONS(request)
- local headers = {};
- for k,v in pairs(default_headers) do headers[k] = v; end
- headers["Content-Type"] = nil;
- return { headers = headers, body = "" };
+local function set_cross_domain_headers(response)
+ local headers = response.headers;
+ headers.access_control_allow_methods = "GET, POST, OPTIONS";
+ headers.access_control_allow_headers = "Content-Type";
+ headers.access_control_max_age = "7200";
+ headers.access_control_allow_origin = cross_domain;
+ return response;
+end
+
+function handle_OPTIONS(event)
+ if cross_domain and event.request.headers.origin then
+ set_cross_domain_headers(event.response);
+ end
+ return "";
end
function handle_POST(event)
@@ -119,14 +114,27 @@ function handle_POST(event)
local context = { request = request, response = response, notopen = true };
local stream = new_xmpp_stream(context, stream_callbacks);
response.context = context;
-
+
+ local headers = response.headers;
+ headers.content_type = "text/xml; charset=utf-8";
+
+ if cross_domain and event.request.headers.origin then
+ set_cross_domain_headers(response);
+ end
+
-- stream:feed() calls the stream_callbacks, so all stanzas in
-- the body are processed in this next line before it returns.
-- In particular, the streamopened() stream callback is where
-- much of the session logic happens, because it's where we first
-- get to see the 'sid' of this request.
- stream:feed(body);
-
+ local ok, err = stream:feed(body);
+ if not ok then
+ module:log("warn", "Error parsing BOSH payload; %s", err)
+ local close_reply = st.stanza("body", { xmlns = xmlns_bosh, type = "terminate",
+ ["xmlns:stream"] = xmlns_streams, condition = "bad-request" });
+ return tostring(close_reply);
+ end
+
-- Stanzas (if any) in the request have now been processed, and
-- we take care of the high-level BOSH logic here, including
-- giving a response or putting the request "on hold".
@@ -141,9 +149,6 @@ function handle_POST(event)
local r = session.requests;
log("debug", "Session %s has %d out of %d requests open", context.sid, #r, session.bosh_hold);
log("debug", "and there are %d things in the send_buffer:", #session.send_buffer);
- for i, thing in ipairs(session.send_buffer) do
- log("debug", " %s", tostring(thing));
- end
if #r > session.bosh_hold then
-- We are holding too many requests, send what's in the buffer,
log("debug", "We are holding too many requests, so...");
@@ -162,7 +167,7 @@ function handle_POST(event)
session.send_buffer = {};
session.send(resp);
end
-
+
if not response.finished then
-- We're keeping this request open, to respond later
log("debug", "Have nothing to say, so leaving request unanswered for now");
@@ -170,7 +175,7 @@ function handle_POST(event)
waiting_requests[response] = os_time() + session.bosh_wait;
end
end
-
+
if session.bosh_terminate then
session.log("debug", "Closing session with %d requests open", #session.requests);
session:close();
@@ -178,7 +183,13 @@ function handle_POST(event)
else
return true; -- Inform http server we shall reply later
end
+ elseif response.finished then
+ return; -- A response has been sent already
end
+ module:log("warn", "Unable to associate request with a session (incomplete request?)");
+ local close_reply = st.stanza("body", { xmlns = xmlns_bosh, type = "terminate",
+ ["xmlns:stream"] = xmlns_streams, condition = "item-not-found" });
+ return tostring(close_reply) .. "\n";
end
@@ -188,10 +199,10 @@ local stream_xmlns_attr = { xmlns = "urn:ietf:params:xml:ns:xmpp-streams" };
local function bosh_close_stream(session, reason)
(session.log or log)("info", "BOSH client disconnected");
-
+
local close_reply = st.stanza("body", { xmlns = xmlns_bosh, type = "terminate",
["xmlns:stream"] = xmlns_streams });
-
+
if reason then
close_reply.attr.condition = "remote-stream-error";
@@ -217,10 +228,9 @@ local function bosh_close_stream(session, reason)
local response_body = tostring(close_reply);
for _, held_request in ipairs(session.requests) do
- held_request.headers = default_headers;
held_request:send(response_body);
end
- sessions[session.sid] = nil;
+ sessions[session.sid] = nil;
inactive_sessions[session] = nil;
sm_destroy_session(session);
end
@@ -233,9 +243,17 @@ function stream_callbacks.streamopened(context, attr)
if not sid then
-- New session request
context.notopen = nil; -- Signals that we accept this opening tag
-
- -- TODO: Sanity checks here (rid, to, known host, etc.)
- if not hosts[attr.to] then
+
+ local to_host = nameprep(attr.to);
+ local rid = tonumber(attr.rid);
+ local wait = tonumber(attr.wait);
+ if not to_host then
+ log("debug", "BOSH client tried to connect to invalid host: %s", tostring(attr.to));
+ local close_reply = st.stanza("body", { xmlns = xmlns_bosh, type = "terminate",
+ ["xmlns:stream"] = xmlns_streams, condition = "improper-addressing" });
+ response:send(tostring(close_reply));
+ return;
+ elseif not hosts[to_host] then
-- Unknown host
log("debug", "BOSH client tried to connect to unknown host: %s", tostring(attr.to));
local close_reply = st.stanza("body", { xmlns = xmlns_bosh, type = "terminate",
@@ -243,12 +261,22 @@ function stream_callbacks.streamopened(context, attr)
response:send(tostring(close_reply));
return;
end
-
+ if not rid or (not wait and attr.wait or wait < 0 or wait % 1 ~= 0) then
+ log("debug", "BOSH client sent invalid rid or wait attributes: rid=%s, wait=%s", tostring(attr.rid), tostring(attr.wait));
+ local close_reply = st.stanza("body", { xmlns = xmlns_bosh, type = "terminate",
+ ["xmlns:stream"] = xmlns_streams, condition = "bad-request" });
+ response:send(tostring(close_reply));
+ return;
+ end
+
+ rid = rid - 1;
+ wait = math_min(wait, bosh_max_wait);
+
-- New session
sid = new_uuid();
local session = {
- type = "c2s_unauthed", conn = {}, sid = sid, rid = tonumber(attr.rid)-1, host = attr.to,
- bosh_version = attr.ver, bosh_wait = math_min(attr.wait, bosh_max_wait), streamid = sid,
+ type = "c2s_unauthed", conn = {}, sid = sid, rid = rid, host = attr.to,
+ bosh_version = attr.ver, bosh_wait = wait, streamid = sid,
bosh_hold = BOSH_DEFAULT_HOLD, bosh_max_inactive = BOSH_DEFAULT_INACTIVITY,
requests = { }, send_buffer = {}, reset_stream = bosh_reset_stream,
close = bosh_close_stream, dispatch_stanza = core_process_stanza, notopen = true,
@@ -256,12 +284,14 @@ function stream_callbacks.streamopened(context, attr)
ip = get_ip_from_request(request);
};
sessions[sid] = session;
-
+
local filter = initialize_filters(session);
-
+
session.log("debug", "BOSH session created for request from %s", session.ip);
log("info", "New BOSH session, assigned it sid '%s'", sid);
+ hosts[session.host].events.fire_event("bosh-session", { session = session, request = request });
+
-- Send creation response
local creating_session = true;
@@ -274,12 +304,12 @@ function stream_callbacks.streamopened(context, attr)
end
s = filter("stanzas/out", s);
--log("debug", "Sending BOSH data: %s", tostring(s));
+ if not s then return true end
t_insert(session.send_buffer, tostring(s));
local oldest_request = r[1];
if oldest_request and not session.bosh_processing then
log("debug", "We have an open request, so sending on that");
- oldest_request.headers = default_headers;
local body_attr = { xmlns = "http://jabber.org/protocol/httpbind",
["xmlns:stream"] = "http://etherx.jabber.org/streams";
type = session.bosh_terminate and "terminate" or nil;
@@ -306,17 +336,16 @@ function stream_callbacks.streamopened(context, attr)
end
request.sid = sid;
end
-
+
local session = sessions[sid];
if not session then
-- Unknown sid
log("info", "Client tried to use sid '%s' which we don't know about", sid);
- response.headers = default_headers;
response:send(tostring(st.stanza("body", { xmlns = xmlns_bosh, type = "terminate", condition = "item-not-found" })));
context.notopen = nil;
return;
end
-
+
if session.rid then
local rid = tonumber(attr.rid);
local diff = rid - session.rid;
@@ -333,7 +362,7 @@ function stream_callbacks.streamopened(context, attr)
end
session.rid = rid;
end
-
+
if attr.type == "terminate" then
-- Client wants to end this session, which we'll do
-- after processing any stanzas in this request
@@ -348,8 +377,7 @@ function stream_callbacks.streamopened(context, attr)
if session.notopen then
local features = st.stanza("stream:features");
hosts[session.host].events.fire_event("stream-features", { origin = session, features = features });
- fire_event("stream-features", session, features);
- session.send(tostring(features));
+ session.send(features);
session.notopen = nil;
end
end
@@ -370,8 +398,8 @@ function stream_callbacks.handlestanza(context, stanza)
end
end
-function stream_callbacks.streamclosed(request)
- local session = sessions[request.sid];
+function stream_callbacks.streamclosed(context)
+ local session = sessions[context.sid];
if session then
session.bosh_processing = false;
if #session.send_buffer > 0 then
@@ -384,12 +412,12 @@ function stream_callbacks.error(context, error)
log("debug", "Error parsing BOSH request payload; %s", error);
if not context.sid then
local response = context.response;
- response.headers = default_headers;
- response.status_code = 400;
- response:send();
+ local close_reply = st.stanza("body", { xmlns = xmlns_bosh, type = "terminate",
+ ["xmlns:stream"] = xmlns_streams, condition = "bad-request" });
+ response:send(tostring(close_reply));
return;
end
-
+
local session = sessions[context.sid];
if error == "stream-error" then -- Remote stream error, we close normally
session:close();
@@ -398,7 +426,7 @@ function stream_callbacks.error(context, error)
end
end
-local dead_sessions = {};
+local dead_sessions = module:shared("dead_sessions");
function on_timer()
-- log("debug", "Checking for requests soon to timeout...");
-- Identify requests timing out within the next few seconds
@@ -413,7 +441,7 @@ function on_timer()
end
end
end
-
+
now = now - 3;
local n_dead_sessions = 0;
for session, close_after in pairs(inactive_sessions) do