aboutsummaryrefslogtreecommitdiffstats
path: root/core/moduleapi.lua
diff options
context:
space:
mode:
Diffstat (limited to 'core/moduleapi.lua')
-rw-r--r--core/moduleapi.lua203
1 files changed, 186 insertions, 17 deletions
diff --git a/core/moduleapi.lua b/core/moduleapi.lua
index 10f9f04d..6a91a045 100644
--- a/core/moduleapi.lua
+++ b/core/moduleapi.lua
@@ -14,13 +14,20 @@ local pluginloader = require "util.pluginloader";
local timer = require "util.timer";
local resolve_relative_path = require"util.paths".resolve_relative_path;
local st = require "util.stanza";
+local cache = require "util.cache";
+local errors = require "util.error";
+local promise = require "util.promise";
+local time_now = require "util.time".now;
+local format = require "util.format".format;
+local jid_node = require "util.jid".node;
+local jid_resource = require "util.jid".resource;
local t_insert, t_remove, t_concat = table.insert, table.remove, table.concat;
local error, setmetatable, type = error, setmetatable, type;
local ipairs, pairs, select = ipairs, pairs, select;
local tonumber, tostring = tonumber, tostring;
local require = require;
-local pack = table.pack or function(...) return {n=select("#",...), ...}; end -- table.pack is only in 5.2
+local pack = table.pack or require "util.table".pack; -- table.pack is only in 5.2
local unpack = table.unpack or unpack; --luacheck: ignore 113 -- renamed in 5.2
local prosody = prosody;
@@ -97,7 +104,7 @@ function api:hook_tag(xmlns, name, handler, priority)
-- If only 2 options then they specified no xmlns
xmlns, name, handler, priority = nil, xmlns, name, handler;
elseif not (handler and name) then
- self:log("warn", "Error: Insufficient parameters to module:hook_stanza()");
+ self:log("warn", "Error: Insufficient parameters to module:hook_tag()");
return;
end
return self:hook("stanza/"..(xmlns and (xmlns..":") or "")..name,
@@ -361,6 +368,111 @@ function api:send(stanza, origin)
return core_post_stanza(origin or hosts[self.host], stanza);
end
+function api:send_iq(stanza, origin, timeout)
+ local iq_cache = self._iq_cache;
+ if not iq_cache then
+ iq_cache = cache.new(256, function (_, iq)
+ iq.reject(errors.new({
+ type = "wait", condition = "resource-constraint",
+ text = "evicted from iq tracking cache"
+ }));
+ end);
+ self._iq_cache = iq_cache;
+ end
+
+ local event_type;
+ if not jid_node(stanza.attr.from) then
+ event_type = "host";
+ elseif jid_resource(stanza.attr.from) then
+ event_type = "full";
+ else -- assume bare since we can't hook full jids
+ event_type = "bare";
+ end
+ local result_event = "iq-result/"..event_type.."/"..stanza.attr.id;
+ local error_event = "iq-error/"..event_type.."/"..stanza.attr.id;
+ local cache_key = event_type.."/"..stanza.attr.id;
+ if event_type == "full" then
+ result_event = "iq/" .. event_type;
+ error_event = "iq/" .. event_type;
+ end
+
+ local p = promise.new(function (resolve, reject)
+ local function result_handler(event)
+ local response = event.stanza;
+ if response.attr.type == "result" and response.attr.from == stanza.attr.to and response.attr.id == stanza.attr.id then
+ resolve(event);
+ return true;
+ end
+ end
+
+ local function error_handler(event)
+ local response = event.stanza;
+ if response.attr.type == "error" and response.attr.from == stanza.attr.to and response.attr.id == stanza.attr.id then
+ reject(errors.from_stanza(event.stanza, event));
+ return true;
+ end
+ end
+
+ if iq_cache:get(cache_key) then
+ reject(errors.new({
+ type = "modify", condition = "conflict",
+ text = "IQ stanza id attribute already used",
+ }));
+ return;
+ end
+
+ self:hook(result_event, result_handler, 1);
+ self:hook(error_event, error_handler, 1);
+
+ local timeout_handle = self:add_timer(timeout or 120, function ()
+ reject(errors.new({
+ type = "wait", condition = "remote-server-timeout",
+ text = "IQ stanza timed out",
+ }));
+ end);
+
+ local ok = iq_cache:set(cache_key, {
+ reject = reject, resolve = resolve,
+ timeout_handle = timeout_handle,
+ result_handler = result_handler, error_handler = error_handler;
+ });
+
+ if not ok then
+ reject(errors.new({
+ type = "wait", condition = "internal-server-error",
+ text = "Could not store IQ tracking data"
+ }));
+ return;
+ end
+
+ local wrapped_origin = setmetatable({
+ -- XXX Needed in some cases for replies to work correctly when sending queries internally.
+ send = function (reply)
+ if reply.name == stanza.name and reply.attr.id == stanza.attr.id then
+ resolve({ stanza = reply });
+ end
+ return (origin or hosts[self.host]).send(reply)
+ end;
+ }, {
+ __index = origin or hosts[self.host];
+ });
+
+ self:send(stanza, wrapped_origin);
+ end);
+
+ p:finally(function ()
+ local iq = iq_cache:get(cache_key);
+ if iq then
+ self:unhook(result_event, iq.result_handler);
+ self:unhook(error_event, iq.error_handler);
+ iq.timeout_handle:stop();
+ iq_cache:set(cache_key, nil);
+ end
+ end);
+
+ return p;
+end
+
function api:broadcast(jids, stanza, iter)
for jid in (iter or it.values)(jids) do
local new_stanza = st.clone(stanza);
@@ -394,9 +506,29 @@ function api:add_timer(delay, callback, ...)
return setmetatable(t, timer_mt);
end
+function api:cron(task_spec)
+ self:depends("cron");
+ self:add_item("task", task_spec);
+end
+
+function api:hourly(name, fun)
+ if type(name) == "function" then fun, name = name, nil; end
+ self:cron({ name = name; when = "hourly"; run = fun });
+end
+
+function api:daily(name, fun)
+ if type(name) == "function" then fun, name = name, nil; end
+ self:cron({ name = name; when = "daily"; run = fun });
+end
+
+function api:weekly(name, fun)
+ if type(name) == "function" then fun, name = name, nil; end
+ self:cron({ name = name; when = "weekly"; run = fun });
+end
+
local path_sep = package.config:sub(1,1);
function api:get_directory()
- return self.path and (self.path:gsub("%"..path_sep.."[^"..path_sep.."]*$", "")) or nil;
+ return self.resource_path or self.path and (self.path:gsub("%"..path_sep.."[^"..path_sep.."]*$", "")) or nil;
end
function api:load_resource(path, mode)
@@ -408,28 +540,65 @@ function api:open_store(name, store_type)
return require"core.storagemanager".open(self.host, name or self.name, store_type);
end
-function api:measure(name, stat_type)
+function api:measure(name, stat_type, conf)
local measure = require "core.statsmanager".measure;
- return measure(stat_type, "/"..self.host.."/mod_"..self.name.."/"..name);
+ local fixed_label_key, fixed_label_value
+ if self.host ~= "*" then
+ fixed_label_key = "host"
+ fixed_label_value = self.host
+ end
+ -- new_legacy_metric takes care of scoping for us, as it does not accept
+ -- an array of labels
+ -- the prosody_ prefix is automatically added by statsmanager for legacy
+ -- metrics.
+ self:add_item("measure", { name = name, type = stat_type, conf = conf });
+ return measure(stat_type, "mod_"..self.name.."/"..name, conf, fixed_label_key, fixed_label_value)
+end
+
+function api:metric(type_, name, unit, description, label_keys, conf)
+ local metric = require "core.statsmanager".metric;
+ local is_scoped = self.host ~= "*"
+ if is_scoped then
+ -- prepend `host` label to label keys if this is not a global module
+ local orig_labels = label_keys
+ label_keys = array { "host" }
+ label_keys:append(orig_labels)
+ end
+ local mf = metric(type_, "prosody_mod_"..self.name.."/"..name, unit, description, label_keys, conf)
+ self:add_item("metric", { name = name, mf = mf });
+ if is_scoped then
+ -- make sure to scope the returned metric family to the current host
+ return mf:with_partial_label(self.host)
+ end
+ return mf
end
-function api:measure_object_event(events_object, event_name, stat_name)
- local m = self:measure(stat_name or event_name, "times");
- local function handler(handlers, _event_name, _event_data)
- local finished = m();
- local ret = handlers(_event_name, _event_data);
- finished();
- return ret;
+local status_priorities = { error = 3, warn = 2, info = 1, core = 0 };
+
+function api:set_status(status_type, status_message, override)
+ local priority = status_priorities[status_type];
+ if not priority then
+ self:log("error", "set_status: Invalid status type '%s', assuming 'info'");
+ status_type, priority = "info", status_priorities.info;
+ end
+ local current_priority = status_priorities[self.status_type] or 0;
+ -- By default an 'error' status can only be overwritten by another 'error' status
+ if (current_priority >= status_priorities.error and priority < current_priority and override ~= true)
+ or (override == false and current_priority > priority) then
+ self:log("debug", "moduleapi: ignoring status [prio %d override %s]: %s", priority, override, status_message);
+ return;
end
- return self:hook_object_event(events_object, event_name, handler);
+ self.status_type, self.status_message, self.status_time = status_type, status_message, time_now();
+ self:fire_event("module-status/updated", { name = self.name });
end
-function api:measure_event(event_name, stat_name)
- return self:measure_object_event((hosts[self.host] or prosody).events.wrappers, event_name, stat_name);
+function api:log_status(level, msg, ...)
+ self:set_status(level, format(msg, ...));
+ return self:log(level, msg, ...);
end
-function api:measure_global_event(event_name, stat_name)
- return self:measure_object_event(prosody.events.wrappers, event_name, stat_name);
+function api:get_status()
+ return self.status_type, self.status_message, self.status_time;
end
return api;