diff options
Diffstat (limited to 'core/moduleapi.lua')
-rw-r--r-- | core/moduleapi.lua | 205 |
1 files changed, 187 insertions, 18 deletions
diff --git a/core/moduleapi.lua b/core/moduleapi.lua index 10f9f04d..24d99c69 100644 --- a/core/moduleapi.lua +++ b/core/moduleapi.lua @@ -14,13 +14,20 @@ local pluginloader = require "util.pluginloader"; local timer = require "util.timer"; local resolve_relative_path = require"util.paths".resolve_relative_path; local st = require "util.stanza"; +local cache = require "util.cache"; +local errors = require "util.error"; +local promise = require "util.promise"; +local time_now = require "util.time".now; +local format = require "util.format".format; +local jid_node = require "util.jid".node; +local jid_resource = require "util.jid".resource; local t_insert, t_remove, t_concat = table.insert, table.remove, table.concat; local error, setmetatable, type = error, setmetatable, type; local ipairs, pairs, select = ipairs, pairs, select; local tonumber, tostring = tonumber, tostring; local require = require; -local pack = table.pack or function(...) return {n=select("#",...), ...}; end -- table.pack is only in 5.2 +local pack = table.pack or require "util.table".pack; -- table.pack is only in 5.2 local unpack = table.unpack or unpack; --luacheck: ignore 113 -- renamed in 5.2 local prosody = prosody; @@ -97,7 +104,7 @@ function api:hook_tag(xmlns, name, handler, priority) -- If only 2 options then they specified no xmlns xmlns, name, handler, priority = nil, xmlns, name, handler; elseif not (handler and name) then - self:log("warn", "Error: Insufficient parameters to module:hook_stanza()"); + self:log("warn", "Error: Insufficient parameters to module:hook_tag()"); return; end return self:hook("stanza/"..(xmlns and (xmlns..":") or "")..name, @@ -348,7 +355,7 @@ function api:provides(name, item) local item_name = self.name; -- Strip a provider prefix to find the item name -- (e.g. "auth_foo" -> "foo" for an auth provider) - if item_name:find(name.."_", 1, true) == 1 then + if item_name:find((name:gsub("%-", "_")).."_", 1, true) == 1 then item_name = item_name:sub(#name+2); end item.name = item_name; @@ -361,6 +368,111 @@ function api:send(stanza, origin) return core_post_stanza(origin or hosts[self.host], stanza); end +function api:send_iq(stanza, origin, timeout) + local iq_cache = self._iq_cache; + if not iq_cache then + iq_cache = cache.new(256, function (_, iq) + iq.reject(errors.new({ + type = "wait", condition = "resource-constraint", + text = "evicted from iq tracking cache" + })); + end); + self._iq_cache = iq_cache; + end + + local event_type; + if not jid_node(stanza.attr.from) then + event_type = "host"; + elseif jid_resource(stanza.attr.from) then + event_type = "full"; + else -- assume bare since we can't hook full jids + event_type = "bare"; + end + local result_event = "iq-result/"..event_type.."/"..stanza.attr.id; + local error_event = "iq-error/"..event_type.."/"..stanza.attr.id; + local cache_key = event_type.."/"..stanza.attr.id; + if event_type == "full" then + result_event = "iq/" .. event_type; + error_event = "iq/" .. event_type; + end + + local p = promise.new(function (resolve, reject) + local function result_handler(event) + local response = event.stanza; + if response.attr.type == "result" and response.attr.from == stanza.attr.to and response.attr.id == stanza.attr.id then + resolve(event); + return true; + end + end + + local function error_handler(event) + local response = event.stanza; + if response.attr.type == "error" and response.attr.from == stanza.attr.to and response.attr.id == stanza.attr.id then + reject(errors.from_stanza(event.stanza, event)); + return true; + end + end + + if iq_cache:get(cache_key) then + reject(errors.new({ + type = "modify", condition = "conflict", + text = "IQ stanza id attribute already used", + })); + return; + end + + self:hook(result_event, result_handler, 1); + self:hook(error_event, error_handler, 1); + + local timeout_handle = self:add_timer(timeout or 120, function () + reject(errors.new({ + type = "wait", condition = "remote-server-timeout", + text = "IQ stanza timed out", + })); + end); + + local ok = iq_cache:set(cache_key, { + reject = reject, resolve = resolve, + timeout_handle = timeout_handle, + result_handler = result_handler, error_handler = error_handler; + }); + + if not ok then + reject(errors.new({ + type = "wait", condition = "internal-server-error", + text = "Could not store IQ tracking data" + })); + return; + end + + local wrapped_origin = setmetatable({ + -- XXX Needed in some cases for replies to work correctly when sending queries internally. + send = function (reply) + if reply.name == stanza.name and reply.attr.id == stanza.attr.id then + resolve({ stanza = reply }); + end + return (origin or hosts[self.host]).send(reply) + end; + }, { + __index = origin or hosts[self.host]; + }); + + self:send(stanza, wrapped_origin); + end); + + p:finally(function () + local iq = iq_cache:get(cache_key); + if iq then + self:unhook(result_event, iq.result_handler); + self:unhook(error_event, iq.error_handler); + iq.timeout_handle:stop(); + iq_cache:set(cache_key, nil); + end + end); + + return p; +end + function api:broadcast(jids, stanza, iter) for jid in (iter or it.values)(jids) do local new_stanza = st.clone(stanza); @@ -394,9 +506,29 @@ function api:add_timer(delay, callback, ...) return setmetatable(t, timer_mt); end +function api:cron(task_spec) + self:depends("cron"); + self:add_item("task", task_spec); +end + +function api:hourly(name, fun) + if type(name) == "function" then fun, name = name, nil; end + self:cron({ name = name; when = "hourly"; run = fun }); +end + +function api:daily(name, fun) + if type(name) == "function" then fun, name = name, nil; end + self:cron({ name = name; when = "daily"; run = fun }); +end + +function api:weekly(name, fun) + if type(name) == "function" then fun, name = name, nil; end + self:cron({ name = name; when = "weekly"; run = fun }); +end + local path_sep = package.config:sub(1,1); function api:get_directory() - return self.path and (self.path:gsub("%"..path_sep.."[^"..path_sep.."]*$", "")) or nil; + return self.resource_path or self.path and (self.path:gsub("%"..path_sep.."[^"..path_sep.."]*$", "")) or nil; end function api:load_resource(path, mode) @@ -408,28 +540,65 @@ function api:open_store(name, store_type) return require"core.storagemanager".open(self.host, name or self.name, store_type); end -function api:measure(name, stat_type) +function api:measure(name, stat_type, conf) local measure = require "core.statsmanager".measure; - return measure(stat_type, "/"..self.host.."/mod_"..self.name.."/"..name); + local fixed_label_key, fixed_label_value + if self.host ~= "*" then + fixed_label_key = "host" + fixed_label_value = self.host + end + -- new_legacy_metric takes care of scoping for us, as it does not accept + -- an array of labels + -- the prosody_ prefix is automatically added by statsmanager for legacy + -- metrics. + self:add_item("measure", { name = name, type = stat_type, conf = conf }); + return measure(stat_type, "mod_"..self.name.."/"..name, conf, fixed_label_key, fixed_label_value) +end + +function api:metric(type_, name, unit, description, label_keys, conf) + local metric = require "core.statsmanager".metric; + local is_scoped = self.host ~= "*" + if is_scoped then + -- prepend `host` label to label keys if this is not a global module + local orig_labels = label_keys + label_keys = array { "host" } + label_keys:append(orig_labels) + end + local mf = metric(type_, "prosody_mod_"..self.name.."/"..name, unit, description, label_keys, conf) + self:add_item("metric", { name = name, mf = mf }); + if is_scoped then + -- make sure to scope the returned metric family to the current host + return mf:with_partial_label(self.host) + end + return mf end -function api:measure_object_event(events_object, event_name, stat_name) - local m = self:measure(stat_name or event_name, "times"); - local function handler(handlers, _event_name, _event_data) - local finished = m(); - local ret = handlers(_event_name, _event_data); - finished(); - return ret; +local status_priorities = { error = 3, warn = 2, info = 1, core = 0 }; + +function api:set_status(status_type, status_message, override) + local priority = status_priorities[status_type]; + if not priority then + self:log("error", "set_status: Invalid status type '%s', assuming 'info'"); + status_type, priority = "info", status_priorities.info; + end + local current_priority = status_priorities[self.status_type] or 0; + -- By default an 'error' status can only be overwritten by another 'error' status + if (current_priority >= status_priorities.error and priority < current_priority and override ~= true) + or (override == false and current_priority > priority) then + self:log("debug", "moduleapi: ignoring status [prio %d override %s]: %s", priority, override, status_message); + return; end - return self:hook_object_event(events_object, event_name, handler); + self.status_type, self.status_message, self.status_time = status_type, status_message, time_now(); + self:fire_event("module-status/updated", { name = self.name }); end -function api:measure_event(event_name, stat_name) - return self:measure_object_event((hosts[self.host] or prosody).events.wrappers, event_name, stat_name); +function api:log_status(level, msg, ...) + self:set_status(level, format(msg, ...)); + return self:log(level, msg, ...); end -function api:measure_global_event(event_name, stat_name) - return self:measure_object_event(prosody.events.wrappers, event_name, stat_name); +function api:get_status() + return self.status_type, self.status_message, self.status_time; end return api; |