aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthew Wild <mwild1@gmail.com>2016-09-02 23:00:43 +0100
committerMatthew Wild <mwild1@gmail.com>2016-09-02 23:00:43 +0100
commitb0cec25c89cb802b8fa10ca8fbcd0db83114daac (patch)
tree1bdf6a9203273982118d9227d505623044e35d40
parent598b3b441e7f40612f96fcf59e17de2b594d6f3b (diff)
parentd5278ce2fcf7c49ba0f30ca8cd03ebbd5f8ac678 (diff)
downloadprosody-b0cec25c89cb802b8fa10ca8fbcd0db83114daac.tar.gz
prosody-b0cec25c89cb802b8fa10ca8fbcd0db83114daac.zip
Merge 0.10->trunk
-rw-r--r--plugins/mod_bosh.lua80
1 files changed, 60 insertions, 20 deletions
diff --git a/plugins/mod_bosh.lua b/plugins/mod_bosh.lua
index 39c5ee4b..9e3bea8f 100644
--- a/plugins/mod_bosh.lua
+++ b/plugins/mod_bosh.lua
@@ -31,10 +31,16 @@ local xmlns_bosh = "http://jabber.org/protocol/httpbind"; -- (hard-coded into a
local stream_callbacks = {
stream_ns = xmlns_bosh, stream_tag = "body", default_ns = "jabber:client" };
-local BOSH_DEFAULT_HOLD = module:get_option_number("bosh_default_hold", 1);
-local BOSH_DEFAULT_INACTIVITY = module:get_option_number("bosh_max_inactivity", 60);
-local BOSH_DEFAULT_POLLING = module:get_option_number("bosh_max_polling", 5);
-local BOSH_DEFAULT_REQUESTS = module:get_option_number("bosh_max_requests", 2);
+-- These constants are implicitly assumed within the code, and cannot be changed
+local BOSH_HOLD = 1;
+local BOSH_MAX_REQUESTS = 2;
+
+-- The number of seconds a BOSH session should remain open with no requests
+local bosh_max_inactivity = module:get_option_number("bosh_max_inactivity", 60);
+-- The minimum amount of time between requests with no payload
+local bosh_max_polling = module:get_option_number("bosh_max_polling", 5);
+-- The maximum amount of time that the server will hold onto a request before replying
+-- (the client can set this to a lower value when it connects, if it chooses)
local bosh_max_wait = module:get_option_number("bosh_max_wait", 120);
local consider_bosh_secure = module:get_option_boolean("consider_bosh_secure");
@@ -167,9 +173,9 @@ function handle_POST(event)
end
local r = session.requests;
- log("debug", "Session %s has %d out of %d requests open", context.sid, #r, session.bosh_hold);
+ log("debug", "Session %s has %d out of %d requests open", context.sid, #r, BOSH_HOLD);
log("debug", "and there are %d things in the send_buffer:", #session.send_buffer);
- if #r > session.bosh_hold then
+ if #r > BOSH_HOLD then
-- We are holding too many requests, send what's in the buffer,
log("debug", "We are holding too many requests, so...");
if #session.send_buffer > 0 then
@@ -303,7 +309,7 @@ function stream_callbacks.streamopened(context, attr)
local session = {
type = "c2s_unauthed", conn = {}, sid = sid, rid = rid, host = attr.to,
bosh_version = attr.ver, bosh_wait = wait, streamid = sid,
- bosh_hold = BOSH_DEFAULT_HOLD, bosh_max_inactive = BOSH_DEFAULT_INACTIVITY,
+ bosh_max_inactive = bosh_max_inactivity,
requests = { }, send_buffer = {}, reset_stream = bosh_reset_stream,
close = bosh_close_stream, dispatch_stanza = core_process_stanza, notopen = true,
log = logger.init("bosh"..sid), secure = consider_bosh_secure or request.secure,
@@ -347,11 +353,11 @@ function stream_callbacks.streamopened(context, attr)
};
if creating_session then
creating_session = nil;
- body_attr.inactivity = tostring(BOSH_DEFAULT_INACTIVITY);
- body_attr.polling = tostring(BOSH_DEFAULT_POLLING);
- body_attr.requests = tostring(BOSH_DEFAULT_REQUESTS);
+ body_attr.requests = tostring(BOSH_MAX_REQUESTS);
+ body_attr.hold = tostring(BOSH_HOLD);
+ body_attr.inactivity = tostring(bosh_max_inactivity);
+ body_attr.polling = tostring(bosh_max_polling);
body_attr.wait = tostring(session.bosh_wait);
- body_attr.hold = tostring(session.bosh_hold);
body_attr.authid = sid;
body_attr.secure = "true";
body_attr.ver = '1.6';
@@ -379,15 +385,27 @@ function stream_callbacks.streamopened(context, attr)
if session.rid then
local rid = tonumber(attr.rid);
local diff = rid - session.rid;
- if diff > 1 then
- session.log("warn", "rid too large (means a request was lost). Last rid: %d New rid: %s", session.rid, attr.rid);
- elseif diff <= 0 then
- -- Repeated, ignore
- session.log("debug", "rid repeated, ignoring: %s (diff %d)", session.rid, diff);
+ -- Diff should be 1 for a healthy request
+ if diff ~= 1 then
+ context.sid = sid;
context.notopen = nil;
+ if diff == 2 then
+ -- Hold request, but don't process it (ouch!)
+ session.log("debug", "rid skipped: %d, deferring this request", rid-1)
+ context.defer = true;
+ session.bosh_deferred = { context = context, sid = sid, rid = rid, terminate = attr.type == "terminate" };
+ return;
+ end
context.ignore = true;
- context.sid = sid;
- t_insert(session.requests, response);
+ if diff == 0 then
+ -- Re-send previous response, ignore stanzas in this request
+ session.log("debug", "rid repeated, ignoring: %s (diff %d)", session.rid, diff);
+ response:send(session.bosh_last_response);
+ return;
+ end
+ -- Session broken, destroy it
+ session.log("debug", "rid out of range: %d (diff %d)", rid, diff);
+ response:send(tostring(st.stanza("body", { xmlns = xmlns_bosh, type = "terminate", condition = "item-not-found" })));
return;
end
session.rid = rid;
@@ -426,14 +444,36 @@ function stream_callbacks.handlestanza(context, stanza)
if stanza.attr.xmlns == xmlns_bosh then
stanza.attr.xmlns = nil;
end
- stanza = session.filter("stanzas/in", stanza);
- session.thread:run(stanza);
+ if context.defer and session.bosh_deferred then
+ log("debug", "Deferring this stanza");
+ t_insert(session.bosh_deferred, stanza);
+ else
+ stanza = session.filter("stanzas/in", stanza);
+ session.thread:run(stanza);
+ end
+ else
+ log("debug", "No session for this stanza! (sid: %s)", context.sid or "none!");
end
end
function stream_callbacks.streamclosed(context)
local session = sessions[context.sid];
if session then
+ if not context.defer and session.bosh_deferred then
+ -- Handle deferred stanzas now
+ local deferred_stanzas = session.bosh_deferred;
+ local context = deferred_stanzas.context;
+ session.bosh_deferred = nil;
+ log("debug", "Handling deferred stanzas from rid %d", deferred_stanzas.rid);
+ session.rid = deferred_stanzas.rid;
+ t_insert(session.requests, context.response);
+ for _, stanza in ipairs(deferred_stanzas) do
+ stream_callbacks.handlestanza(context, stanza);
+ end
+ if deferred_stanzas.terminate then
+ session.bosh_terminate = true;
+ end
+ end
session.bosh_processing = false;
if #session.send_buffer > 0 then
session.send("");