From 7989db63aea8f1bd467dad93c6aad20e644a4cc0 Mon Sep 17 00:00:00 2001
From: Kim Alvefur <zash@zash.se>
Date: Fri, 22 Mar 2019 17:32:56 +0100
Subject: mod_mam: Perform message expiry based on building an index by date
 (backport of 39ee70fbb009 from trunk)

For each day, store a set of users that have new messages. To expire
messages, we collect the union of sets of users from dates that fall
outside the cleanup range.

The previous algorithm did not work well with many users, especially with
the default settings.
---
 plugins/mod_mam/mod_mam.lua | 74 ++++++++++++++++++++++++++-------------------
 1 file changed, 43 insertions(+), 31 deletions(-)

(limited to 'plugins')

diff --git a/plugins/mod_mam/mod_mam.lua b/plugins/mod_mam/mod_mam.lua
index 94bedbb1..a8c2689d 100644
--- a/plugins/mod_mam/mod_mam.lua
+++ b/plugins/mod_mam/mod_mam.lua
@@ -33,7 +33,7 @@ local is_stanza = st.is_stanza;
 local tostring = tostring;
 local time_now = os.time;
 local m_min = math.min;
-local timestamp, timestamp_parse = require "util.datetime".datetime, require "util.datetime".parse;
+local timestamp, timestamp_parse, datestamp = import( "util.datetime", "datetime", "parse", "date");
 local default_max_items, max_max_items = 20, module:get_option_number("max_archive_query_results", 50);
 local strip_tags = module:get_option_set("dont_archive_namespaces", { "http://jabber.org/protocol/chatstates" });
 
@@ -46,13 +46,8 @@ if not archive.find then
 end
 local use_total = module:get_option_boolean("mam_include_total", true);
 
-local cleanup;
-
-local function schedule_cleanup(username)
-	if cleanup and not cleanup[username] then
-		table.insert(cleanup, username);
-		cleanup[username] = true;
-	end
+function schedule_cleanup()
+	-- replaced by non-noop later if cleanup is enabled
 end
 
 -- Handle prefs.
