diff options
Diffstat (limited to 'plugins/mod_bosh.lua')
-rw-r--r-- | plugins/mod_bosh.lua | 157 |
1 files changed, 72 insertions, 85 deletions
diff --git a/plugins/mod_bosh.lua b/plugins/mod_bosh.lua index 9ef4a41e..5e7701a3 100644 --- a/plugins/mod_bosh.lua +++ b/plugins/mod_bosh.lua @@ -6,9 +6,6 @@ -- COPYING file in the source package for more information. -- -module:set_global(); -- Global module - -local hosts = _G.hosts; local new_xmpp_stream = require "util.xmppstream".new; local sm = require "core.sessionmanager"; local sm_destroy_session = sm.destroy_session; @@ -16,11 +13,12 @@ local new_uuid = require "util.uuid".generate; local core_process_stanza = prosody.core_process_stanza; local st = require "util.stanza"; local logger = require "util.logger"; -local log = logger.init("mod_bosh"); +local log = module._log; local initialize_filters = require "util.filters".initialize; local math_min = math.min; -local xpcall, tostring, type = xpcall, tostring, type; +local tostring, type = tostring, type; local traceback = debug.traceback; +local runner = require"util.async".runner; local nameprep = require "util.encodings".stringprep.nameprep; local xmlns_streams = "http://etherx.jabber.org/streams"; @@ -65,16 +63,13 @@ local function get_ip_from_request(request) end local t_insert, t_remove, t_concat = table.insert, table.remove, table.concat; -local os_time = os.time; -- All sessions, and sessions that have no requests open -local sessions, inactive_sessions = module:shared("sessions", "inactive_sessions"); +local sessions = module:shared("sessions"); -- Used to respond to idle sessions (those with waiting requests) -local waiting_requests = module:shared("waiting_requests"); function on_destroy_request(request) log("debug", "Request destroyed: %s", tostring(request)); - waiting_requests[request] = nil; local session = sessions[request.context.sid]; if session then local requests = session.requests; @@ -88,9 +83,24 @@ function on_destroy_request(request) -- If this session now has no requests open, mark it as inactive local max_inactive = session.bosh_max_inactive; if max_inactive and #requests == 0 then - inactive_sessions[session] = os_time() + max_inactive; + if session.inactive_timer then + session.inactive_timer:stop(); + end + session.inactive_timer = module:add_timer(max_inactive, check_inactive, session, request.context, + "BOSH client silent for over "..max_inactive.." seconds"); (session.log or log)("debug", "BOSH session marked as inactive (for %ds)", max_inactive); end + if session.bosh_wait_timer then + session.bosh_wait_timer:stop(); + session.bosh_wait_timer = nil; + end + end +end + +function check_inactive(now, session, context, reason) -- luacheck: ignore 212/now + if not session.destroyed then + sessions[context.sid] = nil; + sm_destroy_session(session, reason); end end @@ -124,7 +134,7 @@ function handle_POST(event) local headers = response.headers; headers.content_type = "text/xml; charset=utf-8"; - if cross_domain and event.request.headers.origin then + if cross_domain and request.headers.origin then set_cross_domain_headers(response); end @@ -148,8 +158,14 @@ function handle_POST(event) if session then -- Session was marked as inactive, since we have -- a request open now, unmark it - if inactive_sessions[session] and #session.requests > 0 then - inactive_sessions[session] = nil; + if session.inactive_timer and #session.requests > 0 then + session.inactive_timer:stop(); + session.inactive_timer = nil; + end + + if session.bosh_wait_timer then + session.bosh_wait_timer:stop(); + session.bosh_wait_timer = nil; end local r = session.requests; @@ -177,9 +193,6 @@ function handle_POST(event) if not response.finished then -- We're keeping this request open, to respond later log("debug", "Have nothing to say, so leaving request unanswered for now"); - if session.bosh_wait then - waiting_requests[response] = os_time() + session.bosh_wait; - end end if session.bosh_terminate then @@ -187,6 +200,10 @@ function handle_POST(event) session:close(); return nil; else + if session.bosh_wait and #session.requests > 0 then + session.bosh_wait_timer = module:add_timer(session.bosh_wait, after_bosh_wait, session.requests[1], session) + end + return true; -- Inform http server we shall reply later end elseif response.finished then @@ -198,6 +215,11 @@ function handle_POST(event) return tostring(close_reply) .. "\n"; end +function after_bosh_wait(now, request, session) -- luacheck: ignore 212 + if request.conn then + session.send(""); + end +end local function bosh_reset_stream(session) session.notopen = true; end @@ -237,10 +259,11 @@ local function bosh_close_stream(session, reason) held_request:send(response_body); end sessions[session.sid] = nil; - inactive_sessions[session] = nil; sm_destroy_session(session); end +local runner_callbacks = { }; + -- Handle the <body> tag in the request payload. function stream_callbacks.streamopened(context, attr) local request, response = context.request, context.response; @@ -259,13 +282,10 @@ function stream_callbacks.streamopened(context, attr) ["xmlns:stream"] = xmlns_streams, condition = "improper-addressing" }); response:send(tostring(close_reply)); return; - elseif not hosts[to_host] then - -- Unknown host - log("debug", "BOSH client tried to connect to unknown host: %s", tostring(attr.to)); - local close_reply = st.stanza("body", { xmlns = xmlns_bosh, type = "terminate", - ["xmlns:stream"] = xmlns_streams, condition = "host-unknown" }); - response:send(tostring(close_reply)); - return; + elseif to_host ~= module.host then + -- Could be meant for a different instance of the module + -- if multiple instances are loaded with the same URL then this can happen + return; --> 404 end if not rid or (not wait and attr.wait or wait < 0 or wait % 1 ~= 0) then log("debug", "BOSH client sent invalid rid or wait attributes: rid=%s, wait=%s", tostring(attr.rid), tostring(attr.wait)); @@ -291,12 +311,16 @@ function stream_callbacks.streamopened(context, attr) }; sessions[sid] = session; + session.thread = runner(function (stanza) + session:dispatch_stanza(stanza); + end, runner_callbacks, session); + local filter = initialize_filters(session); session.log("debug", "BOSH session created for request from %s", session.ip); log("info", "New BOSH session, assigned it sid '%s'", sid); - hosts[session.host].events.fire_event("bosh-session", { session = session, request = request }); + module:fire_event("bosh-session", { session = session, request = request }); -- Send creation response local creating_session = true; @@ -397,13 +421,18 @@ function stream_callbacks.streamopened(context, attr) if session.notopen then local features = st.stanza("stream:features"); - hosts[session.host].events.fire_event("stream-features", { origin = session, features = features }); + module:fire_event("stream-features", { origin = session, features = features }); session.send(features); session.notopen = nil; end end local function handleerr(err) log("error", "Traceback[bosh]: %s", traceback(tostring(err), 2)); end + +function runner_callbacks:error(err) -- luacheck: ignore 212/self + return handleerr(err); +end + function stream_callbacks.handlestanza(context, stanza) if context.ignore then return; end log("debug", "BOSH stanza received: %s\n", stanza:top_tag()); @@ -417,9 +446,7 @@ function stream_callbacks.handlestanza(context, stanza) t_insert(session.bosh_deferred, stanza); else stanza = session.filter("stanzas/in", stanza); - if stanza then - return xpcall(function () return core_process_stanza(session, stanza) end, handleerr); - end + session.thread:run(stanza); end else log("debug", "No session for this stanza! (sid: %s)", context.sid or "none!"); @@ -432,13 +459,13 @@ function stream_callbacks.streamclosed(context) if not context.defer and session.bosh_deferred then -- Handle deferred stanzas now local deferred_stanzas = session.bosh_deferred; - local context = deferred_stanzas.context; + local deferred_context = deferred_stanzas.context; session.bosh_deferred = nil; log("debug", "Handling deferred stanzas from rid %d", deferred_stanzas.rid); session.rid = deferred_stanzas.rid; - t_insert(session.requests, context.response); + t_insert(session.requests, deferred_context.response); for _, stanza in ipairs(deferred_stanzas) do - stream_callbacks.handlestanza(context, stanza); + stream_callbacks.handlestanza(deferred_context, stanza); end if deferred_stanzas.terminate then session.bosh_terminate = true; @@ -469,65 +496,25 @@ function stream_callbacks.error(context, error) end end -local dead_sessions = module:shared("dead_sessions"); -function on_timer() - -- log("debug", "Checking for requests soon to timeout..."); - -- Identify requests timing out within the next few seconds - local now = os_time() + 3; - for request, reply_before in pairs(waiting_requests) do - if reply_before <= now then - log("debug", "%s was soon to timeout (at %d, now %d), sending empty response", tostring(request), reply_before, now); - -- Send empty response to let the - -- client know we're still here - if request.conn then - sessions[request.context.sid].send(""); - end - end - end - - now = now - 3; - local n_dead_sessions = 0; - for session, close_after in pairs(inactive_sessions) do - if close_after < now then - (session.log or log)("debug", "BOSH client inactive too long, destroying session at %d", now); - sessions[session.sid] = nil; - inactive_sessions[session] = nil; - n_dead_sessions = n_dead_sessions + 1; - dead_sessions[n_dead_sessions] = session; - end - end - - for i=1,n_dead_sessions do - local session = dead_sessions[i]; - dead_sessions[i] = nil; - sm_destroy_session(session, "BOSH client silent for over "..session.bosh_max_inactive.." seconds"); - end - return 1; -end -module:add_timer(1, on_timer); - - local GET_response = { headers = { content_type = "text/html"; }; body = [[<html><body> <p>It works! Now point your BOSH client to this URL to connect to Prosody.</p> - <p>For more information see <a href="http://prosody.im/doc/setting_up_bosh">Prosody: Setting up BOSH</a>.</p> + <p>For more information see <a href="https://prosody.im/doc/setting_up_bosh">Prosody: Setting up BOSH</a>.</p> </body></html>]]; }; -function module.add_host(module) - module:depends("http"); - module:provides("http", { - default_path = "/http-bind"; - route = { - ["GET"] = GET_response; - ["GET /"] = GET_response; - ["OPTIONS"] = handle_OPTIONS; - ["OPTIONS /"] = handle_OPTIONS; - ["POST"] = handle_POST; - ["POST /"] = handle_POST; - }; - }); -end +module:depends("http"); +module:provides("http", { + default_path = "/http-bind"; + route = { + ["GET"] = GET_response; + ["GET /"] = GET_response; + ["OPTIONS"] = handle_OPTIONS; + ["OPTIONS /"] = handle_OPTIONS; + ["POST"] = handle_POST; + ["POST /"] = handle_POST; + }; +}); |