aboutsummaryrefslogtreecommitdiffstats
path: root/util/pubsub.lua
diff options
context:
space:
mode:
Diffstat (limited to 'util/pubsub.lua')
-rw-r--r--util/pubsub.lua117
1 files changed, 85 insertions, 32 deletions
diff --git a/util/pubsub.lua b/util/pubsub.lua
index 7ccc817f..acb34db9 100644
--- a/util/pubsub.lua
+++ b/util/pubsub.lua
@@ -1,9 +1,11 @@
local events = require "util.events";
local cache = require "util.cache";
+local errors = require "util.error";
local service_mt = {};
local default_config = {
+ max_items = 256;
itemstore = function (config, _) return cache.new(config["max_items"]) end;
broadcaster = function () end;
subscriber_filter = function (subs) return subs end;
@@ -131,10 +133,11 @@ local default_config = {
local default_config_mt = { __index = default_config };
local default_node_config = {
- ["persist_items"] = false;
+ ["persist_items"] = true;
["max_items"] = 20;
["access_model"] = "open";
["publish_model"] = "publishers";
+ ["send_last_published_item"] = "never";
};
local default_node_config_mt = { __index = default_node_config };
@@ -176,8 +179,11 @@ local function new(config)
-- Load nodes from storage, if we have a store and it supports iterating over stored items
if config.nodestore and config.nodestore.users then
for node_name in config.nodestore:users() do
- service.nodes[node_name] = load_node_from_store(service, node_name);
- service.data[node_name] = config.itemstore(service.nodes[node_name].config, node_name);
+ local node = load_node_from_store(service, node_name);
+ service.nodes[node_name] = node;
+ if node.config.persist_items then
+ service.data[node_name] = config.itemstore(service.nodes[node_name].config, node_name);
+ end
for jid in pairs(service.nodes[node_name].subscribers) do
local normal_jid = service.config.normalize_jid(jid);
@@ -280,7 +286,8 @@ function service:set_affiliation(node, actor, jid, affiliation) --> ok, err
node_obj.affiliations[jid] = affiliation;
if self.config.nodestore then
- local ok, err = save_node_to_store(self, node_obj);
+ -- TODO pass the error from storage to caller eg wrapped in an util.error
+ local ok, err = save_node_to_store(self, node_obj); -- luacheck: ignore 211/err
if not ok then
node_obj.affiliations[jid] = old_affiliation;
return ok, "internal-server-error";
@@ -344,7 +351,8 @@ function service:add_subscription(node, actor, jid, options) --> ok, err
end
if self.config.nodestore then
- local ok, err = save_node_to_store(self, node_obj);
+ -- TODO pass the error from storage to caller eg wrapped in an util.error
+ local ok, err = save_node_to_store(self, node_obj); -- luacheck: ignore 211/err
if not ok then
node_obj.subscribers[jid] = old_subscription;
self.subscriptions[normal_jid][jid][node] = old_subscription and true or nil;
@@ -396,7 +404,8 @@ function service:remove_subscription(node, actor, jid) --> ok, err
end
if self.config.nodestore then
- local ok, err = save_node_to_store(self, node_obj);
+ -- TODO pass the error from storage to caller eg wrapped in an util.error
+ local ok, err = save_node_to_store(self, node_obj); -- luacheck: ignore 211/err
if not ok then
node_obj.subscribers[jid] = old_subscription;
self.subscriptions[normal_jid][jid][node] = old_subscription and true or nil;
@@ -454,14 +463,18 @@ function service:create(node, actor, options) --> ok, err
};
if self.config.nodestore then
- local ok, err = save_node_to_store(self, self.nodes[node]);
+ -- TODO pass the error from storage to caller eg wrapped in an util.error
+ local ok, err = save_node_to_store(self, self.nodes[node]); -- luacheck: ignore 211/err
if not ok then
self.nodes[node] = nil;
return ok, "internal-server-error";
end
end
- self.data[node] = self.config.itemstore(self.nodes[node].config, node);
+ if config.persist_items then
+ self.data[node] = self.config.itemstore(self.nodes[node].config, node);
+ end
+
self.events.fire_event("node-created", { service = self, node = node, actor = actor });
if actor ~= true then
local ok, err = self:set_affiliation(node, true, actor, "owner");
@@ -511,7 +524,7 @@ local function check_preconditions(node_config, required_config)
end
for config_field, value in pairs(required_config) do
if node_config[config_field] ~= value then
- return false;
+ return false, config_field;
end
end
return true;
@@ -547,23 +560,28 @@ function service:publish(node, actor, id, item, requested_config) --> ok, err
node_obj = self.nodes[node];
elseif requested_config and not requested_config._defaults_only then
-- Check that node has the requested config before we publish
- if not check_preconditions(node_obj.config, requested_config) then
- return false, "precondition-not-met";
+ local ok, field = check_preconditions(node_obj.config, requested_config);
+ if not ok then
+ local err = errors.new({
+ type = "cancel", condition = "conflict", text = "Field does not match: "..field;
+ });
+ err.pubsub_condition = "precondition-not-met";
+ return false, err;
end
end
if not self.config.itemcheck(item) then
return nil, "invalid-item";
end
- local node_data = self.data[node];
- if not node_data then
- -- FIXME how is this possible? #1657
- return nil, "internal-server-error";
- end
- local ok = node_data:set(id, item);
- if not ok then
- return nil, "internal-server-error";
+ if node_obj.config.persist_items then
+ if not self.data[node] then
+ self.data[node] = self.config.itemstore(self.nodes[node].config, node);
+ end
+ local ok = self.data[node]:set(id, item);
+ if not ok then
+ return nil, "internal-server-error";
+ end
+ if type(ok) == "string" then id = ok; end
end
- if type(ok) == "string" then id = ok; end
local event_data = { service = self, node = node, actor = actor, id = id, item = item };
self.events.fire_event("item-published/"..node, event_data);
self.events.fire_event("item-published", event_data);
@@ -583,12 +601,17 @@ function service:retract(node, actor, id, retract) --> ok, err
end
--
local node_obj = self.nodes[node];
- if (not node_obj) or (not self.data[node]:get(id)) then
+ if not node_obj then
return false, "item-not-found";
end
- local ok = self.data[node]:set(id, nil);
- if not ok then
- return nil, "internal-server-error";
+ if self.data[node] then
+ if not self.data[node]:get(id) then
+ return false, "item-not-found";
+ end
+ local ok = self.data[node]:set(id, nil);
+ if not ok then
+ return nil, "internal-server-error";
+ end
end
self.events.fire_event("item-retracted", { service = self, node = node, actor = actor, id = id });
if retract then
@@ -607,10 +630,12 @@ function service:purge(node, actor, notify) --> ok, err
if not node_obj then
return false, "item-not-found";
end
- if self.data[node] and self.data[node].clear then
- self.data[node]:clear()
- else
- self.data[node] = self.config.itemstore(self.nodes[node].config, node);
+ if self.data[node] then
+ if self.data[node].clear then
+ self.data[node]:clear()
+ else
+ self.data[node] = self.config.itemstore(self.nodes[node].config, node);
+ end
end
self.events.fire_event("node-purged", { service = self, node = node, actor = actor });
if notify then
@@ -619,7 +644,7 @@ function service:purge(node, actor, notify) --> ok, err
return true
end
-function service:get_items(node, actor, ids) --> (true, { id, [id] = node }) or (false, err)
+function service:get_items(node, actor, ids, resultspec) --> (true, { id, [id] = node }) or (false, err)
-- Access checking
if not self:may(node, actor, "get_items") then
return false, "forbidden";
@@ -629,22 +654,31 @@ function service:get_items(node, actor, ids) --> (true, { id, [id] = node }) or
if not node_obj then
return false, "item-not-found";
end
+ if not self.data[node] then
+ -- Disabled rather than unsupported, but close enough.
+ return false, "persistent-items-unsupported";
+ end
if type(ids) == "string" then -- COMPAT see #1305
ids = { ids };
end
local data = {};
+ local limit = resultspec and resultspec.max;
if ids then
for _, key in ipairs(ids) do
local value = self.data[node]:get(key);
if value then
data[#data+1] = key;
data[key] = value;
+ -- Limits and ids seem like a problematic combination.
+ if limit and #data >= limit then break end
end
end
else
for key, value in self.data[node]:items() do
data[#data+1] = key;
data[key] = value;
+ if limit and #data >= limit then break
+ end
end
end
return true, data;
@@ -662,6 +696,11 @@ function service:get_last_item(node, actor) --> (true, id, node) or (false, err)
return false, "item-not-found";
end
+ if not self.data[node] then
+ -- FIXME Should this be a success or failure?
+ return true, nil;
+ end
+
-- Returns success, id, item
return true, self.data[node]:head();
end
@@ -772,7 +811,8 @@ function service:set_node_config(node, actor, new_config) --> ok, err
node_obj.config = new_config;
if self.config.nodestore then
- local ok, err = save_node_to_store(self, node_obj);
+ -- TODO pass the error from storage to caller eg wrapped in an util.error
+ local ok, err = save_node_to_store(self, node_obj); -- luacheck: ignore 211/err
if not ok then
node_obj.config = old_config;
return ok, "internal-server-error";
@@ -792,9 +832,22 @@ function service:set_node_config(node, actor, new_config) --> ok, err
end
if old_config["persist_items"] ~= node_obj.config["persist_items"] then
- self.data[node] = self.config.itemstore(self.nodes[node].config, node);
+ if node_obj.config["persist_items"] then
+ self.data[node] = self.config.itemstore(self.nodes[node].config, node);
+ elseif self.data[node] then
+ if self.data[node].clear then
+ self.data[node]:clear()
+ end
+ self.data[node] = nil;
+ end
elseif old_config["max_items"] ~= node_obj.config["max_items"] then
- self.data[node]:resize(self.nodes[node].config["max_items"]);
+ if self.data[node] then
+ local max_items = self.nodes[node].config["max_items"];
+ if max_items == "max" then
+ max_items = self.config.max_items;
+ end
+ self.data[node]:resize(max_items);
+ end
end
return true;