From a32fb90be6252907ee1481396bef32b3a1ab29cf Mon Sep 17 00:00:00 2001 From: Matthew Wild Date: Sun, 29 Jul 2012 01:56:45 +0100 Subject: mod_bosh: Correctly handle data included in the session initiation request, and cork session while a request is being processed, preventing replying to requests when there may be more data to come, reducing round-trips. --- plugins/mod_bosh.lua | 80 ++++++++++++++++++++++++++++------------------------ 1 file changed, 43 insertions(+), 37 deletions(-) diff --git a/plugins/mod_bosh.lua b/plugins/mod_bosh.lua index 14445293..66851133 100644 --- a/plugins/mod_bosh.lua +++ b/plugins/mod_bosh.lua @@ -242,7 +242,7 @@ function stream_callbacks.streamopened(context, attr) -- New session sid = new_uuid(); local session = { - type = "c2s_unauthed", conn = {}, sid = sid, rid = tonumber(attr.rid), host = attr.to, + type = "c2s_unauthed", conn = {}, sid = sid, rid = tonumber(attr.rid)-1, host = attr.to, bosh_version = attr.ver, bosh_wait = attr.wait, streamid = sid, bosh_hold = BOSH_DEFAULT_HOLD, bosh_max_inactive = BOSH_DEFAULT_INACTIVITY, requests = { }, send_buffer = {}, reset_stream = bosh_reset_stream, @@ -254,6 +254,14 @@ function stream_callbacks.streamopened(context, attr) session.log("debug", "BOSH session created for request from %s", session.ip); log("info", "New BOSH session, assigned it sid '%s'", sid); + + -- Send creation response + local creating_session = true; + local features = st.stanza("stream:features"); + hosts[session.host].events.fire_event("stream-features", { origin = session, features = features }); + fire_event("stream-features", session, features); + table.insert(session.send_buffer, tostring(features)); + local r = session.requests; function session.send(s) -- We need to ensure that outgoing stanzas have the jabber:client xmlns @@ -262,47 +270,34 @@ function stream_callbacks.streamopened(context, attr) s.attr.xmlns = "jabber:client"; end --log("debug", "Sending BOSH data: %s", tostring(s)); + t_insert(session.send_buffer, tostring(s)); + local oldest_request = r[1]; - if oldest_request then + if oldest_request and not session.bosh_processing then log("debug", "We have an open request, so sending on that"); oldest_request.headers = default_headers; - oldest_request:send(t_concat({ - "", - tostring(s), - "" - })); - elseif s ~= "" then - log("debug", "Saved to send buffer because there are %d open requests", #r); - -- Hmm, no requests are open :( - t_insert(session.send_buffer, tostring(s)); - log("debug", "There are now %d things in the send_buffer", #session.send_buffer); + local body_attr = { xmlns = "http://jabber.org/protocol/httpbind", + ["xmlns:stream"] = "http://etherx.jabber.org/streams"; + type = session.bosh_terminate and "terminate" or nil; + sid = sid; + }; + if creating_session then + body_attr.wait = attr.wait; + body_attr.inactivity = tostring(BOSH_DEFAULT_INACTIVITY); + body_attr.polling = tostring(BOSH_DEFAULT_POLLING); + body_attr.requests = tostring(BOSH_DEFAULT_REQUESTS); + body_attr.hold = tostring(session.bosh_hold); + body_attr.authid = sid; + body_attr.secure = "true"; + body_attr.ver = '1.6'; from = session.host; + body_attr["xmlns:xmpp"] = "urn:xmpp:xbosh"; + body_attr["xmpp:version"] = "1.0"; + end + oldest_request:send(st.stanza("body", body_attr):top_tag()..t_concat(session.send_buffer)..""); + session.send_buffer = {}; end return true; end - - -- Send creation response - - local features = st.stanza("stream:features"); - hosts[session.host].events.fire_event("stream-features", { origin = session, features = features }); - fire_event("stream-features", session, features); - --xmpp:version='1.0' xmlns:xmpp='urn:xmpp:xbosh' - local body = st.stanza("body", { xmlns = xmlns_bosh, - wait = attr.wait, - inactivity = tostring(BOSH_DEFAULT_INACTIVITY), - polling = tostring(BOSH_DEFAULT_POLLING), - requests = tostring(BOSH_DEFAULT_REQUESTS), - hold = tostring(session.bosh_hold), - sid = sid, authid = sid, - ver = '1.6', from = session.host, - secure = 'true', ["xmpp:version"] = "1.0", - ["xmlns:xmpp"] = "urn:xmpp:xbosh", - ["xmlns:stream"] = "http://etherx.jabber.org/streams" - }):add_child(features); - response.headers = default_headers; - response:send(tostring(body)); - request.sid = sid; return; end @@ -343,12 +338,13 @@ function stream_callbacks.streamopened(context, attr) context.notopen = nil; -- Signals that we accept this opening tag t_insert(session.requests, response); context.sid = sid; + session.bosh_processing = true; -- Used to suppress replies until processing of this request is done if session.notopen then local features = st.stanza("stream:features"); hosts[session.host].events.fire_event("stream-features", { origin = session, features = features }); fire_event("stream-features", session, features); - session.send(features); + table.insert(session.send_buffer, tostring(features)); session.notopen = nil; end end @@ -365,6 +361,16 @@ function stream_callbacks.handlestanza(context, stanza) end end +function stream_callbacks.streamclosed(request) + local session = sessions[request.sid]; + if session then + session.bosh_processing = false; + if #session.send_buffer > 0 then + session.send(""); + end + end +end + function stream_callbacks.error(context, error) log("debug", "Error parsing BOSH request payload; %s", error); if not context.sid then -- cgit v1.2.3