aboutsummaryrefslogtreecommitdiffstats
path: root/plugins/mod_storage_sql.lua
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/mod_storage_sql.lua')
-rw-r--r--plugins/mod_storage_sql.lua253
1 files changed, 190 insertions, 63 deletions
diff --git a/plugins/mod_storage_sql.lua b/plugins/mod_storage_sql.lua
index b3ed7638..3f606160 100644
--- a/plugins/mod_storage_sql.lua
+++ b/plugins/mod_storage_sql.lua
@@ -1,19 +1,34 @@
-- luacheck: ignore 212/self
-local cache = require "util.cache";
-local json = require "util.json";
-local sql = require "util.sql";
-local xml_parse = require "util.xml".parse;
-local uuid = require "util.uuid";
-local resolve_relative_path = require "util.paths".resolve_relative_path;
-local jid_join = require "util.jid".join;
-
-local is_stanza = require"util.stanza".is_stanza;
+local cache = require "prosody.util.cache";
+local json = require "prosody.util.json";
+local xml_parse = require "prosody.util.xml".parse;
+local uuid = require "prosody.util.uuid";
+local resolve_relative_path = require "prosody.util.paths".resolve_relative_path;
+local jid_join = require "prosody.util.jid".join;
+
+local is_stanza = require"prosody.util.stanza".is_stanza;
local t_concat = table.concat;
+local have_dbisql, dbisql = pcall(require, "prosody.util.sql");
+local have_sqlite, sqlite = pcall(require, "prosody.util.sqlite3");
+if not have_dbisql then
+ module:log("debug", "Could not load LuaDBI, error was: %s", dbisql)
+ dbisql = nil;
+end
+if not have_sqlite then
+ module:log("debug", "Could not load LuaSQLite3, error was: %s", sqlite)
+ sqlite = nil;
+end
+if not (have_dbisql or have_sqlite) then
+ module:log("error", "LuaDBI or LuaSQLite3 are required for using SQL databases but neither are installed");
+ module:log("error", "Please install at least one of LuaDBI and LuaSQLite3. See https://prosody.im/doc/depends");
+ error("No SQL library available")
+end
+
local noop = function() end
-local unpack = table.unpack or unpack; -- luacheck: ignore 113
+local unpack = table.unpack;
local function iterator(result)
return function(result_)
local row = result_();
@@ -59,9 +74,8 @@ local function deserialize(t, value)
end
local host = module.host;
-local user, store;
-local function keyval_store_get()
+local function keyval_store_get(user, store)
local haveany;
local result = {};
local select_sql = [[
@@ -86,7 +100,7 @@ local function keyval_store_get()
return result;
end
end
-local function keyval_store_set(data)
+local function keyval_store_set(data, user, store)
local delete_sql = [[
DELETE FROM "prosody"
WHERE "host"=? AND "user"=? AND "store"=?
@@ -121,19 +135,15 @@ end
local keyval_store = {};
keyval_store.__index = keyval_store;
function keyval_store:get(username)
- user, store = username, self.store;
- local ok, result = engine:transaction(keyval_store_get);
+ local ok, result = engine:transaction(keyval_store_get, username, self.store);
if not ok then
- module:log("error", "Unable to read from database %s store for %s: %s", store, username or "<host>", result);
+ module:log("error", "Unable to read from database %s store for %s: %s", self.store, username or "<host>", result);
return nil, result;
end
return result;
end
function keyval_store:set(username, data)
- user,store = username,self.store;
- return engine:transaction(function()
- return keyval_store_set(data);
- end);
+ return engine:transaction(keyval_store_set, data, username, self.store);
end
function keyval_store:users()
local ok, result = engine:transaction(function()
@@ -150,8 +160,8 @@ end
--- Archive store API
-local archive_item_limit = module:get_option_number("storage_archive_item_limit");
-local archive_item_count_cache = cache.new(module:get_option("storage_archive_item_limit_cache_size", 1000));
+local archive_item_limit = module:get_option_integer("storage_archive_item_limit", nil, 0);
+local archive_item_count_cache = cache.new(module:get_option_integer("storage_archive_item_limit_cache_size", 1000, 1));
local item_count_cache_hit = module:measure("item_count_cache_hit", "rate");
local item_count_cache_miss = module:measure("item_count_cache_miss", "rate")
@@ -201,6 +211,13 @@ function map_store:set_keys(username, keydatas)
("host","user","store","key","type","value")
VALUES (?,?,?,?,?,?);
]];
+ local upsert_sql = [[
+ INSERT INTO "prosody"
+ ("host","user","store","key","type","value")
+ VALUES (?,?,?,?,?,?)
+ ON CONFLICT ("host", "user","store", "key")
+ DO UPDATE SET "type"=?, "value"=?;
+ ]];
local select_extradata_sql = [[
SELECT "type", "value"
FROM "prosody"
@@ -208,7 +225,10 @@ function map_store:set_keys(username, keydatas)
LIMIT 1;
]];
for key, data in pairs(keydatas) do
- if type(key) == "string" and key ~= "" then
+ if type(key) == "string" and key ~= "" and engine.params.driver ~= "MySQL" and data ~= self.remove then
+ local t, value = assert(serialize(data));
+ engine:insert(upsert_sql, host, username or "", self.store, key, t, value, t, value);
+ elseif type(key) == "string" and key ~= "" then
engine:delete(delete_sql,
host, username or "", self.store, key);
if data ~= self.remove then
@@ -291,37 +311,43 @@ function archive_store:append(username, key, value, when, with)
local user,store = username,self.store;
local cache_key = jid_join(username, host, store);
local item_count = archive_item_count_cache:get(cache_key);
- if not item_count then
- item_count_cache_miss();
- local ok, ret = engine:transaction(function()
- local count_sql = [[
- SELECT COUNT(*) FROM "prosodyarchive"
- WHERE "host"=? AND "user"=? AND "store"=?;
- ]];
- local result = engine:select(count_sql, host, user, store);
- if result then
- for row in result do
- item_count = row[1];
+
+ if archive_item_limit then
+ if not item_count then
+ item_count_cache_miss();
+ local ok, ret = engine:transaction(function()
+ local count_sql = [[
+ SELECT COUNT(*) FROM "prosodyarchive"
+ WHERE "host"=? AND "user"=? AND "store"=?;
+ ]];
+ local result = engine:select(count_sql, host, user, store);
+ if result then
+ for row in result do
+ item_count = row[1];
+ end
end
+ end);
+ if not ok or not item_count then
+ module:log("error", "Failed while checking quota for %s: %s", username, ret);
+ return nil, "Failure while checking quota";
end
- end);
- if not ok or not item_count then
- module:log("error", "Failed while checking quota for %s: %s", username, ret);
- return nil, "Failure while checking quota";
+ archive_item_count_cache:set(cache_key, item_count);
+ else
+ item_count_cache_hit();
end
- archive_item_count_cache:set(cache_key, item_count);
- else
- item_count_cache_hit();
- end
- if archive_item_limit then
module:log("debug", "%s has %d items out of %d limit", username, item_count, archive_item_limit);
if item_count >= archive_item_limit then
return nil, "quota-limit";
end
end
+ -- FIXME update the schema to allow precision timestamps
when = when or os.time();
+ if engine.params.driver ~= "SQLite3" then
+ -- SQLite3 doesn't enforce types :)
+ when = math.floor(when);
+ end
with = with or "";
local ok, ret = engine:transaction(function()
local delete_sql = [[
@@ -334,16 +360,19 @@ function archive_store:append(username, key, value, when, with)
VALUES (?,?,?,?,?,?,?,?);
]];
if key then
+ -- TODO use UPSERT like map store
local result = engine:delete(delete_sql, host, user or "", store, key);
- if result then
+ if result and item_count then
item_count = item_count - result:affected();
end
else
- key = uuid.generate();
+ key = uuid.v7();
end
local t, encoded_value = assert(serialize(value));
engine:insert(insert_sql, host, user or "", store, when, with, key, t, encoded_value);
- archive_item_count_cache:set(cache_key, item_count+1);
+ if item_count then
+ archive_item_count_cache:set(cache_key, item_count+1);
+ end
return key;
end);
if not ok then return ok, ret; end
@@ -354,12 +383,12 @@ end
local function archive_where(query, args, where)
-- Time range, inclusive
if query.start then
- args[#args+1] = query.start
+ args[#args+1] = math.floor(query.start);
where[#where+1] = "\"when\" >= ?"
end
if query["end"] then
- args[#args+1] = query["end"];
+ args[#args+1] = math.floor(query["end"]);
if query.start then
where[#where] = "\"when\" BETWEEN ? AND ?" -- is this inclusive?
else
@@ -382,8 +411,7 @@ local function archive_where(query, args, where)
-- Set of ids
if query.ids then
local nids, nargs = #query.ids, #args;
- -- COMPAT Lua 5.1: No separator argument to string.rep
- where[#where + 1] = "\"key\" IN (" .. string.rep("?,", nids):sub(1,-2) .. ")";
+ where[#where + 1] = "\"key\" IN (" .. string.rep("?", nids, ",") .. ")";
for i, id in ipairs(query.ids) do
args[nargs+i] = id;
end
@@ -611,7 +639,7 @@ function archive_store:delete(username, query)
LIMIT %s OFFSET ?
);]];
if engine.params.driver == "SQLite3" then
- if engine._have_delete_limit then
+ if engine.sqlite_compile_options.enable_update_delete_limit then
sql_query = [[
DELETE FROM "prosodyarchive"
WHERE %s
@@ -630,7 +658,13 @@ function archive_store:delete(username, query)
archive_item_count_cache:clear();
else
local cache_key = jid_join(username, host, self.store);
- archive_item_count_cache:set(cache_key, nil);
+ if query.start == nil and query.with == nil and query["end"] == nil and query.key == nil and query.ids == nil and query.truncate == nil then
+ -- All items deleted, count should be zero.
+ archive_item_count_cache:set(cache_key, 0);
+ else
+ -- Not sure how many items left
+ archive_item_count_cache:set(cache_key, nil);
+ end
end
return ok and stmt:affected(), stmt;
end
@@ -648,10 +682,27 @@ function archive_store:users()
return iterator(result);
end
+local keyvalplus = {
+ __index = {
+ -- keyval
+ get = keyval_store.get;
+ set = keyval_store.set;
+ items = keyval_store.users;
+ -- map
+ get_key = map_store.get;
+ set_key = map_store.set;
+ remove = map_store.remove;
+ set_keys = map_store.set_keys;
+ get_key_from_all = map_store.get_all;
+ delete_key_from_all = map_store.delete_all;
+ };
+}
+
local stores = {
keyval = keyval_store;
map = map_store;
archive = archive_store;
+ ["keyval+"] = keyvalplus;
};
--- Implement storage driver API
@@ -692,6 +743,7 @@ end
local function create_table(engine) -- luacheck: ignore 431/engine
+ local sql = engine.params.driver == "SQLite3" and sqlite or dbisql;
local Table, Column, Index = sql.Table, sql.Column, sql.Index;
local ProsodyTable = Table {
@@ -702,7 +754,7 @@ local function create_table(engine) -- luacheck: ignore 431/engine
Column { name="key", type="TEXT", nullable=false };
Column { name="type", type="TEXT", nullable=false };
Column { name="value", type="MEDIUMTEXT", nullable=false };
- Index { name="prosody_index", "host", "user", "store", "key" };
+ Index { name = "prosody_unique_index"; unique = engine.params.driver ~= "MySQL"; "host"; "user"; "store"; "key" };
};
engine:transaction(function()
ProsodyTable:create(engine);
@@ -732,6 +784,7 @@ end
local function upgrade_table(engine, params, apply_changes) -- luacheck: ignore 431/engine
local changes = false;
if params.driver == "MySQL" then
+ local sql = dbisql;
local success,err = engine:transaction(function()
do
local result = assert(engine:execute("SHOW COLUMNS FROM \"prosody\" WHERE \"Field\"='value' and \"Type\"='text'"));
@@ -799,12 +852,38 @@ local function upgrade_table(engine, params, apply_changes) -- luacheck: ignore
success,err = engine:transaction(function()
return engine:execute(check_encoding_query, params.database,
engine.charset, engine.charset.."_bin");
- end);
- if not success then
- module:log("error", "Failed to check/upgrade database encoding: %s", err or "unknown error");
- return false;
+ end);
+ if not success then
+ module:log("error", "Failed to check/upgrade database encoding: %s", err or "unknown error");
+ return false;
+ end
+ else
+ local indices = {};
+ engine:transaction(function ()
+ if params.driver == "SQLite3" then
+ for row in engine:select [[SELECT "name" FROM "sqlite_schema" WHERE "type"='index' AND "tbl_name"='prosody' AND "name"='prosody_index';]] do
+ indices[row[1]] = true;
+ end
+ elseif params.driver == "PostgreSQL" then
+ for row in engine:select [[SELECT "indexname" FROM "pg_indexes" WHERE "tablename"='prosody' AND "indexname"='prosody_index';]] do
+ indices[row[1]] = true;
+ end
+ end
+ end)
+ if indices["prosody_index"] then
+ if apply_changes then
+ local success = engine:transaction(function ()
+ return assert(engine:execute([[DROP INDEX "prosody_index";]]));
+ end);
+ if not success then
+ module:log("error", "Failed to delete obsolete index \"prosody_index\"");
+ return false;
+ end
+ else
+ changes = true;
+ end
+ end
end
- end
return changes;
end
@@ -831,12 +910,13 @@ end
function module.load()
local engines = module:shared("/*/sql/connections");
local params = normalize_params(module:get_option("sql", default_params));
+ local sql = params.driver == "SQLite3" and sqlite or dbisql;
local db_uri = sql.db2uri(params);
engine = engines[db_uri];
if not engine then
module:log("debug", "Creating new engine %s", db_uri);
engine = sql:create_engine(params, function (engine) -- luacheck: ignore 431/engine
- if module:get_option("sql_manage_tables", true) then
+ if module:get_option_boolean("sql_manage_tables", true) then
-- Automatically create table, ignore failure (table probably already exists)
-- FIXME: we should check in information_schema, etc.
create_table(engine);
@@ -847,28 +927,74 @@ function module.load()
end
end
if engine.params.driver == "SQLite3" then
+ local compile_options = {}
for row in engine:select("PRAGMA compile_options") do
- if row[1] == "ENABLE_UPDATE_DELETE_LIMIT" then
- engine._have_delete_limit = true;
+ local option = row[1]:lower();
+ local opt, val = option:match("^([^=]+)=(.*)$");
+ compile_options[opt or option] = tonumber(val) or val or true;
+ end
+ engine.sqlite_compile_options = compile_options;
+
+ local journal_mode = "delete";
+ for row in engine:select[[PRAGMA journal_mode;]] do
+ journal_mode = row[1];
+ end
+
+ -- Note: These things can't be changed with in a transaction. LuaDBI
+ -- opens a transaction automatically for every statement(?), so this
+ -- will not work there.
+ local tune = module:get_option_enum("sqlite_tune", "default", "normal", "fast", "safe");
+ if tune == "normal" then
+ if journal_mode ~= "wal" then
+ engine:execute("PRAGMA journal_mode=WAL;");
+ end
+ engine:execute("PRAGMA auto_vacuum=FULL;");
+ engine:execute("PRAGMA synchronous=NORMAL;")
+ elseif tune == "fast" then
+ if journal_mode ~= "wal" then
+ engine:execute("PRAGMA journal_mode=WAL;");
end
+ if compile_options.secure_delete then
+ engine:execute("PRAGMA secure_delete=FAST;");
+ end
+ engine:execute("PRAGMA synchronous=OFF;")
+ engine:execute("PRAGMA fullfsync=0;")
+ elseif tune == "safe" then
+ if journal_mode ~= "delete" then
+ engine:execute("PRAGMA journal_mode=DELETE;");
+ end
+ engine:execute("PRAGMA synchronous=EXTRA;")
+ engine:execute("PRAGMA fullfsync=1;")
+ end
+
+ for row in engine:select[[PRAGMA journal_mode;]] do
+ journal_mode = row[1];
end
+
+ module:log("debug", "SQLite3 database %q operating with journal_mode=%s", engine.params.database, journal_mode);
end
+ module:set_status("info", "Connected to " .. engine.params.driver);
+ end, function (engine) -- luacheck: ignore 431/engine
+ module:set_status("error", "Disconnected from " .. engine.params.driver);
end);
engines[sql.db2uri(params)] = engine;
+ else
+ module:set_status("info", "Using existing engine");
end
module:provides("storage", driver);
end
function module.command(arg)
- local config = require "core.configmanager";
- local hi = require "util.human.io";
+ local config = require "prosody.core.configmanager";
+ local hi = require "prosody.util.human.io";
local command = table.remove(arg, 1);
if command == "upgrade" then
-- We need to find every unique dburi in the config
local uris = {};
for host in pairs(prosody.hosts) do -- luacheck: ignore 431/host
local params = normalize_params(config.get(host, "sql") or default_params);
+ local sql = engine.params.driver == "SQLite3" and sqlite or dbisql;
uris[sql.db2uri(params)] = params;
end
print("We will check and upgrade the following databases:\n");
@@ -884,6 +1010,7 @@ function module.command(arg)
-- Upgrade each one
for _, params in pairs(uris) do
print("Checking "..params.database.."...");
+ local sql = params.driver == "SQLite3" and sqlite or dbisql;
engine = sql:create_engine(params);
upgrade_table(engine, params, true);
end