add storage cleaner

Introduce the ability to clean up old messages from the sqlite db.
The StoragePolicy can be chosen by the user. Currently there are
two policies; deleting everything based on age is the obvious one.

The other is for the data hoarders among us. It'll only delete
message types which can be considered low value... Types with
a time aspect like away / back... joins / parts etc.

It tries to do that in a sensible way, so that we don't block
all other db writers that are ongoing.
The "periodically" interval is by design not exposed to the user.
This commit is contained in:
Reto Brunner 2023-12-26 12:00:53 +01:00
commit 7f0b721790
9 changed files with 458 additions and 52 deletions

View file

@ -11,6 +11,7 @@
"../server/log.ts",
"../server/config.ts",
"../server/client.ts",
"../server/storageCleaner.ts",
"../server/clientManager.ts",
"../server/identification.ts",
"../server/plugins/changelog.ts",

View file

@ -304,6 +304,26 @@ module.exports = {
// This value is set to `["sqlite", "text"]` by default.
messageStorage: ["sqlite", "text"],
// ### `storagePolicy`
// When the sqlite storage is in use, control the maximum storage duration.
// A background task will periodically clean up messages older than the limit.
// The available keys for the `storagePolicy` object are:
//
// - `enabled`: If this is false, the cleaning task is not running.
// - `maxAgeDays`: Maximum age of an entry in days.
// - `deletionPolicy`: Controls what types of messages are being deleted.
// Valid options are:
// - `statusOnly`: Only delete message types which are status related (e.g. away, back, join, parts, mode, ctcp...)
// but keep actual messages from nicks. This keeps the DB size down while retaining "precious" messages.
// - `everything`: Delete everything, including messages from irc nicks
storagePolicy: {
enabled: false,
maxAgeDays: 7,
deletionPolicy: "statusOnly",
},
// ### `useHexIp`
//
// When set to `true`, users' IP addresses will be encoded as hex.

View file

@ -18,6 +18,7 @@ import TextFileMessageStorage from "./plugins/messageStorage/text";
import Network, {IgnoreListItem, NetworkConfig, NetworkWithIrcFramework} from "./models/network";
import ClientManager from "./clientManager";
import {MessageStorage, SearchQuery, SearchResponse} from "./plugins/messageStorage/types";
import {StorageCleaner} from "./storageCleaner";
type OrderItem = Chan["id"] | Network["uuid"];
type Order = OrderItem[];
@ -138,6 +139,15 @@ class Client {
if (!Config.values.public && client.config.log) {
if (Config.values.messageStorage.includes("sqlite")) {
client.messageProvider = new SqliteMessageStorage(client.name);
if (Config.values.storagePolicy.enabled) {
log.info(
`Activating storage cleaner. Policy: ${Config.values.storagePolicy.deletionPolicy}. MaxAge: ${Config.values.storagePolicy.maxAgeDays} days`
);
const cleaner = new StorageCleaner(client.messageProvider);
cleaner.start();
}
client.messageStorage.push(client.messageProvider);
}

View file

@ -3,6 +3,7 @@ import {Command} from "commander";
import ClientManager from "../clientManager";
import Utils from "./utils";
import SqliteMessageStorage from "../plugins/messageStorage/sqlite";
import {StorageCleaner} from "../storageCleaner";
const program = new Command("storage").description(
"various utilities related to the message storage"
@ -10,7 +11,7 @@ const program = new Command("storage").description(
program
.command("migrate")
.argument("[user]", "migrate a specific user only, all if not provided")
.argument("[username]", "migrate a specific user only, all if not provided")
.description("Migrate message storage where needed")
.on("--help", Utils.extraHelp)
.action(function (user) {
@ -20,7 +21,19 @@ program
});
});
async function runMigrations(user: string) {
// `clean [user]` subcommand: applies the configured storage policy to the sqlite
// message store of one user, or of all users when no name is given.
// Any failure is logged and the process exits with status 1.
program
.command("clean")
.argument("[user]", "clean messages for a specific user only, all if not provided")
.description("Delete messages from the DB based on the storage policy")
.on("--help", Utils.extraHelp)
.action(function (user) {
// the action is synchronous, so the async work is run detached and its rejection handled here
runCleaning(user).catch((err) => {
log.error(err.toString());
process.exit(1);
});
});
async function runMigrations(user?: string) {
const manager = new ClientManager();
const users = manager.getUsers();
@ -65,4 +78,46 @@ function isUserLogEnabled(manager: ClientManager, user: string): boolean {
return conf.log;
}
/**
 * Apply the configured storage policy to the message database of a single user,
 * or of every known user when no name is given.
 *
 * @param user - Name of the user to clean. The CLI argument is optional, so this
 *   is `undefined` when the command is invoked without a user; all users are
 *   then processed one after the other.
 * @throws Error if `user` is given but unknown, or if cleaning any user fails.
 */
async function runCleaning(user?: string) {
	const manager = new ClientManager();
	const users = manager.getUsers();

	if (user) {
		if (!users.includes(user)) {
			throw new Error(`invalid user ${user}`);
		}

		return cleanUser(manager, user);
	}

	for (const name of users) {
		// if cleaning fails for any user we blow up,
		// chances are the rest won't complete either
		await cleanUser(manager, name);
	}
}
/**
 * Delete all messages of `user` that fall under the configured storage policy
 * and compact the database afterwards.
 *
 * Users with logging disabled are skipped. The vacuum is only run (and only
 * announced in the log) when at least one row was actually deleted.
 *
 * @param manager - Client manager used to look up whether logging is enabled for the user.
 * @param user - Name of the user whose sqlite store gets cleaned.
 * @throws Whatever enabling the store, the deletion or the vacuum rejects with.
 */
async function cleanUser(manager: ClientManager, user: string) {
	log.info("handling user", user);

	if (!isUserLogEnabled(manager, user)) {
		log.info("logging disabled for user", user, ". Skipping");
		return;
	}

	const sqlite = new SqliteMessageStorage(user);
	await sqlite.enable();

	try {
		const cleaner = new StorageCleaner(sqlite);
		const num_deleted = await cleaner.runDeletesNoLimit();
		log.info(`deleted ${num_deleted} messages`);

		if (num_deleted > 0) {
			log.info("running a vacuum now, this might take a while");
			await sqlite.vacuum();
		}
	} finally {
		// release the database handle even when the deletion or the vacuum failed
		await sqlite.close();
	}

	log.info(`cleaning messages for ${user} has been successful`);
}
export default program;

View file

@ -76,6 +76,12 @@ type Debug = {
raw: boolean;
};
// Shape of the `storagePolicy` config object that controls the sqlite message cleaner.
type StoragePolicy = {
// when false, the background cleaning task is not started
enabled: boolean;
// maximum age of a stored message, in days, before it becomes eligible for deletion
maxAgeDays: number;
// "statusOnly" restricts deletion to status-like message types, "everything" deletes all types
deletionPolicy: "statusOnly" | "everything";
};
export type ConfigType = {
public: boolean;
host: string | undefined;
@ -97,6 +103,7 @@ export type ConfigType = {
defaults: Defaults;
lockNetwork: boolean;
messageStorage: string[];
storagePolicy: StoragePolicy;
useHexIp: boolean;
webirc?: WebIRC;
identd: Identd;

View file

@ -7,7 +7,7 @@ import Config from "../../config";
import Msg, {Message} from "../../models/msg";
import Chan, {Channel} from "../../models/chan";
import Helper from "../../helper";
import type {SearchResponse, SearchQuery, SearchableMessageStorage} from "./types";
import type {SearchResponse, SearchQuery, SearchableMessageStorage, DeletionRequest} from "./types";
import Network from "../../models/network";
// TODO; type
@ -26,7 +26,7 @@ try {
type Migration = {version: number; stmts: string[]};
type Rollback = {version: number; rollback_forbidden?: boolean; stmts: string[]};
export const currentSchemaVersion = 1679743888000; // use `new Date().getTime()`
export const currentSchemaVersion = 1703322560448; // use `new Date().getTime()`
// Desired schema, adapt to the newest version and add migrations to the array below
const schema = [
@ -45,6 +45,7 @@ const schema = [
)`,
"CREATE INDEX network_channel ON messages (network, channel)",
"CREATE INDEX time ON messages (time)",
"CREATE INDEX msg_type_idx on messages (type)", // needed for efficient storageCleaner queries
];
// the migrations will be executed in an exclusive transaction as a whole
@ -78,6 +79,10 @@ export const migrations: Migration[] = [
)`,
],
},
{
version: 1703322560448,
stmts: ["CREATE INDEX msg_type_idx on messages (type)"],
},
];
// down migrations need to restore the state of the prior version.
@ -91,6 +96,10 @@ export const rollbacks: Rollback[] = [
version: 1679743888000,
stmts: [], // here we can't drop the tables, as we use them in the code, so just leave those in
},
{
version: 1703322560448,
stmts: ["drop INDEX msg_type_idx"],
},
];
class Deferred {
@ -116,7 +125,21 @@ class SqliteMessageStorage implements SearchableMessageStorage {
this.initDone = new Deferred();
}
async _enable() {
async _enable(connection_string: string) {
this.database = new sqlite3.Database(connection_string);
try {
await this.run_pragmas(); // must be done outside of a transaction
await this.run_migrations();
} catch (e) {
this.isEnabled = false;
throw Helper.catch_to_error("Migration failed", e);
}
this.isEnabled = true;
}
async enable() {
const logsPath = Config.getUserLogsPath();
const sqlitePath = path.join(logsPath, `${this.userName}.sqlite3`);
@ -126,22 +149,8 @@ class SqliteMessageStorage implements SearchableMessageStorage {
throw Helper.catch_to_error("Unable to create logs directory", e);
}
this.isEnabled = true;
this.database = new sqlite3.Database(sqlitePath);
try {
await this.run_pragmas(); // must be done outside of a transaction
await this.run_migrations();
} catch (e) {
this.isEnabled = false;
throw Helper.catch_to_error("Migration failed", e);
}
}
async enable() {
try {
await this._enable();
await this._enable(sqlitePath);
} finally {
this.initDone.resolve(); // unblock the instance methods
}
@ -149,12 +158,13 @@ class SqliteMessageStorage implements SearchableMessageStorage {
async setup_new_db() {
for (const stmt of schema) {
await this.serialize_run(stmt, []);
await this.serialize_run(stmt);
}
await this.serialize_run("INSERT INTO options (name, value) VALUES ('schema_version', ?)", [
currentSchemaVersion.toString(),
]);
await this.serialize_run(
"INSERT INTO options (name, value) VALUES ('schema_version', ?)",
currentSchemaVersion.toString()
);
}
async current_version(): Promise<number> {
@ -181,9 +191,10 @@ class SqliteMessageStorage implements SearchableMessageStorage {
}
async update_version_in_db() {
return this.serialize_run("UPDATE options SET value = ? WHERE name = 'schema_version'", [
currentSchemaVersion.toString(),
]);
return this.serialize_run(
"UPDATE options SET value = ? WHERE name = 'schema_version'",
currentSchemaVersion.toString()
);
}
async _run_migrations(dbVersion: number) {
@ -194,14 +205,14 @@ class SqliteMessageStorage implements SearchableMessageStorage {
const to_execute = necessaryMigrations(dbVersion);
for (const stmt of to_execute.map((m) => m.stmts).flat()) {
await this.serialize_run(stmt, []);
await this.serialize_run(stmt);
}
await this.update_version_in_db();
}
async run_pragmas() {
await this.serialize_run("PRAGMA foreign_keys = ON;", []);
await this.serialize_run("PRAGMA foreign_keys = ON;");
}
async run_migrations() {
@ -213,7 +224,7 @@ class SqliteMessageStorage implements SearchableMessageStorage {
return; // nothing to do
}
await this.serialize_run("BEGIN EXCLUSIVE TRANSACTION", []);
await this.serialize_run("BEGIN EXCLUSIVE TRANSACTION");
try {
if (version === 0) {
@ -224,12 +235,17 @@ class SqliteMessageStorage implements SearchableMessageStorage {
await this.insert_rollback_since(version);
} catch (err) {
await this.serialize_run("ROLLBACK", []);
await this.serialize_run("ROLLBACK");
throw err;
}
await this.serialize_run("COMMIT", []);
await this.serialize_run("VACUUM", []);
await this.serialize_run("COMMIT");
await this.serialize_run("VACUUM");
}
// helper method that vacuums the db, meant to be used by migration related cli commands
// Issues a plain `VACUUM` statement through serialize_run and resolves once it has finished.
async vacuum() {
await this.serialize_run("VACUUM");
}
async close() {
@ -282,7 +298,7 @@ class SqliteMessageStorage implements SearchableMessageStorage {
}
async delete_migrations_older_than(version: number) {
return this.serialize_run("delete from migrations where migrations.version > ?", [version]);
return this.serialize_run("delete from migrations where migrations.version > ?", version);
}
async _downgrade_to(version: number) {
@ -300,7 +316,7 @@ class SqliteMessageStorage implements SearchableMessageStorage {
for (const rollback of _rollbacks) {
for (const stmt of rollback.stmts) {
await this.serialize_run(stmt, []);
await this.serialize_run(stmt);
}
}
@ -315,18 +331,18 @@ class SqliteMessageStorage implements SearchableMessageStorage {
throw Error(`${version} is not a valid version to downgrade to`);
}
await this.serialize_run("BEGIN EXCLUSIVE TRANSACTION", []);
await this.serialize_run("BEGIN EXCLUSIVE TRANSACTION");
let new_version: number;
try {
new_version = await this._downgrade_to(version);
} catch (err) {
await this.serialize_run("ROLLBACK", []);
await this.serialize_run("ROLLBACK");
throw err;
}
await this.serialize_run("COMMIT", []);
await this.serialize_run("COMMIT");
return new_version;
}
@ -354,7 +370,9 @@ class SqliteMessageStorage implements SearchableMessageStorage {
`insert into rollback_steps
(migration_id, step, statement)
values (?, ?, ?)`,
[migration.id, step, stmt]
migration.id,
step,
stmt
);
step++;
}
@ -381,13 +399,12 @@ class SqliteMessageStorage implements SearchableMessageStorage {
await this.serialize_run(
"INSERT INTO messages(network, channel, time, type, msg) VALUES(?, ?, ?, ?, ?)",
[
network.uuid,
channel.name.toLowerCase(),
msg.time.getTime(),
msg.type,
JSON.stringify(clonedMsg),
]
network.uuid,
channel.name.toLowerCase(),
msg.time.getTime(),
msg.type,
JSON.stringify(clonedMsg)
);
}
@ -398,10 +415,11 @@ class SqliteMessageStorage implements SearchableMessageStorage {
return;
}
await this.serialize_run("DELETE FROM messages WHERE network = ? AND channel = ?", [
await this.serialize_run(
"DELETE FROM messages WHERE network = ? AND channel = ?",
network.uuid,
channel.name.toLowerCase(),
]);
channel.name.toLowerCase()
);
}
async getMessages(
@ -477,20 +495,47 @@ class SqliteMessageStorage implements SearchableMessageStorage {
};
}
/**
 * Delete stored messages that are older than the requested age.
 *
 * The oldest messages are removed first; at most `req.limit` rows are deleted
 * per call (`-1` means no limit). All values are passed to sqlite as bound
 * parameters instead of being spliced into the statement text.
 *
 * @param req - Age threshold in days, optional restriction to a set of message
 *   types (`null` means every type) and the maximum number of rows to delete.
 * @returns The number of rows that were deleted.
 * @throws Error if `req.olderThanDays` does not yield a finite timestamp.
 */
async deleteMessages(req: DeletionRequest): Promise<number> {
	await this.initDone.promise;

	// We roughly get a timestamp from N days before.
	// We don't adjust for daylight savings time or other weird time jumps
	const millisecondsInDay = 24 * 60 * 60 * 1000;
	const deleteBefore = Date.now() - req.olderThanDays * millisecondsInDay;

	if (!Number.isFinite(deleteBefore)) {
		// a bound NaN would silently match nothing, so reject bad input explicitly
		throw new Error(`invalid olderThanDays value: ${req.olderThanDays}`);
	}

	const params: (string | number)[] = [deleteBefore];
	let typeClause = "";

	if (req.messageTypes !== null) {
		// one placeholder per requested type, the values are bound below
		const placeholders = req.messageTypes.map(() => "?").join(",");
		typeClause = `and type in (${placeholders})\n`;
		params.push(...req.messageTypes);
	}

	params.push(req.limit);

	const sql =
		"delete from messages where id in (select id from messages where\n" +
		"time <= ?\n" +
		typeClause +
		"order by time asc\n" +
		"limit ?\n" +
		")";

	return this.serialize_run(sql, ...params);
}
canProvideMessages() {
return this.isEnabled;
}
private serialize_run(stmt: string, params: any[]): Promise<void> {
private serialize_run(stmt: string, ...params: any[]): Promise<number> {
return new Promise((resolve, reject) => {
this.database.serialize(() => {
this.database.run(stmt, params, (err) => {
this.database.run(stmt, params, function (err) {
if (err) {
reject(err);
return;
}
resolve();
resolve(this.changes); // number of affected rows, `this` is re-bound by sqlite3
});
});
});

View file

@ -4,6 +4,13 @@ import {Channel} from "../../models/channel";
import {Message} from "../../models/message";
import {Network} from "../../models/network";
import Client from "../../client";
import type {MessageType} from "../../models/msg";
// Describes one deletion pass over the stored messages.
export type DeletionRequest = {
// only messages at least this many days old are deleted (a day is counted as 24 hours)
olderThanDays: number;
messageTypes: MessageType[] | null; // null means no restriction
limit: number; // -1 means unlimited
};
interface MessageStorage {
isEnabled: boolean;

148
server/storageCleaner.ts Normal file
View file

@ -0,0 +1,148 @@
import SqliteMessageStorage from "./plugins/messageStorage/sqlite";
import {MessageType} from "./models/msg";
import Config from "./config";
import {DeletionRequest} from "./plugins/messageStorage/types";
import log from "./log";
// Message types that the "statusOnly" deletion policy removes.
// Everything not listed here is kept under that policy.
const status_types = [
MessageType.AWAY,
MessageType.BACK,
MessageType.INVITE,
MessageType.JOIN,
MessageType.KICK,
MessageType.MODE,
MessageType.MODE_CHANNEL,
MessageType.MODE_USER,
MessageType.NICK,
MessageType.PART,
MessageType.QUIT,
MessageType.CTCP, // not technically a status, but generally those are only of interest temporarily
MessageType.CTCP_REQUEST,
MessageType.CHGHOST,
MessageType.TOPIC,
MessageType.TOPIC_SET_BY,
];
/**
 * Deletes old messages from a sqlite message storage according to
 * `Config.values.storagePolicy`.
 *
 * Once started, the cleaner works in small batches of `limit` rows so that
 * other database users get a chance to run in between: after a full batch the
 * next one follows 5 seconds later, after a partial (or failed) batch the next
 * attempt happens 5 minutes later. Two consecutive failures stop the cleaner.
 */
export class StorageCleaner {
	db: SqliteMessageStorage;
	olderThanDays: number;
	messageTypes: MessageType[] | null;
	limit: number;
	ticker?: ReturnType<typeof setTimeout>;
	errCount: number;
	isStopped: boolean;

	/**
	 * @param db - Storage to clean. The policy (age and message types) is read
	 *   from the config once, at construction time.
	 * @throws Error if the configured deletion policy is not a known value.
	 */
	constructor(db: SqliteMessageStorage) {
		this.errCount = 0;
		this.isStopped = true;
		this.db = db;
		this.limit = 200;
		const policy = Config.values.storagePolicy;
		this.olderThanDays = policy.maxAgeDays;

		switch (policy.deletionPolicy) {
			case "statusOnly":
				this.messageTypes = status_types;
				break;
			case "everything":
				this.messageTypes = null;
				break;
			default:
				// exhaustive switch guard, blows up when user specifies a invalid policy enum
				this.messageTypes = assertNoBadPolicy(policy.deletionPolicy);
		}
	}

	/** Build a fresh request object from the current settings. */
	private genDeletionRequest(): DeletionRequest {
		return {
			limit: this.limit,
			messageTypes: this.messageTypes,
			olderThanDays: this.olderThanDays,
		};
	}

	/**
	 * Delete everything the policy allows in a single statement, without batching.
	 *
	 * @returns The number of deleted messages.
	 * @throws Error if the storage policy is disabled in the config.
	 */
	async runDeletesNoLimit(): Promise<number> {
		if (!Config.values.storagePolicy.enabled) {
			// this is meant to be used by cli tools, so we guard against this
			throw new Error("storage policy is disabled");
		}

		const req = this.genDeletionRequest();
		req.limit = -1; // unlimited
		return this.db.deleteMessages(req);
	}

	/** Run one batch and arm the timer for the next one. */
	private async runDeletes() {
		if (this.isStopped) {
			return;
		}

		if (!this.db.isEnabled) {
			// TODO: remove this once the server is intelligent enough to wait for init
			this.schedule(30 * 1000);
			return;
		}

		const req = this.genDeletionRequest();
		let num_deleted = 0;

		try {
			num_deleted = await this.db.deleteMessages(req);
			this.errCount = 0; // reset when it works
		} catch (err: unknown) {
			this.errCount++;
			log.error("can't clean messages", err instanceof Error ? err.message : String(err));

			if (this.errCount >= 2) {
				log.error("Cleaning failed too many times, will not retry");
				this.stop();
				return;
			}
		}

		// need to recheck here as the field may have changed since the await
		if (this.isStopped) {
			return;
		}

		if (num_deleted < req.limit) {
			this.schedule(5 * 60 * 1000);
		} else {
			this.schedule(5000); // give others a chance to execute queries
		}
	}

	/** Arm the timer for the next batch, replacing any timer that is still pending. */
	private schedule(ms: number) {
		// never keep more than one pending timer, otherwise repeated start() calls
		// would leave several batch chains running side by side
		this.clearTicker();
		this.ticker = setTimeout(() => {
			this.runDeletes().catch((err) => {
				log.error("storageCleaner: unexpected failure");
				throw err;
			});
		}, ms);
	}

	/** Cancel the pending timer, if any, and forget its handle. */
	private clearTicker() {
		if (this.ticker === undefined) {
			return;
		}

		clearTimeout(this.ticker);
		this.ticker = undefined;
	}

	/** Start (or restart) periodic cleaning; the first batch runs on the next tick. */
	start() {
		this.isStopped = false;
		this.errCount = 0; // a restart begins with a clean failure count
		this.schedule(0);
	}

	/** Stop periodic cleaning and cancel the pending batch. */
	stop() {
		this.isStopped = true;
		this.clearTicker();
	}
}
/**
 * Exhaustiveness guard for the deletion policy switch.
 *
 * The `never` parameter makes the compiler reject any call site where a policy
 * value is still possible; at runtime it is only reached when the config holds
 * a value outside the known set.
 *
 * @throws Error always, naming the configured policy value.
 */
function assertNoBadPolicy(_: never): never {
	const configured = Config.values.storagePolicy.deletionPolicy;
	const message = `Invalid deletion policy "${configured}" in the \`storagePolicy\` object, fix your config.`;
	throw new Error(message);
}

View file

@ -12,6 +12,7 @@ import MessageStorage, {
rollbacks,
} from "../../server/plugins/messageStorage/sqlite";
import sqlite3 from "sqlite3";
import {DeletionRequest} from "../../server/plugins/messageStorage/types";
const orig_schema = [
// Schema version #1
@ -127,6 +128,112 @@ describe("SQLite migrations", function () {
});
});
// Unit tests for deleteMessages, run against a fresh in-memory database per test.
describe("SQLite unit tests", function () {
let store: MessageStorage;
beforeEach(async function () {
store = new MessageStorage("testUser");
// bypass enable() so no file is created; initDone has to be resolved by hand in that case
await store._enable(":memory:");
store.initDone.resolve();
});
afterEach(async function () {
await store.close();
});
// Age cutoff and batch limit: 14 messages, one per day going back from now.
it("deletes messages when asked to", async function () {
const baseDate = new Date();
const net = {uuid: "testnet"} as any;
const chan = {name: "#channel"} as any;
for (let i = 0; i < 14; ++i) {
await store.index(
net,
chan,
new Msg({
time: dateAddDays(baseDate, -i),
text: `msg ${i}`,
})
);
}
const limit = 1;
// presumably Msg defaults to MessageType.MESSAGE when no type is given; verify against the Msg model
const delReq: DeletionRequest = {
messageTypes: [MessageType.MESSAGE],
limit: limit,
olderThanDays: 2,
};
let deleted = await store.deleteMessages(delReq);
expect(deleted).to.equal(limit, "number of deleted messages doesn't match");
let id = 0;
let messages = await store.getMessages(net, chan, () => id++);
expect(messages.find((m) => m.text === "msg 13")).to.be.undefined; // oldest gets deleted first
// let's test if it properly cleans now
delReq.limit = 100;
deleted = await store.deleteMessages(delReq);
// "msg 2" through "msg 12" remain eligible, which makes 11
// NOTE(review): this relies on dateAddDays(baseDate, -2) being at least 48 hours before now
expect(deleted).to.equal(11, "number of deleted messages doesn't match");
messages = await store.getMessages(net, chan, () => id++);
expect(messages.map((m) => m.text)).to.have.ordered.members(["msg 1", "msg 0"]);
});
// Type filter: six messages, each of a different type, all old enough to be deleted.
it("deletes only the types it should", async function () {
const baseDate = new Date();
const net = {uuid: "testnet"} as any;
const chan = {name: "#channel"} as any;
for (let i = 0; i < 6; ++i) {
await store.index(
net,
chan,
new Msg({
time: dateAddDays(baseDate, -i),
text: `msg ${i}`,
// message i gets the i-th type of this list
type: [
MessageType.ACTION,
MessageType.AWAY,
MessageType.JOIN,
MessageType.PART,
MessageType.KICK,
MessageType.MESSAGE,
][i],
})
);
}
const delReq: DeletionRequest = {
messageTypes: [MessageType.ACTION, MessageType.JOIN, MessageType.KICK],
limit: 100, // effectively no limit
olderThanDays: 0,
};
let deleted = await store.deleteMessages(delReq);
expect(deleted).to.equal(3, "number of deleted messages doesn't match");
let id = 0;
let messages = await store.getMessages(net, chan, () => id++);
// the survivors come back oldest first
expect(messages.map((m) => m.type)).to.have.ordered.members([
MessageType.MESSAGE,
MessageType.PART,
MessageType.AWAY,
]);
delReq.messageTypes = [
MessageType.JOIN, // this is not in the remaining set, just here as a dummy
MessageType.PART,
MessageType.MESSAGE,
];
deleted = await store.deleteMessages(delReq);
expect(deleted).to.equal(2, "number of deleted messages doesn't match");
messages = await store.getMessages(net, chan, () => id++);
expect(messages.map((m) => m.type)).to.have.ordered.members([MessageType.AWAY]);
});
});
describe("SQLite Message Storage", function () {
// Increase timeout due to unpredictable I/O on CI services
this.timeout(util.isRunningOnCI() ? 25000 : 5000);
@ -373,3 +480,9 @@ describe("SQLite Message Storage", function () {
expect(fs.existsSync(expectedPath)).to.be.true;
});
});
/**
 * Return a new date that is `days` whole 24-hour periods away from `date`.
 *
 * Fixed-length days are used on purpose: the storage deletes by
 * "N * 24 hours before now", so shifting by local calendar days would move a
 * fixture by 23 or 25 hours across a daylight-saving change and push it to the
 * wrong side of the cutoff.
 *
 * @param date - Starting point; it is not modified.
 * @param days - Number of days to add, negative values go back in time.
 * @returns A new `Date` instance.
 */
function dateAddDays(date: Date, days: number) {
	const millisecondsInDay = 24 * 60 * 60 * 1000;
	return new Date(date.getTime() + days * millisecondsInDay);
}