diff options
Diffstat (limited to 'plugins/mod_storage_sql.lua')
-rw-r--r-- | plugins/mod_storage_sql.lua | 294 |
1 files changed, 255 insertions, 39 deletions
diff --git a/plugins/mod_storage_sql.lua b/plugins/mod_storage_sql.lua index 5b1c3603..e49b7ccd 100644 --- a/plugins/mod_storage_sql.lua +++ b/plugins/mod_storage_sql.lua @@ -1,17 +1,19 @@ -- luacheck: ignore 212/self +local cache = require "util.cache"; local json = require "util.json"; local sql = require "util.sql"; local xml_parse = require "util.xml".parse; local uuid = require "util.uuid"; local resolve_relative_path = require "util.paths".resolve_relative_path; +local jid_join = require "util.jid".join; local is_stanza = require"util.stanza".is_stanza; local t_concat = table.concat; local noop = function() end -local unpack = table.unpack or unpack; +local unpack = table.unpack or unpack; -- luacheck: ignore 113 local function iterator(result) return function(result_) local row = result_(); @@ -148,7 +150,13 @@ end --- Archive store API --- luacheck: ignore 512 431/user 431/store +local archive_item_limit = module:get_option_number("storage_archive_item_limit"); +local archive_item_count_cache = cache.new(module:get_option("storage_archive_item_limit_cache_size", 1000)); + +local item_count_cache_hit = module:measure("item_count_cache_hit", "rate"); +local item_count_cache_miss = module:measure("item_count_cache_miss", "rate") + +-- luacheck: ignore 512 431/user 431/store 431/err local map_store = {}; map_store.__index = map_store; map_store.remove = {}; @@ -225,13 +233,93 @@ function map_store:set_keys(username, keydatas) return result; end +function map_store:get_all(key) + if type(key) ~= "string" or key == "" then + return nil, "get_all only supports non-empty string keys"; + end + local ok, result = engine:transaction(function() + local query = [[ + SELECT "user", "type", "value" + FROM "prosody" + WHERE "host"=? AND "store"=? AND "key"=? + ]]; + + local data; + for row in engine:select(query, host, self.store, key) do + local key_data, err = deserialize(row[2], row[3]); + assert(key_data ~= nil, err); + if data == nil then + data = {}; + end + data[row[1]] = key_data; + end + + return data; + + end); + if not ok then return nil, result; end + return result; +end + +function map_store:delete_all(key) + if type(key) ~= "string" or key == "" then + return nil, "delete_all only supports non-empty string keys"; + end + local ok, result = engine:transaction(function() + local delete_sql = [[ + DELETE FROM "prosody" + WHERE "host"=? AND "store"=? AND "key"=?; + ]]; + engine:delete(delete_sql, host, self.store, key); + return true; + end); + if not ok then return nil, result; end + return result; +end + local archive_store = {} archive_store.caps = { total = true; + quota = archive_item_limit; + truncate = true; + full_id_range = true; + ids = true; }; archive_store.__index = archive_store function archive_store:append(username, key, value, when, with) local user,store = username,self.store; + local cache_key = jid_join(username, host, store); + local item_count = archive_item_count_cache:get(cache_key); + if not item_count then + item_count_cache_miss(); + local ok, ret = engine:transaction(function() + local count_sql = [[ + SELECT COUNT(*) FROM "prosodyarchive" + WHERE "host"=? AND "user"=? AND "store"=?; + ]]; + local result = engine:select(count_sql, host, user, store); + if result then + for row in result do + item_count = row[1]; + end + end + end); + if not ok or not item_count then + module:log("error", "Failed while checking quota for %s: %s", username, ret); + return nil, "Failure while checking quota"; + end + archive_item_count_cache:set(cache_key, item_count); + else + item_count_cache_hit(); + end + + if archive_item_limit then + module:log("debug", "%s has %d items out of %d limit", username, item_count, archive_item_limit); + if item_count >= archive_item_limit then + return nil, "quota-limit"; + end + end + when = when or os.time(); with = with or ""; local ok, ret = engine:transaction(function() @@ -245,12 +333,16 @@ function archive_store:append(username, key, value, when, with) VALUES (?,?,?,?,?,?,?,?); ]]; if key then - engine:delete(delete_sql, host, user or "", store, key); + local result = engine:delete(delete_sql, host, user or "", store, key); + if result then + item_count = item_count - result:affected(); + end else key = uuid.generate(); end local t, encoded_value = assert(serialize(value)); engine:insert(insert_sql, host, user or "", store, when, with, key, t, encoded_value); + archive_item_count_cache:set(cache_key, item_count+1); return key; end); if not ok then return ok, ret; end @@ -285,47 +377,60 @@ local function archive_where(query, args, where) where[#where+1] = "\"key\" = ?"; args[#args+1] = query.key end + + -- Set of ids + if query.ids then + local nids, nargs = #query.ids, #args; + -- COMPAT Lua 5.1: No separator argument to string.rep + where[#where + 1] = "\"key\" IN (" .. string.rep("?,", nids):sub(1,-2) .. ")"; + for i, id in ipairs(query.ids) do + args[nargs+i] = id; + end + end end local function archive_where_id_range(query, args, where) - local args_len = #args -- Before or after specific item, exclusive + local id_lookup_sql = [[ + SELECT "sort_id" + FROM "prosodyarchive" + WHERE "key" = ? AND "host" = ? AND "user" = ? AND "store" = ? + LIMIT 1; + ]]; if query.after then -- keys better be unique! - where[#where+1] = [[ - "sort_id" > COALESCE( - ( - SELECT "sort_id" - FROM "prosodyarchive" - WHERE "key" = ? AND "host" = ? AND "user" = ? AND "store" = ? - LIMIT 1 - ), 0) - ]]; - args[args_len+1], args[args_len+2], args[args_len+3], args[args_len+4] = query.after, args[1], args[2], args[3]; - args_len = args_len + 4 + local after_id = nil; + for row in engine:select(id_lookup_sql, query.after, args[1], args[2], args[3]) do + after_id = row[1]; + end + if not after_id then + return nil, "item-not-found"; + end + where[#where+1] = '"sort_id" > ?'; + args[#args+1] = after_id; end if query.before then - where[#where+1] = [[ - "sort_id" < COALESCE( - ( - SELECT "sort_id" - FROM "prosodyarchive" - WHERE "key" = ? AND "host" = ? AND "user" = ? AND "store" = ? - LIMIT 1 - ), - ( - SELECT MAX("sort_id")+1 - FROM "prosodyarchive" - ) - ) - ]] - args[args_len+1], args[args_len+2], args[args_len+3], args[args_len+4] = query.before, args[1], args[2], args[3]; + local before_id = nil; + for row in engine:select(id_lookup_sql, query.before, args[1], args[2], args[3]) do + before_id = row[1]; + end + if not before_id then + return nil, "item-not-found"; + end + where[#where+1] = '"sort_id" < ?'; + args[#args+1] = before_id; end + return true; end function archive_store:find(username, query) query = query or {}; local user,store = username,self.store; - local total; - local ok, result = engine:transaction(function() + local cache_key = jid_join(username, host, self.store); + local total = archive_item_count_cache:get(cache_key); + (total and item_count_cache_hit or item_count_cache_miss)(); + if total ~= nil and query.limit == 0 and query.start == nil and query.with == nil and query["end"] == nil and query.key == nil then + return noop, total; + end + local ok, result, err = engine:transaction(function() local sql_query = [[ SELECT "key", "type", "value", "when", "with" FROM "prosodyarchive" @@ -346,12 +451,16 @@ function archive_store:find(username, query) total = row[1]; end end + if query.start == nil and query.with == nil and query["end"] == nil and query.key == nil then + archive_item_count_cache:set(cache_key, total); + end if query.limit == 0 then -- Skip the real query return noop, total; end end - archive_where_id_range(query, args, where); + local ok, err = archive_where_id_range(query, args, where); + if not ok then return ok, err; end if query.limit then args[#args+1] = query.limit; @@ -361,7 +470,8 @@ function archive_store:find(username, query) and "DESC" or "ASC", query.limit and " LIMIT ?" or ""); return engine:select(sql_query, unpack(args)); end); - if not ok then return ok, result end + if not ok then return ok, result; end + if not result then return nil, err; end return function() local row = result(); if row ~= nil then @@ -372,6 +482,95 @@ function archive_store:find(username, query) end, total; end +function archive_store:get(username, key) + local iter, err = self:find(username, { key = key }) + if not iter then return iter, err; end + for _, stanza, when, with in iter do + return stanza, when, with; + end + return nil, "item-not-found"; +end + +function archive_store:set(username, key, new_value, new_when, new_with) + local user,store = username,self.store; + local ok, result = engine:transaction(function () + + local update_query = [[ + UPDATE "prosodyarchive" + SET %s + WHERE %s + ]]; + local args = { host, user or "", store, key }; + local setf = {}; + local where = { "\"host\" = ?", "\"user\" = ?", "\"store\" = ?", "\"key\" = ?"}; + + if new_value then + table.insert(setf, '"type" = ?') + table.insert(setf, '"value" = ?') + local t, value = serialize(new_value); + table.insert(args, 1, t); + table.insert(args, 2, value); + end + + if new_when then + table.insert(setf, 1, '"when" = ?') + table.insert(args, 1, new_when); + end + + if new_with then + table.insert(setf, 1, '"with" = ?') + table.insert(args, 1, new_with); + end + + update_query = update_query:format(t_concat(setf, ", "), t_concat(where, " AND ")); + return engine:update(update_query, unpack(args)); + end); + if not ok then return ok, result; end + return result:affected() == 1; +end + +function archive_store:summary(username, query) + query = query or {}; + local user,store = username,self.store; + local ok, result = engine:transaction(function() + local sql_query = [[ + SELECT DISTINCT "with", COUNT(*), MIN("when"), MAX("when") + FROM "prosodyarchive" + WHERE %s + GROUP BY "with" + ORDER BY "sort_id" %s%s; + ]]; + local args = { host, user or "", store, }; + local where = { "\"host\" = ?", "\"user\" = ?", "\"store\" = ?", }; + + archive_where(query, args, where); + + archive_where_id_range(query, args, where); + + if query.limit then + args[#args+1] = query.limit; + end + + sql_query = sql_query:format(t_concat(where, " AND "), query.reverse + and "DESC" or "ASC", query.limit and " LIMIT ?" or ""); + return engine:select(sql_query, unpack(args)); + end); + if not ok then return ok, result end + local counts = {}; + local earliest, latest = {}, {}; + for row in result do + local with, count = row[1], row[2]; + counts[with] = count; + earliest[with] = row[3]; + latest[with] = row[4]; + end + return { + counts = counts; + earliest = earliest; + latest = latest; + }; +end + function archive_store:delete(username, query) query = query or {}; local user,store = username,self.store; @@ -384,7 +583,8 @@ function archive_store:delete(username, query) table.remove(where, 2); end archive_where(query, args, where); - archive_where_id_range(query, args, where); + local ok, err = archive_where_id_range(query, args, where); + if not ok then return ok, err; end if query.truncate == nil then sql_query = sql_query:format(t_concat(where, " AND ")); else @@ -423,9 +623,24 @@ function archive_store:delete(username, query) end return engine:delete(sql_query, unpack(args)); end); + local cache_key = jid_join(username, host, self.store); + archive_item_count_cache:set(cache_key, nil); return ok and stmt:affected(), stmt; end +function archive_store:users() + local ok, result = engine:transaction(function() + local select_sql = [[ + SELECT DISTINCT "user" + FROM "prosodyarchive" + WHERE "host"=? AND "store"=?; + ]]; + return engine:select(select_sql, host, self.store); + end); + if not ok then error(result); end + return iterator(result); +end + local stores = { keyval = keyval_store; map = map_store; @@ -610,9 +825,10 @@ function module.load() if prosody.prosodyctl then return; end local engines = module:shared("/*/sql/connections"); local params = normalize_params(module:get_option("sql", default_params)); - engine = engines[sql.db2uri(params)]; + local db_uri = sql.db2uri(params); + engine = engines[db_uri]; if not engine then - module:log("debug", "Creating new engine"); + module:log("debug", "Creating new engine %s", db_uri); engine = sql:create_engine(params, function (engine) -- luacheck: ignore 431/engine if module:get_option("sql_manage_tables", true) then -- Automatically create table, ignore failure (table probably already exists) @@ -640,7 +856,7 @@ end function module.command(arg) local config = require "core.configmanager"; - local prosodyctl = require "util.prosodyctl"; + local hi = require "util.human.io"; local command = table.remove(arg, 1); if command == "upgrade" then -- We need to find every unique dburi in the config @@ -655,7 +871,7 @@ function module.command(arg) end print(""); print("Ensure you have working backups of the above databases before continuing! "); - if not prosodyctl.show_yesno("Continue with the database upgrade? [yN]") then + if not hi.show_yesno("Continue with the database upgrade? [yN]") then print("Ok, no upgrade. But you do have backups, don't you? ...don't you?? :-)"); return; end |