diff options
Diffstat (limited to 'util/pubsub.lua')
-rw-r--r-- | util/pubsub.lua | 117 |
1 files changed, 85 insertions, 32 deletions
diff --git a/util/pubsub.lua b/util/pubsub.lua index 7ccc817f..acb34db9 100644 --- a/util/pubsub.lua +++ b/util/pubsub.lua @@ -1,9 +1,11 @@ local events = require "util.events"; local cache = require "util.cache"; +local errors = require "util.error"; local service_mt = {}; local default_config = { + max_items = 256; itemstore = function (config, _) return cache.new(config["max_items"]) end; broadcaster = function () end; subscriber_filter = function (subs) return subs end; @@ -131,10 +133,11 @@ local default_config = { local default_config_mt = { __index = default_config }; local default_node_config = { - ["persist_items"] = false; + ["persist_items"] = true; ["max_items"] = 20; ["access_model"] = "open"; ["publish_model"] = "publishers"; + ["send_last_published_item"] = "never"; }; local default_node_config_mt = { __index = default_node_config }; @@ -176,8 +179,11 @@ local function new(config) -- Load nodes from storage, if we have a store and it supports iterating over stored items if config.nodestore and config.nodestore.users then for node_name in config.nodestore:users() do - service.nodes[node_name] = load_node_from_store(service, node_name); - service.data[node_name] = config.itemstore(service.nodes[node_name].config, node_name); + local node = load_node_from_store(service, node_name); + service.nodes[node_name] = node; + if node.config.persist_items then + service.data[node_name] = config.itemstore(service.nodes[node_name].config, node_name); + end for jid in pairs(service.nodes[node_name].subscribers) do local normal_jid = service.config.normalize_jid(jid); @@ -280,7 +286,8 @@ function service:set_affiliation(node, actor, jid, affiliation) --> ok, err node_obj.affiliations[jid] = affiliation; if self.config.nodestore then - local ok, err = save_node_to_store(self, node_obj); + -- TODO pass the error from storage to caller eg wrapped in an util.error + local ok, err = save_node_to_store(self, node_obj); -- luacheck: ignore 211/err if not ok then node_obj.affiliations[jid] = old_affiliation; return ok, "internal-server-error"; @@ -344,7 +351,8 @@ function service:add_subscription(node, actor, jid, options) --> ok, err end if self.config.nodestore then - local ok, err = save_node_to_store(self, node_obj); + -- TODO pass the error from storage to caller eg wrapped in an util.error + local ok, err = save_node_to_store(self, node_obj); -- luacheck: ignore 211/err if not ok then node_obj.subscribers[jid] = old_subscription; self.subscriptions[normal_jid][jid][node] = old_subscription and true or nil; @@ -396,7 +404,8 @@ function service:remove_subscription(node, actor, jid) --> ok, err end if self.config.nodestore then - local ok, err = save_node_to_store(self, node_obj); + -- TODO pass the error from storage to caller eg wrapped in an util.error + local ok, err = save_node_to_store(self, node_obj); -- luacheck: ignore 211/err if not ok then node_obj.subscribers[jid] = old_subscription; self.subscriptions[normal_jid][jid][node] = old_subscription and true or nil; @@ -454,14 +463,18 @@ function service:create(node, actor, options) --> ok, err }; if self.config.nodestore then - local ok, err = save_node_to_store(self, self.nodes[node]); + -- TODO pass the error from storage to caller eg wrapped in an util.error + local ok, err = save_node_to_store(self, self.nodes[node]); -- luacheck: ignore 211/err if not ok then self.nodes[node] = nil; return ok, "internal-server-error"; end end - self.data[node] = self.config.itemstore(self.nodes[node].config, node); + if config.persist_items then + self.data[node] = self.config.itemstore(self.nodes[node].config, node); + end + self.events.fire_event("node-created", { service = self, node = node, actor = actor }); if actor ~= true then local ok, err = self:set_affiliation(node, true, actor, "owner"); @@ -511,7 +524,7 @@ local function check_preconditions(node_config, required_config) end for config_field, value in pairs(required_config) do if node_config[config_field] ~= value then - return false; + return false, config_field; end end return true; @@ -547,23 +560,28 @@ function service:publish(node, actor, id, item, requested_config) --> ok, err node_obj = self.nodes[node]; elseif requested_config and not requested_config._defaults_only then -- Check that node has the requested config before we publish - if not check_preconditions(node_obj.config, requested_config) then - return false, "precondition-not-met"; + local ok, field = check_preconditions(node_obj.config, requested_config); + if not ok then + local err = errors.new({ + type = "cancel", condition = "conflict", text = "Field does not match: "..field; + }); + err.pubsub_condition = "precondition-not-met"; + return false, err; end end if not self.config.itemcheck(item) then return nil, "invalid-item"; end - local node_data = self.data[node]; - if not node_data then - -- FIXME how is this possible? #1657 - return nil, "internal-server-error"; - end - local ok = node_data:set(id, item); - if not ok then - return nil, "internal-server-error"; + if node_obj.config.persist_items then + if not self.data[node] then + self.data[node] = self.config.itemstore(self.nodes[node].config, node); + end + local ok = self.data[node]:set(id, item); + if not ok then + return nil, "internal-server-error"; + end + if type(ok) == "string" then id = ok; end end - if type(ok) == "string" then id = ok; end local event_data = { service = self, node = node, actor = actor, id = id, item = item }; self.events.fire_event("item-published/"..node, event_data); self.events.fire_event("item-published", event_data); @@ -583,12 +601,17 @@ function service:retract(node, actor, id, retract) --> ok, err end -- local node_obj = self.nodes[node]; - if (not node_obj) or (not self.data[node]:get(id)) then + if not node_obj then return false, "item-not-found"; end - local ok = self.data[node]:set(id, nil); - if not ok then - return nil, "internal-server-error"; + if self.data[node] then + if not self.data[node]:get(id) then + return false, "item-not-found"; + end + local ok = self.data[node]:set(id, nil); + if not ok then + return nil, "internal-server-error"; + end end self.events.fire_event("item-retracted", { service = self, node = node, actor = actor, id = id }); if retract then @@ -607,10 +630,12 @@ function service:purge(node, actor, notify) --> ok, err if not node_obj then return false, "item-not-found"; end - if self.data[node] and self.data[node].clear then - self.data[node]:clear() - else - self.data[node] = self.config.itemstore(self.nodes[node].config, node); + if self.data[node] then + if self.data[node].clear then + self.data[node]:clear() + else + self.data[node] = self.config.itemstore(self.nodes[node].config, node); + end end self.events.fire_event("node-purged", { service = self, node = node, actor = actor }); if notify then @@ -619,7 +644,7 @@ function service:purge(node, actor, notify) --> ok, err return true end -function service:get_items(node, actor, ids) --> (true, { id, [id] = node }) or (false, err) +function service:get_items(node, actor, ids, resultspec) --> (true, { id, [id] = node }) or (false, err) -- Access checking if not self:may(node, actor, "get_items") then return false, "forbidden"; @@ -629,22 +654,31 @@ function service:get_items(node, actor, ids) --> (true, { id, [id] = node }) or if not node_obj then return false, "item-not-found"; end + if not self.data[node] then + -- Disabled rather than unsupported, but close enough. + return false, "persistent-items-unsupported"; + end if type(ids) == "string" then -- COMPAT see #1305 ids = { ids }; end local data = {}; + local limit = resultspec and resultspec.max; if ids then for _, key in ipairs(ids) do local value = self.data[node]:get(key); if value then data[#data+1] = key; data[key] = value; + -- Limits and ids seem like a problematic combination. + if limit and #data >= limit then break end end end else for key, value in self.data[node]:items() do data[#data+1] = key; data[key] = value; + if limit and #data >= limit then break + end end end return true, data; @@ -662,6 +696,11 @@ function service:get_last_item(node, actor) --> (true, id, node) or (false, err) return false, "item-not-found"; end + if not self.data[node] then + -- FIXME Should this be a success or failure? + return true, nil; + end + -- Returns success, id, item return true, self.data[node]:head(); end @@ -772,7 +811,8 @@ function service:set_node_config(node, actor, new_config) --> ok, err node_obj.config = new_config; if self.config.nodestore then - local ok, err = save_node_to_store(self, node_obj); + -- TODO pass the error from storage to caller eg wrapped in an util.error + local ok, err = save_node_to_store(self, node_obj); -- luacheck: ignore 211/err if not ok then node_obj.config = old_config; return ok, "internal-server-error"; @@ -792,9 +832,22 @@ function service:set_node_config(node, actor, new_config) --> ok, err end if old_config["persist_items"] ~= node_obj.config["persist_items"] then - self.data[node] = self.config.itemstore(self.nodes[node].config, node); + if node_obj.config["persist_items"] then + self.data[node] = self.config.itemstore(self.nodes[node].config, node); + elseif self.data[node] then + if self.data[node].clear then + self.data[node]:clear() + end + self.data[node] = nil; + end elseif old_config["max_items"] ~= node_obj.config["max_items"] then - self.data[node]:resize(self.nodes[node].config["max_items"]); + if self.data[node] then + local max_items = self.nodes[node].config["max_items"]; + if max_items == "max" then + max_items = self.config.max_items; + end + self.data[node]:resize(max_items); + end end return true; |