aboutsummaryrefslogtreecommitdiffstats
path: root/plugins/mod_storage_sql.lua
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/mod_storage_sql.lua')
-rw-r--r--plugins/mod_storage_sql.lua290
1 files changed, 253 insertions, 37 deletions
diff --git a/plugins/mod_storage_sql.lua b/plugins/mod_storage_sql.lua
index 5b1c3603..6bd094e5 100644
--- a/plugins/mod_storage_sql.lua
+++ b/plugins/mod_storage_sql.lua
@@ -1,17 +1,19 @@
-- luacheck: ignore 212/self
+local cache = require "util.cache";
local json = require "util.json";
local sql = require "util.sql";
local xml_parse = require "util.xml".parse;
local uuid = require "util.uuid";
local resolve_relative_path = require "util.paths".resolve_relative_path;
+local jid_join = require "util.jid".join;
local is_stanza = require"util.stanza".is_stanza;
local t_concat = table.concat;
local noop = function() end
-local unpack = table.unpack or unpack;
+local unpack = table.unpack or unpack; -- luacheck: ignore 113
local function iterator(result)
return function(result_)
local row = result_();
@@ -148,7 +150,13 @@ end
--- Archive store API
--- luacheck: ignore 512 431/user 431/store
+local archive_item_limit = module:get_option_number("storage_archive_item_limit");
+local archive_item_count_cache = cache.new(module:get_option("storage_archive_item_limit_cache_size", 1000));
+
+local item_count_cache_hit = module:measure("item_count_cache_hit", "rate");
+local item_count_cache_miss = module:measure("item_count_cache_miss", "rate")
+
+-- luacheck: ignore 512 431/user 431/store 431/err
local map_store = {};
map_store.__index = map_store;
map_store.remove = {};
@@ -225,13 +233,93 @@ function map_store:set_keys(username, keydatas)
return result;
end
+function map_store:get_all(key)
+ if type(key) ~= "string" or key == "" then
+ return nil, "get_all only supports non-empty string keys";
+ end
+ local ok, result = engine:transaction(function()
+ local query = [[
+ SELECT "user", "type", "value"
+ FROM "prosody"
+ WHERE "host"=? AND "store"=? AND "key"=?
+ ]];
+
+ local data;
+ for row in engine:select(query, host, self.store, key) do
+ local key_data, err = deserialize(row[2], row[3]);
+ assert(key_data ~= nil, err);
+ if data == nil then
+ data = {};
+ end
+ data[row[1]] = key_data;
+ end
+
+ return data;
+
+ end);
+ if not ok then return nil, result; end
+ return result;
+end
+
+function map_store:delete_all(key)
+ if type(key) ~= "string" or key == "" then
+ return nil, "delete_all only supports non-empty string keys";
+ end
+ local ok, result = engine:transaction(function()
+ local delete_sql = [[
+ DELETE FROM "prosody"
+ WHERE "host"=? AND "store"=? AND "key"=?;
+ ]];
+ engine:delete(delete_sql, host, self.store, key);
+ return true;
+ end);
+ if not ok then return nil, result; end
+ return result;
+end
+
local archive_store = {}
archive_store.caps = {
total = true;
+ quota = archive_item_limit;
+ truncate = true;
+ full_id_range = true;
+ ids = true;
};
archive_store.__index = archive_store
function archive_store:append(username, key, value, when, with)
local user,store = username,self.store;
+ local cache_key = jid_join(username, host, store);
+ local item_count = archive_item_count_cache:get(cache_key);
+ if not item_count then
+ item_count_cache_miss();
+ local ok, ret = engine:transaction(function()
+ local count_sql = [[
+ SELECT COUNT(*) FROM "prosodyarchive"
+ WHERE "host"=? AND "user"=? AND "store"=?;
+ ]];
+ local result = engine:select(count_sql, host, user, store);
+ if result then
+ for row in result do
+ item_count = row[1];
+ end
+ end
+ end);
+ if not ok or not item_count then
+ module:log("error", "Failed while checking quota for %s: %s", username, ret);
+ return nil, "Failure while checking quota";
+ end
+ archive_item_count_cache:set(cache_key, item_count);
+ else
+ item_count_cache_hit();
+ end
+
+ if archive_item_limit then
+ module:log("debug", "%s has %d items out of %d limit", username, item_count, archive_item_limit);
+ if item_count >= archive_item_limit then
+ return nil, "quota-limit";
+ end
+ end
+
when = when or os.time();
with = with or "";
local ok, ret = engine:transaction(function()
@@ -245,12 +333,16 @@ function archive_store:append(username, key, value, when, with)
VALUES (?,?,?,?,?,?,?,?);
]];
if key then
- engine:delete(delete_sql, host, user or "", store, key);
+ local result = engine:delete(delete_sql, host, user or "", store, key);
+ if result then
+ item_count = item_count - result:affected();
+ end
else
key = uuid.generate();
end
local t, encoded_value = assert(serialize(value));
engine:insert(insert_sql, host, user or "", store, when, with, key, t, encoded_value);
+ archive_item_count_cache:set(cache_key, item_count+1);
return key;
end);
if not ok then return ok, ret; end
@@ -285,47 +377,60 @@ local function archive_where(query, args, where)
where[#where+1] = "\"key\" = ?";
args[#args+1] = query.key
end
+
+ -- Set of ids
+ if query.ids then
+ local nids, nargs = #query.ids, #args;
+ -- COMPAT Lua 5.1: No separator argument to string.rep
+ where[#where + 1] = "\"key\" IN (" .. string.rep("?,", nids):sub(1,-2) .. ")";
+ for i, id in ipairs(query.ids) do
+ args[nargs+i] = id;
+ end
+ end
end
local function archive_where_id_range(query, args, where)
- local args_len = #args
-- Before or after specific item, exclusive
+ local id_lookup_sql = [[
+ SELECT "sort_id"
+ FROM "prosodyarchive"
+ WHERE "key" = ? AND "host" = ? AND "user" = ? AND "store" = ?
+ LIMIT 1;
+ ]];
if query.after then -- keys better be unique!
- where[#where+1] = [[
- "sort_id" > COALESCE(
- (
- SELECT "sort_id"
- FROM "prosodyarchive"
- WHERE "key" = ? AND "host" = ? AND "user" = ? AND "store" = ?
- LIMIT 1
- ), 0)
- ]];
- args[args_len+1], args[args_len+2], args[args_len+3], args[args_len+4] = query.after, args[1], args[2], args[3];
- args_len = args_len + 4
+ local after_id = nil;
+ for row in engine:select(id_lookup_sql, query.after, args[1], args[2], args[3]) do
+ after_id = row[1];
+ end
+ if not after_id then
+ return nil, "item-not-found";
+ end
+ where[#where+1] = '"sort_id" > ?';
+ args[#args+1] = after_id;
end
if query.before then
- where[#where+1] = [[
- "sort_id" < COALESCE(
- (
- SELECT "sort_id"
- FROM "prosodyarchive"
- WHERE "key" = ? AND "host" = ? AND "user" = ? AND "store" = ?
- LIMIT 1
- ),
- (
- SELECT MAX("sort_id")+1
- FROM "prosodyarchive"
- )
- )
- ]]
- args[args_len+1], args[args_len+2], args[args_len+3], args[args_len+4] = query.before, args[1], args[2], args[3];
+ local before_id = nil;
+ for row in engine:select(id_lookup_sql, query.before, args[1], args[2], args[3]) do
+ before_id = row[1];
+ end
+ if not before_id then
+ return nil, "item-not-found";
+ end
+ where[#where+1] = '"sort_id" < ?';
+ args[#args+1] = before_id;
end
+ return true;
end
function archive_store:find(username, query)
query = query or {};
local user,store = username,self.store;
- local total;
- local ok, result = engine:transaction(function()
+ local cache_key = jid_join(username, host, self.store);
+ local total = archive_item_count_cache:get(cache_key);
+ (total and item_count_cache_hit or item_count_cache_miss)();
+ if total ~= nil and query.limit == 0 and query.start == nil and query.with == nil and query["end"] == nil and query.key == nil then
+ return noop, total;
+ end
+ local ok, result, err = engine:transaction(function()
local sql_query = [[
SELECT "key", "type", "value", "when", "with"
FROM "prosodyarchive"
@@ -346,12 +451,16 @@ function archive_store:find(username, query)
total = row[1];
end
end
+ if query.start == nil and query.with == nil and query["end"] == nil and query.key == nil then
+ archive_item_count_cache:set(cache_key, total);
+ end
if query.limit == 0 then -- Skip the real query
return noop, total;
end
end
- archive_where_id_range(query, args, where);
+ local ok, err = archive_where_id_range(query, args, where);
+ if not ok then return ok, err; end
if query.limit then
args[#args+1] = query.limit;
@@ -361,7 +470,8 @@ function archive_store:find(username, query)
and "DESC" or "ASC", query.limit and " LIMIT ?" or "");
return engine:select(sql_query, unpack(args));
end);
- if not ok then return ok, result end
+ if not ok then return ok, result; end
+ if not result then return nil, err; end
return function()
local row = result();
if row ~= nil then
@@ -372,6 +482,95 @@ function archive_store:find(username, query)
end, total;
end
+function archive_store:get(username, key)
+ local iter, err = self:find(username, { key = key })
+ if not iter then return iter, err; end
+ for _, stanza, when, with in iter do
+ return stanza, when, with;
+ end
+ return nil, "item-not-found";
+end
+
+function archive_store:set(username, key, new_value, new_when, new_with)
+ local user,store = username,self.store;
+ local ok, result = engine:transaction(function ()
+
+ local update_query = [[
+ UPDATE "prosodyarchive"
+ SET %s
+ WHERE %s
+ ]];
+ local args = { host, user or "", store, key };
+ local setf = {};
+ local where = { "\"host\" = ?", "\"user\" = ?", "\"store\" = ?", "\"key\" = ?"};
+
+ if new_value then
+ table.insert(setf, '"type" = ?')
+ table.insert(setf, '"value" = ?')
+ local t, value = serialize(new_value);
+ table.insert(args, 1, t);
+ table.insert(args, 2, value);
+ end
+
+ if new_when then
+ table.insert(setf, 1, '"when" = ?')
+ table.insert(args, 1, new_when);
+ end
+
+ if new_with then
+ table.insert(setf, 1, '"with" = ?')
+ table.insert(args, 1, new_with);
+ end
+
+ update_query = update_query:format(t_concat(setf, ", "), t_concat(where, " AND "));
+ return engine:update(update_query, unpack(args));
+ end);
+ if not ok then return ok, result; end
+ return result:affected() == 1;
+end
+
+function archive_store:summary(username, query)
+ query = query or {};
+ local user,store = username,self.store;
+ local ok, result = engine:transaction(function()
+ local sql_query = [[
+ SELECT DISTINCT "with", COUNT(*), MIN("when"), MAX("when")
+ FROM "prosodyarchive"
+ WHERE %s
+ GROUP BY "with"
+ ORDER BY "sort_id" %s%s;
+ ]];
+ local args = { host, user or "", store, };
+ local where = { "\"host\" = ?", "\"user\" = ?", "\"store\" = ?", };
+
+ archive_where(query, args, where);
+
+ archive_where_id_range(query, args, where);
+
+ if query.limit then
+ args[#args+1] = query.limit;
+ end
+
+ sql_query = sql_query:format(t_concat(where, " AND "), query.reverse
+ and "DESC" or "ASC", query.limit and " LIMIT ?" or "");
+ return engine:select(sql_query, unpack(args));
+ end);
+ if not ok then return ok, result end
+ local counts = {};
+ local earliest, latest = {}, {};
+ for row in result do
+ local with, count = row[1], row[2];
+ counts[with] = count;
+ earliest[with] = row[3];
+ latest[with] = row[4];
+ end
+ return {
+ counts = counts;
+ earliest = earliest;
+ latest = latest;
+ };
+end
+
function archive_store:delete(username, query)
query = query or {};
local user,store = username,self.store;
@@ -384,7 +583,8 @@ function archive_store:delete(username, query)
table.remove(where, 2);
end
archive_where(query, args, where);
- archive_where_id_range(query, args, where);
+ local ok, err = archive_where_id_range(query, args, where);
+ if not ok then return ok, err; end
if query.truncate == nil then
sql_query = sql_query:format(t_concat(where, " AND "));
else
@@ -423,9 +623,24 @@ function archive_store:delete(username, query)
end
return engine:delete(sql_query, unpack(args));
end);
+ local cache_key = jid_join(username, host, self.store);
+ archive_item_count_cache:set(cache_key, nil);
return ok and stmt:affected(), stmt;
end
+function archive_store:users()
+ local ok, result = engine:transaction(function()
+ local select_sql = [[
+ SELECT DISTINCT "user"
+ FROM "prosodyarchive"
+ WHERE "host"=? AND "store"=?;
+ ]];
+ return engine:select(select_sql, host, self.store);
+ end);
+ if not ok then error(result); end
+ return iterator(result);
+end
+
local stores = {
keyval = keyval_store;
map = map_store;
@@ -610,9 +825,10 @@ function module.load()
if prosody.prosodyctl then return; end
local engines = module:shared("/*/sql/connections");
local params = normalize_params(module:get_option("sql", default_params));
- engine = engines[sql.db2uri(params)];
+ local db_uri = sql.db2uri(params);
+ engine = engines[db_uri];
if not engine then
- module:log("debug", "Creating new engine");
+ module:log("debug", "Creating new engine %s", db_uri);
engine = sql:create_engine(params, function (engine) -- luacheck: ignore 431/engine
if module:get_option("sql_manage_tables", true) then
-- Automatically create table, ignore failure (table probably already exists)