aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthew Wild <mwild1@gmail.com>2012-07-29 01:56:45 +0100
committerMatthew Wild <mwild1@gmail.com>2012-07-29 01:56:45 +0100
commita32fb90be6252907ee1481396bef32b3a1ab29cf (patch)
treef78524f428f9c82f0cac6296addfda7c1a3de947
parentce88f1c98c8ed5764681152ae3d3f66515da194a (diff)
downloadprosody-a32fb90be6252907ee1481396bef32b3a1ab29cf.tar.gz
prosody-a32fb90be6252907ee1481396bef32b3a1ab29cf.zip
mod_bosh: Correctly handle data included in the session initiation request, and cork session while a request is being processed, preventing replying to requests when there may be more data to come, reducing round-trips.
-rw-r--r--plugins/mod_bosh.lua80
1 files changed, 43 insertions, 37 deletions
diff --git a/plugins/mod_bosh.lua b/plugins/mod_bosh.lua
index 14445293..66851133 100644
--- a/plugins/mod_bosh.lua
+++ b/plugins/mod_bosh.lua
@@ -242,7 +242,7 @@ function stream_callbacks.streamopened(context, attr)
-- New session
sid = new_uuid();
local session = {
- type = "c2s_unauthed", conn = {}, sid = sid, rid = tonumber(attr.rid), host = attr.to,
+ type = "c2s_unauthed", conn = {}, sid = sid, rid = tonumber(attr.rid)-1, host = attr.to,
bosh_version = attr.ver, bosh_wait = attr.wait, streamid = sid,
bosh_hold = BOSH_DEFAULT_HOLD, bosh_max_inactive = BOSH_DEFAULT_INACTIVITY,
requests = { }, send_buffer = {}, reset_stream = bosh_reset_stream,
@@ -254,6 +254,14 @@ function stream_callbacks.streamopened(context, attr)
session.log("debug", "BOSH session created for request from %s", session.ip);
log("info", "New BOSH session, assigned it sid '%s'", sid);
+
+ -- Send creation response
+ local creating_session = true;
+ local features = st.stanza("stream:features");
+ hosts[session.host].events.fire_event("stream-features", { origin = session, features = features });
+ fire_event("stream-features", session, features);
+ table.insert(session.send_buffer, tostring(features));
+
local r = session.requests;
function session.send(s)
-- We need to ensure that outgoing stanzas have the jabber:client xmlns
@@ -262,47 +270,34 @@ function stream_callbacks.streamopened(context, attr)
s.attr.xmlns = "jabber:client";
end
--log("debug", "Sending BOSH data: %s", tostring(s));
+ t_insert(session.send_buffer, tostring(s));
+
local oldest_request = r[1];
- if oldest_request then
+ if oldest_request and not session.bosh_processing then
log("debug", "We have an open request, so sending on that");
oldest_request.headers = default_headers;
- oldest_request:send(t_concat({
- "<body xmlns='http://jabber.org/protocol/httpbind' ",
- session.bosh_terminate and "type='terminate' " or "",
- "sid='", sid, "' xmlns:stream = 'http://etherx.jabber.org/streams'>",
- tostring(s),
- "</body>"
- }));
- elseif s ~= "" then
- log("debug", "Saved to send buffer because there are %d open requests", #r);
- -- Hmm, no requests are open :(
- t_insert(session.send_buffer, tostring(s));
- log("debug", "There are now %d things in the send_buffer", #session.send_buffer);
+ local body_attr = { xmlns = "http://jabber.org/protocol/httpbind",
+ ["xmlns:stream"] = "http://etherx.jabber.org/streams";
+ type = session.bosh_terminate and "terminate" or nil;
+ sid = sid;
+ };
+ if creating_session then
+ body_attr.wait = attr.wait;
+ body_attr.inactivity = tostring(BOSH_DEFAULT_INACTIVITY);
+ body_attr.polling = tostring(BOSH_DEFAULT_POLLING);
+ body_attr.requests = tostring(BOSH_DEFAULT_REQUESTS);
+ body_attr.hold = tostring(session.bosh_hold);
+ body_attr.authid = sid;
+ body_attr.secure = "true";
+ body_attr.ver = '1.6'; from = session.host;
+ body_attr["xmlns:xmpp"] = "urn:xmpp:xbosh";
+ body_attr["xmpp:version"] = "1.0";
+ end
+ oldest_request:send(st.stanza("body", body_attr):top_tag()..t_concat(session.send_buffer).."</body>");
+ session.send_buffer = {};
end
return true;
end
-
- -- Send creation response
-
- local features = st.stanza("stream:features");
- hosts[session.host].events.fire_event("stream-features", { origin = session, features = features });
- fire_event("stream-features", session, features);
- --xmpp:version='1.0' xmlns:xmpp='urn:xmpp:xbosh'
- local body = st.stanza("body", { xmlns = xmlns_bosh,
- wait = attr.wait,
- inactivity = tostring(BOSH_DEFAULT_INACTIVITY),
- polling = tostring(BOSH_DEFAULT_POLLING),
- requests = tostring(BOSH_DEFAULT_REQUESTS),
- hold = tostring(session.bosh_hold),
- sid = sid, authid = sid,
- ver = '1.6', from = session.host,
- secure = 'true', ["xmpp:version"] = "1.0",
- ["xmlns:xmpp"] = "urn:xmpp:xbosh",
- ["xmlns:stream"] = "http://etherx.jabber.org/streams"
- }):add_child(features);
- response.headers = default_headers;
- response:send(tostring(body));
-
request.sid = sid;
return;
end
@@ -343,12 +338,13 @@ function stream_callbacks.streamopened(context, attr)
context.notopen = nil; -- Signals that we accept this opening tag
t_insert(session.requests, response);
context.sid = sid;
+ session.bosh_processing = true; -- Used to suppress replies until processing of this request is done
if session.notopen then
local features = st.stanza("stream:features");
hosts[session.host].events.fire_event("stream-features", { origin = session, features = features });
fire_event("stream-features", session, features);
- session.send(features);
+ table.insert(session.send_buffer, tostring(features));
session.notopen = nil;
end
end
@@ -365,6 +361,16 @@ function stream_callbacks.handlestanza(context, stanza)
end
end
+function stream_callbacks.streamclosed(request)
+ local session = sessions[request.sid];
+ if session then
+ session.bosh_processing = false;
+ if #session.send_buffer > 0 then
+ session.send("");
+ end
+ end
+end
+
function stream_callbacks.error(context, error)
log("debug", "Error parsing BOSH request payload; %s", error);
if not context.sid then