diff options
-rw-r--r-- | core/certmanager.lua | 2 | ||||
-rw-r--r-- | core/hostmanager.lua | 1 | ||||
-rw-r--r-- | core/moduleapi.lua | 5 | ||||
-rw-r--r-- | core/portmanager.lua | 2 | ||||
-rw-r--r-- | core/rostermanager.lua | 2 | ||||
-rw-r--r-- | core/sessionmanager.lua | 4 | ||||
-rw-r--r-- | core/statsmanager.lua | 67 | ||||
-rw-r--r-- | core/storagemanager.lua | 2 | ||||
-rw-r--r-- | plugins/mod_carbons.lua | 111 | ||||
-rw-r--r-- | plugins/mod_pep.lua | 10 | ||||
-rwxr-xr-x | prosody | 1 | ||||
-rw-r--r-- | util/hex.lua | 20 | ||||
-rw-r--r-- | util/statistics.lua | 160 |
13 files changed, 370 insertions, 17 deletions
diff --git a/core/certmanager.lua b/core/certmanager.lua index 7ad7b034..b2c358fe 100644 --- a/core/certmanager.lua +++ b/core/certmanager.lua @@ -8,7 +8,7 @@ local configmanager = require "core.configmanager"; local log = require "util.logger".init("certmanager"); -local ssl = ssl; +local ssl = _G.ssl; local ssl_newcontext = ssl and ssl.newcontext; local new_config = require"util.sslconfig".new; diff --git a/core/hostmanager.lua b/core/hostmanager.lua index ca532625..b13b1944 100644 --- a/core/hostmanager.lua +++ b/core/hostmanager.lua @@ -13,7 +13,6 @@ local disco_items = require "util.multitable".new(); local NULL = {}; local jid_split = require "util.jid".split; -local uuid_gen = require "util.uuid".generate; local log = require "util.logger".init("hostmanager"); diff --git a/core/moduleapi.lua b/core/moduleapi.lua index 754d7497..d6aa0ef0 100644 --- a/core/moduleapi.lua +++ b/core/moduleapi.lua @@ -14,6 +14,7 @@ local logger = require "util.logger"; local pluginloader = require "util.pluginloader"; local timer = require "util.timer"; local resolve_relative_path = require"util.paths".resolve_relative_path; +local measure = require "core.statsmanager".measure; local t_insert, t_remove, t_concat = table.insert, table.remove, table.concat; local error, setmetatable, type = error, setmetatable, type; @@ -390,6 +391,10 @@ function api:open_store(name, type) return require"core.storagemanager".open(self.host, name or self.name, type); end +function api:measure(name, type) + return measure(type, "/"..self.host.."/mod_"..self.name.."/"..name); +end + function api.init(mm) modulemanager = mm; return api; diff --git a/core/portmanager.lua b/core/portmanager.lua index bc2d4264..eab2412a 100644 --- a/core/portmanager.lua +++ b/core/portmanager.lua @@ -9,7 +9,7 @@ local set = require "util.set"; local table = table; local setmetatable, rawset, rawget = setmetatable, rawset, rawget; -local type, tonumber, tostring, ipairs, pairs = type, tonumber, tostring, ipairs, pairs; +local type, tonumber, tostring, ipairs = type, tonumber, tostring, ipairs; local prosody = prosody; local fire_event = prosody.events.fire_event; diff --git a/core/rostermanager.lua b/core/rostermanager.lua index 5266afb5..8c7612b4 100644 --- a/core/rostermanager.lua +++ b/core/rostermanager.lua @@ -15,7 +15,7 @@ local pairs = pairs; local tostring = tostring; local hosts = hosts; -local bare_sessions = bare_sessions; +local bare_sessions = prosody.bare_sessions; local datamanager = require "util.datamanager" local um_user_exists = require "core.usermanager".user_exists; diff --git a/core/sessionmanager.lua b/core/sessionmanager.lua index 65e5156c..09920b7d 100644 --- a/core/sessionmanager.lua +++ b/core/sessionmanager.lua @@ -10,8 +10,8 @@ local tostring, setmetatable = tostring, setmetatable; local pairs, next= pairs, next; local hosts = hosts; -local full_sessions = full_sessions; -local bare_sessions = bare_sessions; +local full_sessions = prosody.full_sessions; +local bare_sessions = prosody.bare_sessions; local logger = require "util.logger"; local log = logger.init("sessionmanager"); diff --git a/core/statsmanager.lua b/core/statsmanager.lua new file mode 100644 index 00000000..62d217ef --- /dev/null +++ b/core/statsmanager.lua @@ -0,0 +1,67 @@ + +local stats = require "util.statistics".new(); +local config = require "core.configmanager"; +local log = require "util.logger".init("stats"); +local timer = require "util.timer"; +local fire_event = prosody.events.fire_event; + +local stats_config = config.get("*", "statistics_interval"); +local stats_interval = tonumber(stats_config); +if stats_config and not stats_interval then + log("error", "Invalid 'statistics_interval' setting, statistics will be disabled"); +end + +local measure, collect; +local latest_stats = {}; +local changed_stats = {}; +local stats_extra = {}; + +if stats_interval then + log("debug", "Statistics collection is enabled every %d seconds", stats_interval); + function measure(type, name) + local f = assert(stats[type], "unknown stat type: "..type); + return f(name); + end + + local mark_collection_start = measure("times", "stats.collection"); + local mark_processing_start = measure("times", "stats.processing"); + + function collect() + local mark_collection_done = mark_collection_start(); + changed_stats, stats_extra = {}, {}; + for stat_name, getter in pairs(stats.get_stats()) do + local type, value, extra = getter(); + local old_value = latest_stats[stat_name]; + latest_stats[stat_name] = value; + if value ~= old_value then + changed_stats[stat_name] = value; + end + if extra then + stats_extra[stat_name] = extra; + end + end + mark_collection_done(); + local mark_processing_done = mark_processing_start(); + fire_event("stats-updated", { stats = latest_stats, changed_stats = changed_stats, stats_extra = stats_extra }); + mark_processing_done(); + return stats_interval; + end + + timer.add_task(stats_interval, collect); +else + log("debug", "Statistics collection is disabled"); + -- nop + function measure() + return measure; + end + function collect() + end +end + +return { + measure = measure; + collect = collect; + get_stats = function () + return latest_stats, changed_stats, stats_extra; + end; +}; diff --git a/core/storagemanager.lua b/core/storagemanager.lua index b2ad29d0..d16bdce5 100644 --- a/core/storagemanager.lua +++ b/core/storagemanager.lua @@ -1,5 +1,5 @@ -local error, type, pairs = error, type, pairs; +local type, pairs = type, pairs; local setmetatable = setmetatable; local config = require "core.configmanager"; diff --git a/plugins/mod_carbons.lua b/plugins/mod_carbons.lua new file mode 100644 index 00000000..51242809 --- /dev/null +++ b/plugins/mod_carbons.lua @@ -0,0 +1,111 @@ +-- XEP-0280: Message Carbons implementation for Prosody +-- Copyright (C) 2011 Kim Alvefur +-- +-- This file is MIT/X11 licensed. + +local st = require "util.stanza"; +local jid_bare = require "util.jid".bare; +local xmlns_carbons = "urn:xmpp:carbons:2"; +local xmlns_forward = "urn:xmpp:forward:0"; +local full_sessions, bare_sessions = full_sessions, bare_sessions; + +local function toggle_carbons(event) + local origin, stanza = event.origin, event.stanza; + local state = stanza.tags[1].name; + module:log("debug", "%s %sd carbons", origin.full_jid, state); + origin.want_carbons = state == "enable" and stanza.tags[1].attr.xmlns; + return origin.send(st.reply(stanza)); +end +module:hook("iq-set/self/"..xmlns_carbons..":disable", toggle_carbons); +module:hook("iq-set/self/"..xmlns_carbons..":enable", toggle_carbons); + +local function message_handler(event, c2s) + local origin, stanza = event.origin, event.stanza; + local orig_type = stanza.attr.type; + local orig_from = stanza.attr.from; + local orig_to = stanza.attr.to; + + if not (orig_type == nil + or orig_type == "normal" + or orig_type == "chat") then + return -- No carbons for messages of type error or headline + end + + -- Stanza sent by a local client + local bare_jid = jid_bare(orig_from); + local target_session = origin; + local top_priority = false; + local user_sessions = bare_sessions[bare_jid]; + + -- Stanza about to be delivered to a local client + if not c2s then + bare_jid = jid_bare(orig_to); + target_session = full_sessions[orig_to]; + user_sessions = bare_sessions[bare_jid]; + if not target_session and user_sessions then + -- The top resources will already receive this message per normal routing rules, + -- so we are going to skip them in order to avoid sending duplicated messages. + local top_resources = user_sessions.top_resources; + top_priority = top_resources and top_resources[1].priority + end + end + + if not user_sessions then + module:log("debug", "Skip carbons for offline user"); + return -- No use in sending carbons to an offline user + end + + if stanza:get_child("private", xmlns_carbons) then + if not c2s then + stanza:maptags(function(tag) + if not ( tag.attr.xmlns == xmlns_carbons and tag.name == "private" ) then + return tag; + end + end); + end + module:log("debug", "Message tagged private, ignoring"); + return + elseif stanza:get_child("no-copy", "urn:xmpp:hints") then + module:log("debug", "Message has no-copy hint, ignoring"); + return + elseif stanza:get_child("x", "http://jabber.org/protocol/muc#user") then + module:log("debug", "MUC PM, ignoring"); + return + end + + -- Create the carbon copy and wrap it as per the Stanza Forwarding XEP + local copy = st.clone(stanza); + copy.attr.xmlns = "jabber:client"; + local carbon = st.message{ from = bare_jid, type = orig_type, } + :tag(c2s and "sent" or "received", { xmlns = xmlns_carbons }) + :tag("forwarded", { xmlns = xmlns_forward }) + :add_child(copy):reset(); + + user_sessions = user_sessions and user_sessions.sessions; + for _, session in pairs(user_sessions) do + -- Carbons are sent to resources that have enabled it + if session.want_carbons + -- but not the resource that sent the message, or the one that it's directed to + and session ~= target_session + -- and isn't among the top resources that would receive the message per standard routing rules + and (c2s or session.priority ~= top_priority) then + carbon.attr.to = session.full_jid; + module:log("debug", "Sending carbon to %s", session.full_jid); + session.send(carbon); + end + end +end + +local function c2s_message_handler(event) + return message_handler(event, true) +end + +-- Stanzas sent by local clients +module:hook("pre-message/host", c2s_message_handler, 1); +module:hook("pre-message/bare", c2s_message_handler, 1); +module:hook("pre-message/full", c2s_message_handler, 1); +-- Stanzas to local clients +module:hook("message/bare", message_handler, 1); +module:hook("message/full", message_handler, 1); + +module:add_feature(xmlns_carbons); diff --git a/plugins/mod_pep.lua b/plugins/mod_pep.lua index 752cd28c..a6916d1f 100644 --- a/plugins/mod_pep.lua +++ b/plugins/mod_pep.lua @@ -41,7 +41,8 @@ local function subscription_presence(user_bare, recipient) return is_contact_subscribed(username, host, recipient_bare); end -local function publish(session, node, id, item) +module:hook("pep-publish-item", function (event) + local session, node, id, item = event.session, event.node, event.id, event.item; item.attr.xmlns = nil; local disable = #item.tags ~= 1 or #item.tags[1] == 0; if #item.tags == 0 then item.name = "retract"; end @@ -72,7 +73,8 @@ local function publish(session, node, id, item) core_post_stanza(session, stanza); end end -end +end); + local function publish_all(user, recipient, session) local d = data[user]; local notify = recipients[user] and recipients[user][recipient]; @@ -172,7 +174,9 @@ module:hook("iq/bare/http://jabber.org/protocol/pubsub:pubsub", function(event) local id = payload.attr.id or "1"; payload.attr.id = id; session.send(st.reply(stanza)); - publish(session, node, id, st.clone(payload)); + module:fire_event("pep-publish-item", { + node = node, actor = session.jid, id = id, session = session, item = st.clone(payload); + }); return true; end end @@ -292,6 +292,7 @@ function load_secondary_libraries() require "util.import" require "util.xmppstream" require "core.stanza_router" + require "core.statsmanager" require "core.hostmanager" require "core.portmanager" require "core.modulemanager" diff --git a/util/hex.lua b/util/hex.lua index b21ee17e..e41f4863 100644 --- a/util/hex.lua +++ b/util/hex.lua @@ -1,19 +1,25 @@ local s_char = string.char; +local s_format = string.format; +local s_gsub = string.gsub; -local function char_to_hex(c) - return ("%02x"):format(c:byte()) -end +local char_to_hex = {}; +local hex_to_char = {}; -local function hex_to_char(h) - return s_char(tonumber(h, 16)); +do + local char, hex; + for i = 0,255 do + char, hex = s_char(i), s_format("%02x", i); + char_to_hex[char] = hex; + hex_to_char[hex] = char; + end end local function to(s) - return s:gsub(".", char_to_hex); + return (s_gsub(s, ".", char_to_hex)); end local function from(s) - return s:gsub("..", hex_to_char); + return (s_gsub(s, "..", hex_to_char)); end return { to = to, from = from } diff --git a/util/statistics.lua b/util/statistics.lua new file mode 100644 index 00000000..08c765ae --- /dev/null +++ b/util/statistics.lua @@ -0,0 +1,160 @@ +local t_sort = table.sort +local m_floor = math.floor; +local time = require "socket".gettime; + +local function nop_function() end + +local function percentile(arr, length, pc) + local n = pc/100 * (length + 1); + local k, d = m_floor(n), n%1; + if k == 0 then + return arr[1]; + elseif k >= length then + return arr[length]; + end + return arr[k] + d*(arr[k+1] - arr[k]); +end + +local function new_registry(config) + config = config or {}; + local duration_sample_interval = config.duration_sample_interval or 5; + local duration_max_samples = config.duration_max_stored_samples or 5000; + + local function get_distribution_stats(events, n_actual_events, since, new_time, units) + local n_stored_events = #events; + t_sort(events); + local sum = 0; + for i = 1, n_stored_events do + sum = sum + events[i]; + end + + return { + samples = events; + sample_count = n_stored_events; + count = n_actual_events, + rate = n_actual_events/(new_time-since); + average = n_stored_events > 0 and sum/n_stored_events or 0, + min = events[1] or 0, + max = events[n_stored_events] or 0, + units = units, + }; + end + + + local registry = {}; + local methods; + methods = { + amount = function (name, initial) + local v = initial or 0; + registry[name..":amount"] = function () return "amount", v; end + return function (new_v) v = new_v; end + end; + counter = function (name, initial) + local v = initial or 0; + registry[name..":amount"] = function () return "amount", v; end + return function (delta) + v = v + delta; + end; + end; + rate = function (name) + local since, n = time(), 0; + registry[name..":rate"] = function () + local t = time(); + local stats = { + rate = n/(t-since); + count = n; + }; + since, n = t, 0; + return "rate", stats.rate, stats; + end; + return function () + n = n + 1; + end; + end; + distribution = function (name, unit, type) + type = type or "distribution"; + local events, last_event = {}, 0; + local n_actual_events = 0; + local since = time(); + + registry[name..":"..type] = function () + local new_time = time(); + local stats = get_distribution_stats(events, n_actual_events, since, new_time, unit); + events, last_event = {}, 0; + n_actual_events = 0; + since = new_time; + return type, stats.average, stats; + end; + + return function (value) + n_actual_events = n_actual_events + 1; + if n_actual_events%duration_sample_interval > 0 then + last_event = (last_event%duration_max_samples) + 1; + events[last_event] = value; + end + end; + end; + sizes = function (name) + return methods.distribution(name, "bytes", "size"); + end; + times = function (name) + local events, last_event = {}, 0; + local n_actual_events = 0; + local since = time(); + + registry[name..":duration"] = function () + local new_time = time(); + local stats = get_distribution_stats(events, n_actual_events, since, new_time, "seconds"); + events, last_event = {}, 0; + n_actual_events = 0; + since = new_time; + return "duration", stats.average, stats; + end; + + return function () + n_actual_events = n_actual_events + 1; + if n_actual_events%duration_sample_interval > 0 then + return nop_function; + end + + local start_time = time(); + return function () + local end_time = time(); + local duration = end_time - start_time; + last_event = (last_event%duration_max_samples) + 1; + events[last_event] = duration; + end + end; + end; + + get_stats = function () + return registry; + end; + }; + return methods; +end + +return { + new = new_registry; + get_histogram = function (duration, n_buckets) + n_buckets = n_buckets or 100; + local events, n_events = duration.samples, duration.sample_count; + if not (events and n_events) then + return nil, "not a valid distribution stat"; + end + local histogram = {}; + + for i = 1, 100, 100/n_buckets do + histogram[i] = percentile(events, n_events, i); + end + return histogram; + end; + + get_percentile = function (duration, pc) + local events, n_events = duration.samples, duration.sample_count; + if not (events and n_events) then + return nil, "not a valid distribution stat"; + end + return percentile(events, n_events, pc); + end; +} |