diff options
Diffstat (limited to 'plugins/mod_storage_sql.lua')
-rw-r--r-- | plugins/mod_storage_sql.lua | 161 |
1 files changed, 129 insertions, 32 deletions
diff --git a/plugins/mod_storage_sql.lua b/plugins/mod_storage_sql.lua index a449091e..5da5e448 100644 --- a/plugins/mod_storage_sql.lua +++ b/plugins/mod_storage_sql.lua @@ -1,17 +1,19 @@ -- luacheck: ignore 212/self +local cache = require "util.cache"; local json = require "util.json"; local sql = require "util.sql"; local xml_parse = require "util.xml".parse; local uuid = require "util.uuid"; local resolve_relative_path = require "util.paths".resolve_relative_path; +local jid_join = require "util.jid".join; local is_stanza = require"util.stanza".is_stanza; local t_concat = table.concat; local noop = function() end -local unpack = table.unpack or unpack; +local unpack = table.unpack or unpack; -- luacheck: ignore 113 local function iterator(result) return function(result_) local row = result_(); @@ -148,6 +150,9 @@ end --- Archive store API +local archive_item_limit = module:get_option_number("storage_archive_item_limit"); +local archive_item_count_cache = cache.new(module:get_option("storage_archive_item_limit_cache_size", 1000)); + -- luacheck: ignore 512 431/user 431/store local map_store = {}; map_store.__index = map_store; @@ -228,10 +233,41 @@ end local archive_store = {} archive_store.caps = { total = true; + quota = archive_item_limit; + truncate = true; }; archive_store.__index = archive_store function archive_store:append(username, key, value, when, with) local user,store = username,self.store; + local cache_key = jid_join(username, host, store); + local item_count = archive_item_count_cache:get(cache_key); + if not item_count then + local ok, ret = engine:transaction(function() + local count_sql = [[ + SELECT COUNT(*) FROM "prosodyarchive" + WHERE "host"=? AND "user"=? AND "store"=?; + ]]; + local result = engine:select(count_sql, host, user, store); + if result then + for row in result do + item_count = row[1]; + end + end + end); + if not ok or not item_count then + module:log("error", "Failed while checking quota for %s: %s", username, ret); + return nil, "Failure while checking quota"; + end + archive_item_count_cache:set(cache_key, item_count); + end + + if archive_item_limit then + module:log("debug", "%s has %d items out of %d limit", username, item_count, archive_item_limit); + if item_count >= archive_item_limit then + return nil, "quota-limit"; + end + end + when = when or os.time(); with = with or ""; local ok, ret = engine:transaction(function() @@ -245,12 +281,16 @@ function archive_store:append(username, key, value, when, with) VALUES (?,?,?,?,?,?,?,?); ]]; if key then - engine:delete(delete_sql, host, user or "", store, key); + local result, err = engine:delete(delete_sql, host, user or "", store, key); + if result then + item_count = item_count - result:affected(); + end else key = uuid.generate(); end local t, encoded_value = assert(serialize(value)); engine:insert(insert_sql, host, user or "", store, when, with, key, t, encoded_value); + archive_item_count_cache:set(cache_key, item_count+1); return key; end); if not ok then return ok, ret; end @@ -287,44 +327,46 @@ local function archive_where(query, args, where) end end local function archive_where_id_range(query, args, where) - local args_len = #args -- Before or after specific item, exclusive + local id_lookup_sql = [[ + SELECT "sort_id" + FROM "prosodyarchive" + WHERE "key" = ? AND "host" = ? AND "user" = ? AND "store" = ? + LIMIT 1; + ]]; if query.after then -- keys better be unique! - where[#where+1] = [[ - "sort_id" > COALESCE( - ( - SELECT "sort_id" - FROM "prosodyarchive" - WHERE "key" = ? AND "host" = ? AND "user" = ? AND "store" = ? - LIMIT 1 - ), 0) - ]]; - args[args_len+1], args[args_len+2], args[args_len+3], args[args_len+4] = query.after, args[1], args[2], args[3]; - args_len = args_len + 4 + local after_id = nil; + for row in engine:select(id_lookup_sql, query.after, host, user or "", store) do + after_id = row[1]; + end + if not after_id then + return nil, "item-not-found"; + end + where[#where+1] = '"sort_id" > ?'; + args[#args+1] = after_id; end if query.before then - where[#where+1] = [[ - "sort_id" < COALESCE( - ( - SELECT "sort_id" - FROM "prosodyarchive" - WHERE "key" = ? AND "host" = ? AND "user" = ? AND "store" = ? - LIMIT 1 - ), - ( - SELECT MAX("sort_id")+1 - FROM "prosodyarchive" - ) - ) - ]] - args[args_len+1], args[args_len+2], args[args_len+3], args[args_len+4] = query.before, args[1], args[2], args[3]; + local before_id = nil; + for row in engine:select(id_lookup_sql, query.after, host, user or "", store) do + before_id = row[1]; + end + if not before_id then + return nil, "item-not-found"; + end + where[#where+1] = '"sort_id" < ?'; + args[#args+1] = before_id; end + return true; end function archive_store:find(username, query) query = query or {}; local user,store = username,self.store; - local total; + local cache_key = jid_join(username, host, self.store); + local total = archive_item_count_cache:get(cache_key); + if total ~= nil and query.limit == 0 and query.start == nil and query.with == nil and query["end"] == nil and query.key == nil then + return noop, total; + end local ok, result = engine:transaction(function() local sql_query = [[ SELECT "key", "type", "value", "when", "with" @@ -346,12 +388,16 @@ function archive_store:find(username, query) total = row[1]; end end + if query.start == nil and query.with == nil and query["end"] == nil and query.key == nil then + archive_item_count_cache:set(cache_key, total); + end if query.limit == 0 then -- Skip the real query return noop, total; end end - archive_where_id_range(query, args, where); + local ok, err = archive_where_id_range(query, args, where); + if not ok then return ok, err; end if query.limit then args[#args+1] = query.limit; @@ -372,6 +418,41 @@ function archive_store:find(username, query) end, total; end +function archive_store:summary(username, query) + query = query or {}; + local user,store = username,self.store; + local ok, result = engine:transaction(function() + local sql_query = [[ + SELECT DISTINCT "with", COUNT(*) + FROM "prosodyarchive" + WHERE %s + GROUP BY "with" + ORDER BY "sort_id" %s%s; + ]]; + local args = { host, user or "", store, }; + local where = { "\"host\" = ?", "\"user\" = ?", "\"store\" = ?", }; + + archive_where(query, args, where); + + archive_where_id_range(query, args, where); + + if query.limit then + args[#args+1] = query.limit; + end + + sql_query = sql_query:format(t_concat(where, " AND "), query.reverse + and "DESC" or "ASC", query.limit and " LIMIT ?" or ""); + return engine:select(sql_query, unpack(args)); + end); + if not ok then return ok, result end + local summary = {}; + for row in result do + local with, count = row[1], row[2]; + summary[with] = count; + end + return summary; +end + function archive_store:delete(username, query) query = query or {}; local user,store = username,self.store; @@ -384,7 +465,8 @@ function archive_store:delete(username, query) table.remove(where, 2); end archive_where(query, args, where); - archive_where_id_range(query, args, where); + local ok, err = archive_where_id_range(query, args, where); + if not ok then return ok, err; end if query.truncate == nil then sql_query = sql_query:format(t_concat(where, " AND ")); else @@ -423,9 +505,24 @@ function archive_store:delete(username, query) end return engine:delete(sql_query, unpack(args)); end); + local cache_key = jid_join(username, host, self.store); + archive_item_count_cache:set(cache_key, nil); return ok and stmt:affected(), stmt; end +function archive_store:users() + local ok, result = engine:transaction(function() + local select_sql = [[ + SELECT DISTINCT "user" + FROM "prosodyarchive" + WHERE "host"=? AND "store"=?; + ]]; + return engine:select(select_sql, host, self.store); + end); + if not ok then error(result); end + return iterator(result); +end + local stores = { keyval = keyval_store; map = map_store; |