@@ -96,7 +91,6 @@ module:hook("iq-set/self/"..xmlns_mam..":query", function(event)
 	local qid = query.attr.queryid;
 
 	get_prefs(origin.username, true);
-	schedule_cleanup(origin.username);
 
 	-- Search query parameters
 	local qwith, qstart, qend;
@@ -212,6 +206,7 @@ end
 local function shall_store(user, who)
 	-- TODO Cache this?
 	if not um.user_exists(user, host) then
+		module:log("debug", "%s@%s does not exist", user, host)
 		return false;
 	end
 	local prefs = get_prefs(user);
@@ -329,6 +324,9 @@ module:hook("pre-message/full", strip_stanza_id_after_other_events, -1);
 local cleanup_after = module:get_option_string("archive_expires_after", "1w");
 local cleanup_interval = module:get_option_number("archive_cleanup_interval", 4 * 60 * 60);
 if cleanup_after ~= "never" then
+	local cleanup_storage = module:open_store("archive_cleanup");
+	local cleanup_map = module:open_store("archive_cleanup", "map");
+
 	local day = 86400;
 	local multipliers = { d = day, w = day * 7, m = 31 * day, y = 365.2425 * day };
 	local n, m = cleanup_after:lower():match("(%d+)%s*([dwmy]?)");
@@ -346,33 +344,47 @@ if cleanup_after ~= "never" then
 		return false;
 	end
 
-	-- Set of known users to do message expiry for
-	-- Populated either below or when new messages are added
-	cleanup = {};
+	-- For each day, store a set of users that have new messages. To expire
+	-- messages, we collect the union of sets of users from dates that fall
+	-- outside the cleanup range.
 
-	-- Iterating over users is not supported by all authentication modules
-	-- Catch and ignore error if not supported
-	pcall(function ()
-		-- If this works, then we schedule cleanup for all known users on startup
-		for user in um.users(module.host) do
-			schedule_cleanup(user);
-		end
-	end);
+	function schedule_cleanup(username, date)
+		cleanup_map:set(date or datestamp(), username, true);
+	end
 
-	-- At odd intervals, delete old messages for one user
-	module:add_timer(math.random(10, 60), function()
-		local user = table.remove(cleanup, 1);
-		if user then
-			module:log("debug", "Removing old messages for user %q", user);
+	cleanup_runner = require "util.async".runner(function ()
+		local users = {};
+		local cut_off = datestamp(os.time() - cleanup_after);
+		for date in cleanup_storage:users() do
+			if date <= cut_off then
+				module:log("debug", "Messages from %q should be expired", date);
+				local messages_this_day = cleanup_storage:get(date);
+				if messages_this_day then
+					for user in pairs(messages_this_day) do
+						users[user] = true;
+					end
+					if date < cut_off then
+						-- Messages from the same day as the cut-off might not have expired yet,
+						-- but all earlier will have, so clear storage for those days.
+						cleanup_storage:set(date, nil);
+					end
+				end
+			end
+		end
+		local sum, num_users = 0, 0;
+		for user in pairs(users) do
 			local ok, err = archive:delete(user, { ["end"] = os.time() - cleanup_after; })
-			if not ok then
-				module:log("warn", "Could not expire archives for user %s: %s", user, err);
-			elseif type(ok) == "number" then
-				module:log("debug", "Removed %d messages", ok);
+			if ok then
+				num_users = num_users + 1;
+				sum = sum + (tonumber(ok) or 0);
 			end
-			cleanup[user] = nil;
 		end
-		return math.random(cleanup_interval, cleanup_interval * 2);
+		module:log("info", "Deleted %d expired messages for %d users", sum, num_users);
+	end);
+
+	cleanup_task = module:add_timer(1, function ()
+		cleanup_runner:run(true);
+		return cleanup_interval;
 	end);
 else
 	module:log("debug", "Archive expiry disabled");
-- 
cgit v1.2.3


From c03d54fd935333a3a486fad540a9b0f3439dfae7 Mon Sep 17 00:00:00 2001
From: Kim Alvefur <zash@zash.se>
Date: Fri, 22 Mar 2019 17:40:07 +0100
Subject: mod_muc_mam: Copy cleanup mechanism from mod_mam (fixes #672)

---
 plugins/mod_muc_mam.lua | 79 ++++++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 78 insertions(+), 1 deletion(-)

(limited to 'plugins')

diff --git a/plugins/mod_muc_mam.lua b/plugins/mod_muc_mam.lua
index ede4e57a..1fbc8b2a 100644
--- a/plugins/mod_muc_mam.lua
+++ b/plugins/mod_muc_mam.lua
@@ -29,7 +29,7 @@ local is_stanza = st.is_stanza;
 local tostring = tostring;
 local time_now = os.time;
 local m_min = math.min;
-local timestamp, timestamp_parse = require "util.datetime".datetime, require "util.datetime".parse;
+local timestamp, timestamp_parse, datestamp = import( "util.datetime", "datetime", "parse", "date");
 local default_max_items, max_max_items = 20, module:get_option_number("max_archive_query_results", 50);
 
 local default_history_length = 20;
@@ -39,6 +39,10 @@ local function get_historylength(room)
 	return math.min(room._data.history_length or default_history_length, max_history_length);
 end
 
+function schedule_cleanup()
+	-- replaced by non-noop later if cleanup is enabled
+end
+
 local log_all_rooms = module:get_option_boolean("muc_log_all_rooms", false);
 local log_by_default = module:get_option_boolean("muc_log_by_default", true);
 
@@ -351,6 +355,7 @@ local function save_to_history(self, stanza)
 	local id = archive:append(room_node, nil, stored_stanza, time_now(), with);
 
 	if id then
+		schedule_cleanup(room_node);
 		stanza:add_direct_child(st.stanza("stanza-id", { xmlns = xmlns_st_id, by = self.jid, id = id }));
 	end
 end
@@ -386,3 +391,75 @@ module:add_feature(xmlns_mam);
 module:hook("muc-disco#info", function(event)
 	event.reply:tag("feature", {var=xmlns_mam}):up();
 end);
+
+-- Cleanup
+
+local cleanup_after = module:get_option_string("muc_log_expires_after", "1w");
+local cleanup_interval = module:get_option_number("muc_log_cleanup_interval", 4 * 60 * 60);
+
+if cleanup_after ~= "never" then
+	local cleanup_storage = module:open_store("muc_log_cleanup");
+	local cleanup_map = module:open_store("muc_log_cleanup", "map");
+
+	local day = 86400;
+	local multipliers = { d = day, w = day * 7, m = 31 * day, y = 365.2425 * day };
+	local n, m = cleanup_after:lower():match("(%d+)%s*([dwmy]?)");
+	if not n then
+		module:log("error", "Could not parse muc_log_expires_after string %q", cleanup_after);
+		return false;
+	end
+
+	cleanup_after = tonumber(n) * ( multipliers[m] or 1 );
+
+	module:log("debug", "muc_log_expires_after = %d -- in seconds", cleanup_after);
+
+	if not archive.delete then
+		module:log("error", "muc_log_expires_after set but mod_%s does not support deleting", archive._provided_by);
+		return false;
+	end
+
+	-- For each day, store a set of rooms that have new messages. To expire
+	-- messages, we collect the union of sets of rooms from dates that fall
+	-- outside the cleanup range.
+
+	function schedule_cleanup(roomname, date)
+		cleanup_map:set(date or datestamp(), roomname, true);
+	end
+
+	cleanup_runner = require "util.async".runner(function ()
+		local rooms = {};
+		local cut_off = datestamp(os.time() - cleanup_after);
+		for date in cleanup_storage:users() do
+			if date <= cut_off then
+				module:log("debug", "Messages from %q should be expired", date);
+				local messages_this_day = cleanup_storage:get(date);
+				if messages_this_day then
+					for room in pairs(messages_this_day) do
+						rooms[room] = true;
+					end
+					if date < cut_off then
+						-- Messages from the same day as the cut-off might not have expired yet,
+						-- but all earlier will have, so clear storage for those days.
+						cleanup_storage:set(date, nil);
+					end
+				end
+			end
+		end
+		local sum, num_rooms = 0, 0;
+		for room in pairs(rooms) do
+			local ok, err = archive:delete(room, { ["end"] = os.time() - cleanup_after; })
+			if ok then
+				num_rooms = num_rooms + 1;
+				sum = sum + (tonumber(ok) or 0);
+			end
+		end
+		module:log("info", "Deleted %d expired messages for %d rooms", sum, num_rooms);
+	end);
+
+	cleanup_task = module:add_timer(1, function ()
+		cleanup_runner:run(true);
+		return cleanup_interval;
+	end);
+else
+	module:log("debug", "Archive expiry disabled");
+end
-- 
cgit v1.2.3


From d0075a2784804a4197887c7f18200b6e83d0ee51 Mon Sep 17 00:00:00 2001
From: Kim Alvefur <zash@zash.se>
Date: Mon, 14 Jan 2019 00:17:02 +0100
Subject: mod_storage_memory: Implement :users iteration API

---
 plugins/mod_storage_memory.lua | 8 ++++++++
 1 file changed, 8 insertions(+)

(limited to 'plugins')

diff --git a/plugins/mod_storage_memory.lua b/plugins/mod_storage_memory.lua
index 71205ee0..745e394b 100644
--- a/plugins/mod_storage_memory.lua
+++ b/plugins/mod_storage_memory.lua
@@ -23,6 +23,10 @@ local function _purge_store(self, username)
 	return true;
 end
 
+local function _users(self)
+	return next, self.store, nil;
+end
+
 local keyval_store = {};
 keyval_store.__index = keyval_store;
 
@@ -40,9 +44,13 @@ end
 
 keyval_store.purge = _purge_store;
 
+keyval_store.users = _users;
+
 local archive_store = {};
 archive_store.__index = archive_store;
 
+archive_store.users = _users;
+
 function archive_store:append(username, key, value, when, with)
 	if is_stanza(value) then
 		value = st.preserialize(value);
-- 
cgit v1.2.3