diff options
Diffstat (limited to 'util/pubsub.lua')
-rw-r--r-- | util/pubsub.lua | 534 |
1 files changed, 444 insertions, 90 deletions
diff --git a/util/pubsub.lua b/util/pubsub.lua index 1db917d8..fafae50a 100644 --- a/util/pubsub.lua +++ b/util/pubsub.lua @@ -1,50 +1,212 @@ local events = require "util.events"; local cache = require "util.cache"; -local service = {}; -local service_mt = { __index = service }; +local service_mt = {}; -local default_config = { __index = { - itemstore = function (config) return cache.new(tonumber(config["pubsub#max_items"])) end; +local default_config = { + itemstore = function (config, _) return cache.new(config["max_items"]) end; broadcaster = function () end; + itemcheck = function () return true; end; get_affiliation = function () end; - capabilities = {}; -} }; -local default_node_config = { __index = { - ["pubsub#max_items"] = "20"; -} }; + normalize_jid = function (jid) return jid; end; + capabilities = { + outcast = { + create = false; + publish = false; + retract = false; + get_nodes = false; + + subscribe = false; + unsubscribe = false; + get_subscription = true; + get_subscriptions = true; + get_items = false; + + subscribe_other = false; + unsubscribe_other = false; + get_subscription_other = false; + get_subscriptions_other = false; + + be_subscribed = false; + be_unsubscribed = true; + + set_affiliation = false; + }; + none = { + create = false; + publish = false; + retract = false; + get_nodes = true; + + subscribe = true; + unsubscribe = true; + get_subscription = true; + get_subscriptions = true; + get_items = false; + + subscribe_other = false; + unsubscribe_other = false; + get_subscription_other = false; + get_subscriptions_other = false; + + be_subscribed = true; + be_unsubscribed = true; + + set_affiliation = false; + }; + member = { + create = false; + publish = false; + retract = false; + get_nodes = true; + + subscribe = true; + unsubscribe = true; + get_subscription = true; + get_subscriptions = true; + get_items = true; + + subscribe_other = false; + unsubscribe_other = false; + get_subscription_other = false; + get_subscriptions_other = false; + + be_subscribed = true; + be_unsubscribed = true; + + set_affiliation = false; + }; + publisher = { + create = false; + publish = true; + retract = true; + get_nodes = true; + get_configuration = true; + + subscribe = true; + unsubscribe = true; + get_subscription = true; + get_subscriptions = true; + get_items = true; + + subscribe_other = false; + unsubscribe_other = false; + get_subscription_other = false; + get_subscriptions_other = false; + + be_subscribed = true; + be_unsubscribed = true; + + set_affiliation = false; + }; + owner = { + create = true; + publish = true; + retract = true; + delete = true; + get_nodes = true; + configure = true; + get_configuration = true; + + subscribe = true; + unsubscribe = true; + get_subscription = true; + get_subscriptions = true; + get_items = true; + + subscribe_other = true; + unsubscribe_other = true; + get_subscription_other = true; + get_subscriptions_other = true; + + be_subscribed = true; + be_unsubscribed = true; + + set_affiliation = true; + }; + }; +}; +local default_config_mt = { __index = default_config }; + +local default_node_config = { + ["persist_items"] = false; + ["max_items"] = 20; + ["access_model"] = "open"; + ["publish_model"] = "publishers"; +}; +local default_node_config_mt = { __index = default_node_config }; + +-- Storage helper functions + +local function load_node_from_store(service, node_name) + local node = service.config.nodestore:get(node_name); + node.config = setmetatable(node.config or {}, {__index=service.node_defaults}); + return node; +end + +local function save_node_to_store(service, node) + return service.config.nodestore:set(node.name, { + name = node.name; + config = node.config; + subscribers = node.subscribers; + affiliations = node.affiliations; + }); +end + +local function delete_node_in_store(service, node_name) + return service.config.nodestore:set(node_name, nil); +end + +-- Create and return a new service object local function new(config) config = config or {}; - return setmetatable({ - config = setmetatable(config, default_config); - node_defaults = setmetatable(config.node_defaults or {}, default_node_config); + + local service = setmetatable({ + config = setmetatable(config, default_config_mt); + node_defaults = setmetatable(config.node_defaults or {}, default_node_config_mt); affiliations = {}; subscriptions = {}; nodes = {}; data = {}; events = events.new(); }, service_mt); + + -- Load nodes from storage, if we have a store and it supports iterating over stored items + if config.nodestore and config.nodestore.users then + for node_name in config.nodestore:users() do + service.nodes[node_name] = load_node_from_store(service, node_name); + service.data[node_name] = config.itemstore(service.nodes[node_name].config, node_name); + end + end + + return service; end -function service:jids_equal(jid1, jid2) +--- Service methods + +local service = {}; +service_mt.__index = service; + +function service:jids_equal(jid1, jid2) --> boolean local normalize = self.config.normalize_jid; return normalize(jid1) == normalize(jid2); end -function service:may(node, actor, action) +function service:may(node, actor, action) --> boolean if actor == true then return true; end local node_obj = self.nodes[node]; - local node_aff = node_obj and node_obj.affiliations[actor]; + local node_aff = node_obj and (node_obj.affiliations[actor] + or node_obj.affiliations[self.config.normalize_jid(actor)]); local service_aff = self.affiliations[actor] - or self.config.get_affiliation(actor, node, action) - or "none"; + or self.config.get_affiliation(actor, node, action); + local default_aff = self:get_default_affiliation(node, actor) or "none"; -- Check if node allows/forbids it local node_capabilities = node_obj and node_obj.capabilities; if node_capabilities then - local caps = node_capabilities[node_aff or service_aff]; + local caps = node_capabilities[node_aff or service_aff or default_aff]; if caps then local can = caps[action]; if can ~= nil then @@ -55,7 +217,7 @@ function service:may(node, actor, action) -- Check service-wide capabilities instead local service_capabilities = self.config.capabilities; - local caps = service_capabilities[node_aff or service_aff]; + local caps = service_capabilities[node_aff or service_aff or default_aff]; if caps then local can = caps[action]; if can ~= nil then @@ -66,7 +228,29 @@ function service:may(node, actor, action) return false; end -function service:set_affiliation(node, actor, jid, affiliation) +function service:get_default_affiliation(node, actor) --> affiliation + local node_obj = self.nodes[node]; + local access_model = node_obj and node_obj.config.access_model + or self.node_defaults.access_model; + + if access_model == "open" then + return "member"; + elseif access_model == "whitelist" then + return "outcast"; + end + + if self.config.access_models then + local check = self.config.access_models[access_model]; + if check then + local aff = check(actor); + if aff then + return aff; + end + end + end +end + +function service:set_affiliation(node, actor, jid, affiliation) --> ok, err -- Access checking if not self:may(node, actor, "set_affiliation") then return false, "forbidden"; @@ -76,7 +260,18 @@ function service:set_affiliation(node, actor, jid, affiliation) if not node_obj then return false, "item-not-found"; end + jid = self.config.normalize_jid(jid); + local old_affiliation = node_obj.affiliations[jid]; node_obj.affiliations[jid] = affiliation; + + if self.config.nodestore then + local ok, err = save_node_to_store(self, node_obj); + if not ok then + node_obj.affiliations[jid] = old_affiliation; + return ok, "internal-server-error"; + end + end + local _, jid_sub = self:get_subscription(node, true, jid); if not jid_sub and not self:may(node, jid, "be_unsubscribed") then local ok, err = self:add_subscription(node, true, jid); @@ -92,7 +287,7 @@ function service:set_affiliation(node, actor, jid, affiliation) return true; end -function service:add_subscription(node, actor, jid, options) +function service:add_subscription(node, actor, jid, options) --> ok, err -- Access checking local cap; if actor == true or jid == actor or self:jids_equal(actor, jid) then @@ -119,6 +314,7 @@ function service:add_subscription(node, actor, jid, options) node_obj = self.nodes[node]; end end + local old_subscription = node_obj.subscribers[jid]; node_obj.subscribers[jid] = options or true; local normal_jid = self.config.normalize_jid(jid); local subs = self.subscriptions[normal_jid]; @@ -131,11 +327,21 @@ function service:add_subscription(node, actor, jid, options) else self.subscriptions[normal_jid] = { [jid] = { [node] = true } }; end - self.events.fire_event("subscription-added", { node = node, jid = jid, normalized_jid = normal_jid, options = options }); + + if self.config.nodestore then + local ok, err = save_node_to_store(self, node_obj); + if not ok then + node_obj.subscribers[jid] = old_subscription; + self.subscriptions[normal_jid][jid][node] = old_subscription and true or nil; + return ok, "internal-server-error"; + end + end + + self.events.fire_event("subscription-added", { service = self, node = node, jid = jid, normalized_jid = normal_jid, options = options }); return true; end -function service:remove_subscription(node, actor, jid) +function service:remove_subscription(node, actor, jid) --> ok, err -- Access checking local cap; if actor == true or jid == actor or self:jids_equal(actor, jid) then @@ -157,6 +363,7 @@ function service:remove_subscription(node, actor, jid) if not node_obj.subscribers[jid] then return false, "not-subscribed"; end + local old_subscription = node_obj.subscribers[jid]; node_obj.subscribers[jid] = nil; local normal_jid = self.config.normalize_jid(jid); local subs = self.subscriptions[normal_jid]; @@ -172,23 +379,21 @@ function service:remove_subscription(node, actor, jid) self.subscriptions[normal_jid] = nil; end end - self.events.fire_event("subscription-removed", { node = node, jid = jid, normalized_jid = normal_jid }); - return true; -end -function service:remove_all_subscriptions(actor, jid) - local normal_jid = self.config.normalize_jid(jid); - local subs = self.subscriptions[normal_jid] - subs = subs and subs[jid]; - if subs then - for node in pairs(subs) do - self:remove_subscription(node, true, jid); + if self.config.nodestore then + local ok, err = save_node_to_store(self, node_obj); + if not ok then + node_obj.subscribers[jid] = old_subscription; + self.subscriptions[normal_jid][jid][node] = old_subscription and true or nil; + return ok, "internal-server-error"; end end + + self.events.fire_event("subscription-removed", { service = self, node = node, jid = jid, normalized_jid = normal_jid }); return true; end -function service:get_subscription(node, actor, jid) +function service:get_subscription(node, actor, jid) --> (true, subscription) or (false, err) -- Access checking local cap; if actor == true or jid == actor or self:jids_equal(actor, jid) then @@ -207,7 +412,7 @@ function service:get_subscription(node, actor, jid) return true, node_obj.subscribers[jid]; end -function service:create(node, actor, options) +function service:create(node, actor, options) --> ok, err -- Access checking if not self:may(node, actor, "create") then return false, "forbidden"; @@ -223,17 +428,30 @@ function service:create(node, actor, options) config = setmetatable(options or {}, {__index=self.node_defaults}); affiliations = {}; }; - self.data[node] = self.config.itemstore(self.nodes[node].config); - self.events.fire_event("node-created", { node = node, actor = actor }); - local ok, err = self:set_affiliation(node, true, actor, "owner"); - if not ok then - self.nodes[node] = nil; - self.data[node] = nil; + + if self.config.nodestore then + local ok, err = save_node_to_store(self, self.nodes[node]); + if not ok then + self.nodes[node] = nil; + return ok, "internal-server-error"; + end end - return ok, err; + + self.data[node] = self.config.itemstore(self.nodes[node].config, node); + self.events.fire_event("node-created", { service = self, node = node, actor = actor }); + if actor ~= true then + local ok, err = self:set_affiliation(node, true, actor, "owner"); + if not ok then + self.nodes[node] = nil; + self.data[node] = nil; + return ok, err; + end + end + + return true; end -function service:delete(node, actor) +function service:delete(node, actor) --> ok, err -- Access checking if not self:may(node, actor, "delete") then return false, "forbidden"; @@ -244,15 +462,52 @@ function service:delete(node, actor) return false, "item-not-found"; end self.nodes[node] = nil; + if self.data[node] and self.data[node].clear then + self.data[node]:clear(); + end self.data[node] = nil; - self.events.fire_event("node-deleted", { node = node, actor = actor }); - self.config.broadcaster("delete", node, node_obj.subscribers); + + if self.config.nodestore then + local ok, err = delete_node_in_store(self, node); + if not ok then + self.nodes[node] = nil; + return ok, err; + end + end + + self.events.fire_event("node-deleted", { service = self, node = node, actor = actor }); + self.config.broadcaster("delete", node, node_obj.subscribers, nil, actor, node_obj, self); return true; end -function service:publish(node, actor, id, item) +-- Used to check that the config of a node is as expected (i.e. 'publish-options') +local function check_preconditions(node_config, required_config) + if not (node_config and required_config) then + return false; + end + for config_field, value in pairs(required_config) do + if node_config[config_field] ~= value then + return false; + end + end + return true; +end + +function service:publish(node, actor, id, item, requested_config) --> ok, err -- Access checking - if not self:may(node, actor, "publish") then + local may_publish = false; + + if self:may(node, actor, "publish") then + may_publish = true; + else + local node_obj = self.nodes[node]; + local publish_model = node_obj and node_obj.config.publish_model; + if publish_model == "open" + or (publish_model == "subscribers" and node_obj.subscribers[actor]) then + may_publish = true; + end + end + if not may_publish then return false, "forbidden"; end -- @@ -261,23 +516,34 @@ function service:publish(node, actor, id, item) if not self.config.autocreate_on_publish then return false, "item-not-found"; end - local ok, err = self:create(node, true); + local ok, err = self:create(node, true, requested_config); if not ok then return ok, err; end node_obj = self.nodes[node]; + elseif requested_config and not requested_config._defaults_only then + -- Check that node has the requested config before we publish + if not check_preconditions(node_obj.config, requested_config) then + return false, "precondition-not-met"; + end + end + if not self.config.itemcheck(item) then + return nil, "invalid-item"; end local node_data = self.data[node]; local ok = node_data:set(id, item); if not ok then return nil, "internal-server-error"; end - self.events.fire_event("item-published", { node = node, actor = actor, id = id, item = item }); - self.config.broadcaster("items", node, node_obj.subscribers, item, actor); + if type(ok) == "string" then id = ok; end + local event_data = { service = self, node = node, actor = actor, id = id, item = item }; + self.events.fire_event("item-published/"..node, event_data); + self.events.fire_event("item-published", event_data); + self.config.broadcaster("items", node, node_obj.subscribers, item, actor, node_obj, self); return true; end -function service:retract(node, actor, id, retract) +function service:retract(node, actor, id, retract) --> ok, err -- Access checking if not self:may(node, actor, "retract") then return false, "forbidden"; @@ -291,14 +557,14 @@ function service:retract(node, actor, id, retract) if not ok then return nil, "internal-server-error"; end - self.events.fire_event("item-retracted", { node = node, actor = actor, id = id }); + self.events.fire_event("item-retracted", { service = self, node = node, actor = actor, id = id }); if retract then - self.config.broadcaster("items", node, node_obj.subscribers, retract); + self.config.broadcaster("retract", node, node_obj.subscribers, retract, actor, node_obj, self); end return true end -function service:purge(node, actor, notify) +function service:purge(node, actor, notify) --> ok, err -- Access checking if not self:may(node, actor, "retract") then return false, "forbidden"; @@ -308,15 +574,19 @@ function service:purge(node, actor, notify) if not node_obj then return false, "item-not-found"; end - self.data[node] = self.config.itemstore(self.nodes[node].config); - self.events.fire_event("node-purged", { node = node, actor = actor }); + if self.data[node] and self.data[node].clear then + self.data[node]:clear() + else + self.data[node] = self.config.itemstore(self.nodes[node].config, node); + end + self.events.fire_event("node-purged", { service = self, node = node, actor = actor }); if notify then - self.config.broadcaster("purge", node, node_obj.subscribers); + self.config.broadcaster("purge", node, node_obj.subscribers, nil, actor, node_obj, self); end return true end -function service:get_items(node, actor, id) +function service:get_items(node, actor, id) --> (true, { id, [id] = node }) or (false, err) -- Access checking if not self:may(node, actor, "get_items") then return false, "forbidden"; @@ -327,7 +597,11 @@ function service:get_items(node, actor, id) return false, "item-not-found"; end if id then -- Restrict results to a single specific item - return true, { id, [id] = self.data[node]:get(id) }; + local with_id = self.data[node]:get(id); + if not with_id then + return true, { }; + end + return true, { id, [id] = with_id }; else local data = {} for key, value in self.data[node]:items() do @@ -338,7 +612,23 @@ function service:get_items(node, actor, id) end end -function service:get_nodes(actor) +function service:get_last_item(node, actor) --> (true, id, node) or (false, err) + -- Access checking + if not self:may(node, actor, "get_items") then + return false, "forbidden"; + end + -- + + -- Check node exists + if not self.nodes[node] then + return false, "item-not-found"; + end + + -- Returns success, id, item + return true, self.data[node]:head(); +end + +function service:get_nodes(actor) --> (true, map) or (false, err) -- Access checking if not self:may(nil, actor, "get_nodes") then return false, "forbidden"; @@ -347,7 +637,30 @@ function service:get_nodes(actor) return true, self.nodes; end -function service:get_subscriptions(node, actor, jid) +local function flatten_subscriptions(ret, serv, subs, node, node_obj) + for subscribed_jid, subscribed_nodes in pairs(subs) do + if node then -- Return only subscriptions to this node + if subscribed_nodes[node] then + ret[#ret+1] = { + node = node; + jid = subscribed_jid; + subscription = node_obj.subscribers[subscribed_jid]; + }; + end + else -- Return subscriptions to all nodes + local nodes = serv.nodes; + for subscribed_node in pairs(subscribed_nodes) do + ret[#ret+1] = { + node = subscribed_node; + jid = subscribed_jid; + subscription = nodes[subscribed_node].subscribers[subscribed_jid]; + }; + end + end + end +end + +function service:get_subscriptions(node, actor, jid) --> (true, array) or (false, err) -- Access checking local cap; if actor == true or jid == actor or self:jids_equal(actor, jid) then @@ -366,38 +679,25 @@ function service:get_subscriptions(node, actor, jid) return false, "item-not-found"; end end + local ret = {}; + if jid == nil then + for _, subs in pairs(self.subscriptions) do + flatten_subscriptions(ret, self, subs, node, node_obj) + end + return true, ret; + end local normal_jid = self.config.normalize_jid(jid); local subs = self.subscriptions[normal_jid]; -- We return the subscription object from the node to save -- a get_subscription() call for each node. - local ret = {}; if subs then - for subscribed_jid, subscribed_nodes in pairs(subs) do - if node then -- Return only subscriptions to this node - if subscribed_nodes[node] then - ret[#ret+1] = { - node = node; - jid = subscribed_jid; - subscription = node_obj.subscribers[subscribed_jid]; - }; - end - else -- Return subscriptions to all nodes - local nodes = self.nodes; - for subscribed_node in pairs(subscribed_nodes) do - ret[#ret+1] = { - node = subscribed_node; - jid = subscribed_jid; - subscription = nodes[subscribed_node].subscribers[subscribed_jid]; - }; - end - end - end + flatten_subscriptions(ret, self, subs, node, node_obj) end return true, ret; end -- Access models only affect 'none' affiliation caps, service/default access level... -function service:set_node_capabilities(node, actor, capabilities) +function service:set_node_capabilities(node, actor, capabilities) --> ok, err -- Access checking if not self:may(node, actor, "configure") then return false, "forbidden"; @@ -411,7 +711,7 @@ function service:set_node_capabilities(node, actor, capabilities) return true; end -function service:set_node_config(node, actor, new_config) +function service:set_node_config(node, actor, new_config) --> ok, err if not self:may(node, actor, "configure") then return false, "forbidden"; end @@ -421,17 +721,71 @@ function service:set_node_config(node, actor, new_config) return false, "item-not-found"; end - for k,v in pairs(new_config) do - node_obj.config[k] = v; + setmetatable(new_config, {__index=self.node_defaults}) + + if self.config.check_node_config then + local ok = self.config.check_node_config(node, actor, new_config); + if not ok then + return false, "not-acceptable"; + end + end + + local old_config = node_obj.config; + node_obj.config = new_config; + + if self.config.nodestore then + local ok, err = save_node_to_store(self, node_obj); + if not ok then + node_obj.config = old_config; + return ok, "internal-server-error"; + end + end + + if old_config["access_model"] ~= node_obj.config["access_model"] then + for subscriber in pairs(node_obj.subscribers) do + if not self:may(node, subscriber, "be_subscribed") then + local ok, err = self:remove_subscription(node, true, subscriber); + if not ok then + node_obj.config = old_config; + return ok, err; + end + end + end end - local new_data = self.config.itemstore(self.nodes[node].config); - for key, value in self.data[node]:items() do - new_data:set(key, value); + + if old_config["persist_items"] ~= node_obj.config["persist_items"] then + self.data[node] = self.config.itemstore(self.nodes[node].config, node); + elseif old_config["max_items"] ~= node_obj.config["max_items"] then + self.data[node]:resize(self.nodes[node].config["max_items"]); end - self.data[node] = new_data; + return true; end +function service:get_node_config(node, actor) --> (true, config) or (false, err) + if not self:may(node, actor, "get_configuration") then + return false, "forbidden"; + end + + local node_obj = self.nodes[node]; + if not node_obj then + return false, "item-not-found"; + end + + local config_table = {}; + for k, v in pairs(default_node_config) do + config_table[k] = v; + end + for k, v in pairs(self.node_defaults) do + config_table[k] = v; + end + for k, v in pairs(node_obj.config) do + config_table[k] = v; + end + + return true, config_table; +end + return { new = new; }; |