diff options
Diffstat (limited to 'plugins/mod_storage_sql.lua')
-rw-r--r-- | plugins/mod_storage_sql.lua | 251 |
1 files changed, 189 insertions, 62 deletions
diff --git a/plugins/mod_storage_sql.lua b/plugins/mod_storage_sql.lua index b3ed7638..f5569f7c 100644 --- a/plugins/mod_storage_sql.lua +++ b/plugins/mod_storage_sql.lua @@ -1,19 +1,34 @@ -- luacheck: ignore 212/self -local cache = require "util.cache"; -local json = require "util.json"; -local sql = require "util.sql"; -local xml_parse = require "util.xml".parse; -local uuid = require "util.uuid"; -local resolve_relative_path = require "util.paths".resolve_relative_path; -local jid_join = require "util.jid".join; - -local is_stanza = require"util.stanza".is_stanza; +local cache = require "prosody.util.cache"; +local json = require "prosody.util.json"; +local xml_parse = require "prosody.util.xml".parse; +local uuid = require "prosody.util.uuid"; +local resolve_relative_path = require "prosody.util.paths".resolve_relative_path; +local jid_join = require "prosody.util.jid".join; + +local is_stanza = require"prosody.util.stanza".is_stanza; local t_concat = table.concat; +local have_dbisql, dbisql = pcall(require, "prosody.util.sql"); +local have_sqlite, sqlite = pcall(require, "prosody.util.sqlite3"); +if not have_dbisql then + module:log("debug", "Could not load LuaDBI, error was: %s", dbisql) + dbisql = nil; +end +if not have_sqlite then + module:log("debug", "Could not load LuaSQLite3, error was: %s", sqlite) + sqlite = nil; +end +if not (have_dbisql or have_sqlite) then + module:log("error", "LuaDBI or LuaSQLite3 are required for using SQL databases but neither are installed"); + module:log("error", "Please install at least one of LuaDBI and LuaSQLite3. See https://prosody.im/doc/depends"); + error("No SQL library available") +end + local noop = function() end -local unpack = table.unpack or unpack; -- luacheck: ignore 113 +local unpack = table.unpack; local function iterator(result) return function(result_) local row = result_(); @@ -59,9 +74,8 @@ local function deserialize(t, value) end local host = module.host; -local user, store; -local function keyval_store_get() +local function keyval_store_get(user, store) local haveany; local result = {}; local select_sql = [[ @@ -86,7 +100,7 @@ local function keyval_store_get() return result; end end -local function keyval_store_set(data) +local function keyval_store_set(data, user, store) local delete_sql = [[ DELETE FROM "prosody" WHERE "host"=? AND "user"=? AND "store"=? @@ -121,19 +135,15 @@ end local keyval_store = {}; keyval_store.__index = keyval_store; function keyval_store:get(username) - user, store = username, self.store; - local ok, result = engine:transaction(keyval_store_get); + local ok, result = engine:transaction(keyval_store_get, username, self.store); if not ok then - module:log("error", "Unable to read from database %s store for %s: %s", store, username or "<host>", result); + module:log("error", "Unable to read from database %s store for %s: %s", self.store, username or "<host>", result); return nil, result; end return result; end function keyval_store:set(username, data) - user,store = username,self.store; - return engine:transaction(function() - return keyval_store_set(data); - end); + return engine:transaction(keyval_store_set, data, username, self.store); end function keyval_store:users() local ok, result = engine:transaction(function() @@ -150,8 +160,8 @@ end --- Archive store API -local archive_item_limit = module:get_option_number("storage_archive_item_limit"); -local archive_item_count_cache = cache.new(module:get_option("storage_archive_item_limit_cache_size", 1000)); +local archive_item_limit = module:get_option_integer("storage_archive_item_limit", nil, 0); +local archive_item_count_cache = cache.new(module:get_option_integer("storage_archive_item_limit_cache_size", 1000, 1)); local item_count_cache_hit = module:measure("item_count_cache_hit", "rate"); local item_count_cache_miss = module:measure("item_count_cache_miss", "rate") @@ -201,6 +211,13 @@ function map_store:set_keys(username, keydatas) ("host","user","store","key","type","value") VALUES (?,?,?,?,?,?); ]]; + local upsert_sql = [[ + INSERT INTO "prosody" + ("host","user","store","key","type","value") + VALUES (?,?,?,?,?,?) + ON CONFLICT ("host", "user","store", "key") + DO UPDATE SET "type"=?, "value"=?; + ]]; local select_extradata_sql = [[ SELECT "type", "value" FROM "prosody" @@ -208,7 +225,10 @@ function map_store:set_keys(username, keydatas) LIMIT 1; ]]; for key, data in pairs(keydatas) do - if type(key) == "string" and key ~= "" then + if type(key) == "string" and key ~= "" and engine.params.driver ~= "MySQL" and data ~= self.remove then + local t, value = assert(serialize(data)); + engine:insert(upsert_sql, host, username or "", self.store, key, t, value, t, value); + elseif type(key) == "string" and key ~= "" then engine:delete(delete_sql, host, username or "", self.store, key); if data ~= self.remove then @@ -291,37 +311,43 @@ function archive_store:append(username, key, value, when, with) local user,store = username,self.store; local cache_key = jid_join(username, host, store); local item_count = archive_item_count_cache:get(cache_key); - if not item_count then - item_count_cache_miss(); - local ok, ret = engine:transaction(function() - local count_sql = [[ - SELECT COUNT(*) FROM "prosodyarchive" - WHERE "host"=? AND "user"=? AND "store"=?; - ]]; - local result = engine:select(count_sql, host, user, store); - if result then - for row in result do - item_count = row[1]; + + if archive_item_limit then + if not item_count then + item_count_cache_miss(); + local ok, ret = engine:transaction(function() + local count_sql = [[ + SELECT COUNT(*) FROM "prosodyarchive" + WHERE "host"=? AND "user"=? AND "store"=?; + ]]; + local result = engine:select(count_sql, host, user, store); + if result then + for row in result do + item_count = row[1]; + end end + end); + if not ok or not item_count then + module:log("error", "Failed while checking quota for %s: %s", username, ret); + return nil, "Failure while checking quota"; end - end); - if not ok or not item_count then - module:log("error", "Failed while checking quota for %s: %s", username, ret); - return nil, "Failure while checking quota"; + archive_item_count_cache:set(cache_key, item_count); + else + item_count_cache_hit(); end - archive_item_count_cache:set(cache_key, item_count); - else - item_count_cache_hit(); - end - if archive_item_limit then module:log("debug", "%s has %d items out of %d limit", username, item_count, archive_item_limit); if item_count >= archive_item_limit then return nil, "quota-limit"; end end + -- FIXME update the schema to allow precision timestamps when = when or os.time(); + if engine.params.driver ~= "SQLite3" then + -- SQLite3 doesn't enforce types :) + when = math.floor(when); + end with = with or ""; local ok, ret = engine:transaction(function() local delete_sql = [[ @@ -334,8 +360,9 @@ function archive_store:append(username, key, value, when, with) VALUES (?,?,?,?,?,?,?,?); ]]; if key then + -- TODO use UPSERT like map store local result = engine:delete(delete_sql, host, user or "", store, key); - if result then + if result and item_count then item_count = item_count - result:affected(); end else @@ -343,7 +370,9 @@ function archive_store:append(username, key, value, when, with) end local t, encoded_value = assert(serialize(value)); engine:insert(insert_sql, host, user or "", store, when, with, key, t, encoded_value); - archive_item_count_cache:set(cache_key, item_count+1); + if item_count then + archive_item_count_cache:set(cache_key, item_count+1); + end return key; end); if not ok then return ok, ret; end @@ -354,12 +383,12 @@ end local function archive_where(query, args, where) -- Time range, inclusive if query.start then - args[#args+1] = query.start + args[#args+1] = math.floor(query.start); where[#where+1] = "\"when\" >= ?" end if query["end"] then - args[#args+1] = query["end"]; + args[#args+1] = math.floor(query["end"]); if query.start then where[#where] = "\"when\" BETWEEN ? AND ?" -- is this inclusive? else @@ -382,8 +411,7 @@ local function archive_where(query, args, where) -- Set of ids if query.ids then local nids, nargs = #query.ids, #args; - -- COMPAT Lua 5.1: No separator argument to string.rep - where[#where + 1] = "\"key\" IN (" .. string.rep("?,", nids):sub(1,-2) .. ")"; + where[#where + 1] = "\"key\" IN (" .. string.rep("?", nids, ",") .. ")"; for i, id in ipairs(query.ids) do args[nargs+i] = id; end @@ -611,7 +639,7 @@ function archive_store:delete(username, query) LIMIT %s OFFSET ? );]]; if engine.params.driver == "SQLite3" then - if engine._have_delete_limit then + if engine.sqlite_compile_options.enable_update_delete_limit then sql_query = [[ DELETE FROM "prosodyarchive" WHERE %s @@ -630,7 +658,13 @@ function archive_store:delete(username, query) archive_item_count_cache:clear(); else local cache_key = jid_join(username, host, self.store); - archive_item_count_cache:set(cache_key, nil); + if query.start == nil and query.with == nil and query["end"] == nil and query.key == nil and query.ids == nil and query.truncate == nil then + -- All items deleted, count should be zero. + archive_item_count_cache:set(cache_key, 0); + else + -- Not sure how many items left + archive_item_count_cache:set(cache_key, nil); + end end return ok and stmt:affected(), stmt; end @@ -648,10 +682,27 @@ function archive_store:users() return iterator(result); end +local keyvalplus = { + __index = { + -- keyval + get = keyval_store.get; + set = keyval_store.set; + items = keyval_store.users; + -- map + get_key = map_store.get; + set_key = map_store.set; + remove = map_store.remove; + set_keys = map_store.set_keys; + get_key_from_all = map_store.get_all; + delete_key_from_all = map_store.delete_all; + }; +} + local stores = { keyval = keyval_store; map = map_store; archive = archive_store; + ["keyval+"] = keyvalplus; }; --- Implement storage driver API @@ -692,6 +743,7 @@ end local function create_table(engine) -- luacheck: ignore 431/engine + local sql = engine.params.driver == "SQLite3" and sqlite or dbisql; local Table, Column, Index = sql.Table, sql.Column, sql.Index; local ProsodyTable = Table { @@ -702,7 +754,7 @@ local function create_table(engine) -- luacheck: ignore 431/engine Column { name="key", type="TEXT", nullable=false }; Column { name="type", type="TEXT", nullable=false }; Column { name="value", type="MEDIUMTEXT", nullable=false }; - Index { name="prosody_index", "host", "user", "store", "key" }; + Index { name = "prosody_unique_index"; unique = engine.params.driver ~= "MySQL"; "host"; "user"; "store"; "key" }; }; engine:transaction(function() ProsodyTable:create(engine); @@ -732,6 +784,7 @@ end local function upgrade_table(engine, params, apply_changes) -- luacheck: ignore 431/engine local changes = false; if params.driver == "MySQL" then + local sql = dbisql; local success,err = engine:transaction(function() do local result = assert(engine:execute("SHOW COLUMNS FROM \"prosody\" WHERE \"Field\"='value' and \"Type\"='text'")); @@ -799,12 +852,38 @@ local function upgrade_table(engine, params, apply_changes) -- luacheck: ignore success,err = engine:transaction(function() return engine:execute(check_encoding_query, params.database, engine.charset, engine.charset.."_bin"); - end); - if not success then - module:log("error", "Failed to check/upgrade database encoding: %s", err or "unknown error"); - return false; + end); + if not success then + module:log("error", "Failed to check/upgrade database encoding: %s", err or "unknown error"); + return false; + end + else + local indices = {}; + engine:transaction(function () + if params.driver == "SQLite3" then + for row in engine:select [[SELECT "name" FROM "sqlite_schema" WHERE "type"='index' AND "tbl_name"='prosody' AND "name"='prosody_index';]] do + indices[row[1]] = true; + end + elseif params.driver == "PostgreSQL" then + for row in engine:select [[SELECT "indexname" FROM "pg_indexes" WHERE "tablename"='prosody' AND "indexname"='prosody_index';]] do + indices[row[1]] = true; + end + end + end) + if indices["prosody_index"] then + if apply_changes then + local success = engine:transaction(function () + return assert(engine:execute([[DROP INDEX "prosody_index";]])); + end); + if not success then + module:log("error", "Failed to delete obsolete index \"prosody_index\""); + return false; + end + else + changes = true; + end + end end - end return changes; end @@ -831,12 +910,13 @@ end function module.load() local engines = module:shared("/*/sql/connections"); local params = normalize_params(module:get_option("sql", default_params)); + local sql = params.driver == "SQLite3" and sqlite or dbisql; local db_uri = sql.db2uri(params); engine = engines[db_uri]; if not engine then module:log("debug", "Creating new engine %s", db_uri); engine = sql:create_engine(params, function (engine) -- luacheck: ignore 431/engine - if module:get_option("sql_manage_tables", true) then + if module:get_option_boolean("sql_manage_tables", true) then -- Automatically create table, ignore failure (table probably already exists) -- FIXME: we should check in information_schema, etc. create_table(engine); @@ -847,28 +927,74 @@ function module.load() end end if engine.params.driver == "SQLite3" then + local compile_options = {} for row in engine:select("PRAGMA compile_options") do - if row[1] == "ENABLE_UPDATE_DELETE_LIMIT" then - engine._have_delete_limit = true; + local option = row[1]:lower(); + local opt, val = option:match("^([^=]+)=(.*)$"); + compile_options[opt or option] = tonumber(val) or val or true; + end + engine.sqlite_compile_options = compile_options; + + local journal_mode = "delete"; + for row in engine:select[[PRAGMA journal_mode;]] do + journal_mode = row[1]; + end + + -- Note: These things can't be changed with in a transaction. LuaDBI + -- opens a transaction automatically for every statement(?), so this + -- will not work there. + local tune = module:get_option_enum("sqlite_tune", "default", "normal", "fast", "safe"); + if tune == "normal" then + if journal_mode ~= "wal" then + engine:execute("PRAGMA journal_mode=WAL;"); + end + engine:execute("PRAGMA auto_vacuum=FULL;"); + engine:execute("PRAGMA synchronous=NORMAL;") + elseif tune == "fast" then + if journal_mode ~= "wal" then + engine:execute("PRAGMA journal_mode=WAL;"); end + if compile_options.secure_delete then + engine:execute("PRAGMA secure_delete=FAST;"); + end + engine:execute("PRAGMA synchronous=OFF;") + engine:execute("PRAGMA fullfsync=0;") + elseif tune == "safe" then + if journal_mode ~= "delete" then + engine:execute("PRAGMA journal_mode=DELETE;"); + end + engine:execute("PRAGMA synchronous=EXTRA;") + engine:execute("PRAGMA fullfsync=1;") + end + + for row in engine:select[[PRAGMA journal_mode;]] do + journal_mode = row[1]; end + + module:log("debug", "SQLite3 database %q operating with journal_mode=%s", engine.params.database, journal_mode); end + module:set_status("info", "Connected to " .. engine.params.driver); + end, function (engine) -- luacheck: ignore 431/engine + module:set_status("error", "Disconnected from " .. engine.params.driver); end); engines[sql.db2uri(params)] = engine; + else + module:set_status("info", "Using existing engine"); end module:provides("storage", driver); end function module.command(arg) - local config = require "core.configmanager"; - local hi = require "util.human.io"; + local config = require "prosody.core.configmanager"; + local hi = require "prosody.util.human.io"; local command = table.remove(arg, 1); if command == "upgrade" then -- We need to find every unique dburi in the config local uris = {}; for host in pairs(prosody.hosts) do -- luacheck: ignore 431/host local params = normalize_params(config.get(host, "sql") or default_params); + local sql = engine.params.driver == "SQLite3" and sqlite or dbisql; uris[sql.db2uri(params)] = params; end print("We will check and upgrade the following databases:\n"); @@ -884,6 +1010,7 @@ function module.command(arg) -- Upgrade each one for _, params in pairs(uris) do print("Checking "..params.database.."..."); + local sql = params.driver == "SQLite3" and sqlite or dbisql; engine = sql:create_engine(params); upgrade_table(engine, params, true); end |