diff --git a/client/tsconfig.json b/client/tsconfig.json index 4054c4f1..2c7cbdcd 100644 --- a/client/tsconfig.json +++ b/client/tsconfig.json @@ -11,6 +11,7 @@ "../server/log.ts", "../server/config.ts", "../server/client.ts", + "../server/storageCleaner.ts", "../server/clientManager.ts", "../server/identification.ts", "../server/plugins/changelog.ts", diff --git a/defaults/config.js b/defaults/config.js index 51bc090e..a943ca0f 100644 --- a/defaults/config.js +++ b/defaults/config.js @@ -304,6 +304,26 @@ module.exports = { // This value is set to `["sqlite", "text"]` by default. messageStorage: ["sqlite", "text"], + // ### `storagePolicy` + + // When the sqlite storage is in use, control the maximum storage duration. + // A background task will periodically clean up messages older than the limit. + + // The available keys for the `storagePolicy` object are: + // + // - `enabled`: If this is false, the cleaning task is not running. + // - `maxAgeDays`: Maximum age of an entry in days. + // - `deletionPolicy`: Controls what types of messages are being deleted. + // Valid options are: + // - `statusOnly`: Only delete message types which are status related (e.g. away, back, join, parts, mode, ctcp...) + // but keep actual messages from nicks. This keeps the DB size down while retaining "precious" messages. + // - `everything`: Delete everything, including messages from irc nicks + storagePolicy: { + enabled: false, + maxAgeDays: 7, + deletionPolicy: "statusOnly", + }, + // ### `useHexIp` // // When set to `true`, users' IP addresses will be encoded as hex. diff --git a/server/client.ts b/server/client.ts index 648e4aa9..9f7c08d9 100644 --- a/server/client.ts +++ b/server/client.ts @@ -18,6 +18,7 @@ import TextFileMessageStorage from "./plugins/messageStorage/text"; import Network, {IgnoreListItem, NetworkConfig, NetworkWithIrcFramework} from "./models/network"; import ClientManager from "./clientManager"; import {MessageStorage, SearchQuery, SearchResponse} from "./plugins/messageStorage/types"; +import {StorageCleaner} from "./storageCleaner"; type OrderItem = Chan["id"] | Network["uuid"]; type Order = OrderItem[]; @@ -138,6 +139,15 @@ class Client { if (!Config.values.public && client.config.log) { if (Config.values.messageStorage.includes("sqlite")) { client.messageProvider = new SqliteMessageStorage(client.name); + + if (Config.values.storagePolicy.enabled) { + log.info( + `Activating storage cleaner. Policy: ${Config.values.storagePolicy.deletionPolicy}. MaxAge: ${Config.values.storagePolicy.maxAgeDays} days` + ); + const cleaner = new StorageCleaner(client.messageProvider); + cleaner.start(); + } + client.messageStorage.push(client.messageProvider); } diff --git a/server/command-line/storage.ts b/server/command-line/storage.ts index df9acea0..3f9184b2 100644 --- a/server/command-line/storage.ts +++ b/server/command-line/storage.ts @@ -3,6 +3,7 @@ import {Command} from "commander"; import ClientManager from "../clientManager"; import Utils from "./utils"; import SqliteMessageStorage from "../plugins/messageStorage/sqlite"; +import {StorageCleaner} from "../storageCleaner"; const program = new Command("storage").description( "various utilities related to the message storage" @@ -10,7 +11,7 @@ const program = new Command("storage").description( program .command("migrate") - .argument("[user]", "migrate a specific user only, all if not provided") + .argument("[username]", "migrate a specific user only, all if not provided") .description("Migrate message storage where needed") .on("--help", Utils.extraHelp) .action(function (user) { @@ -20,7 +21,19 @@ program }); }); -async function runMigrations(user: string) { +program + .command("clean") + .argument("[user]", "clean messages for a specific user only, all if not provided") + .description("Delete messages from the DB based on the storage policy") + .on("--help", Utils.extraHelp) + .action(function (user) { + runCleaning(user).catch((err) => { + log.error(err.toString()); + process.exit(1); + }); + }); + +async function runMigrations(user?: string) { const manager = new ClientManager(); const users = manager.getUsers(); @@ -65,4 +78,46 @@ function isUserLogEnabled(manager: ClientManager, user: string): boolean { return conf.log; } +async function runCleaning(user: string) { + const manager = new ClientManager(); + const users = manager.getUsers(); + + if (user) { + if (!users.includes(user)) { + throw new Error(`invalid user ${user}`); + } + + return cleanUser(manager, user); + } + + for (const name of users) { + await cleanUser(manager, name); + // if any migration fails we blow up, + // chances are the rest won't complete either + } +} + +async function cleanUser(manager: ClientManager, user: string) { + log.info("handling user", user); + + if (!isUserLogEnabled(manager, user)) { + log.info("logging disabled for user", user, ". Skipping"); + return; + } + + const sqlite = new SqliteMessageStorage(user); + await sqlite.enable(); + const cleaner = new StorageCleaner(sqlite); + const num_deleted = await cleaner.runDeletesNoLimit(); + log.info(`deleted ${num_deleted} messages`); + log.info("running a vacuum now, this might take a while"); + + if (num_deleted > 0) { + await sqlite.vacuum(); + } + + await sqlite.close(); + log.info(`cleaning messages for ${user} has been successful`); +} + export default program; diff --git a/server/config.ts b/server/config.ts index 543a8135..bad5f522 100644 --- a/server/config.ts +++ b/server/config.ts @@ -76,6 +76,12 @@ type Debug = { raw: boolean; }; +type StoragePolicy = { + enabled: boolean; + maxAgeDays: number; + deletionPolicy: "statusOnly" | "everything"; +}; + export type ConfigType = { public: boolean; host: string | undefined; @@ -97,6 +103,7 @@ export type ConfigType = { defaults: Defaults; lockNetwork: boolean; messageStorage: string[]; + storagePolicy: StoragePolicy; useHexIp: boolean; webirc?: WebIRC; identd: Identd; diff --git a/server/plugins/messageStorage/sqlite.ts b/server/plugins/messageStorage/sqlite.ts index 7c086073..713f108f 100644 --- a/server/plugins/messageStorage/sqlite.ts +++ b/server/plugins/messageStorage/sqlite.ts @@ -7,7 +7,7 @@ import Config from "../../config"; import Msg, {Message} from "../../models/msg"; import Chan, {Channel} from "../../models/chan"; import Helper from "../../helper"; -import type {SearchResponse, SearchQuery, SearchableMessageStorage} from "./types"; +import type {SearchResponse, SearchQuery, SearchableMessageStorage, DeletionRequest} from "./types"; import Network from "../../models/network"; // TODO; type @@ -26,7 +26,7 @@ try { type Migration = {version: number; stmts: string[]}; type Rollback = {version: number; rollback_forbidden?: boolean; stmts: string[]}; -export const currentSchemaVersion = 1679743888000; // use `new Date().getTime()` +export const currentSchemaVersion = 1703322560448; // use `new Date().getTime()` // Desired schema, adapt to the newest version and add migrations to the array below const schema = [ @@ -45,6 +45,7 @@ const schema = [ )`, "CREATE INDEX network_channel ON messages (network, channel)", "CREATE INDEX time ON messages (time)", + "CREATE INDEX msg_type_idx on messages (type)", // needed for efficient storageCleaner queries ]; // the migrations will be executed in an exclusive transaction as a whole @@ -78,6 +79,10 @@ export const migrations: Migration[] = [ )`, ], }, + { + version: 1703322560448, + stmts: ["CREATE INDEX msg_type_idx on messages (type)"], + }, ]; // down migrations need to restore the state of the prior version. @@ -91,6 +96,10 @@ export const rollbacks: Rollback[] = [ version: 1679743888000, stmts: [], // here we can't drop the tables, as we use them in the code, so just leave those in }, + { + version: 1703322560448, + stmts: ["drop INDEX msg_type_idx"], + }, ]; class Deferred { @@ -116,7 +125,21 @@ class SqliteMessageStorage implements SearchableMessageStorage { this.initDone = new Deferred(); } - async _enable() { + async _enable(connection_string: string) { + this.database = new sqlite3.Database(connection_string); + + try { + await this.run_pragmas(); // must be done outside of a transaction + await this.run_migrations(); + } catch (e) { + this.isEnabled = false; + throw Helper.catch_to_error("Migration failed", e); + } + + this.isEnabled = true; + } + + async enable() { const logsPath = Config.getUserLogsPath(); const sqlitePath = path.join(logsPath, `${this.userName}.sqlite3`); @@ -126,22 +149,8 @@ class SqliteMessageStorage implements SearchableMessageStorage { throw Helper.catch_to_error("Unable to create logs directory", e); } - this.isEnabled = true; - - this.database = new sqlite3.Database(sqlitePath); - try { - await this.run_pragmas(); // must be done outside of a transaction - await this.run_migrations(); - } catch (e) { - this.isEnabled = false; - throw Helper.catch_to_error("Migration failed", e); - } - } - - async enable() { - try { - await this._enable(); + await this._enable(sqlitePath); } finally { this.initDone.resolve(); // unblock the instance methods } @@ -149,12 +158,13 @@ class SqliteMessageStorage implements SearchableMessageStorage { async setup_new_db() { for (const stmt of schema) { - await this.serialize_run(stmt, []); + await this.serialize_run(stmt); } - await this.serialize_run("INSERT INTO options (name, value) VALUES ('schema_version', ?)", [ - currentSchemaVersion.toString(), - ]); + await this.serialize_run( + "INSERT INTO options (name, value) VALUES ('schema_version', ?)", + currentSchemaVersion.toString() + ); } async current_version(): Promise { @@ -181,9 +191,10 @@ class SqliteMessageStorage implements SearchableMessageStorage { } async update_version_in_db() { - return this.serialize_run("UPDATE options SET value = ? WHERE name = 'schema_version'", [ - currentSchemaVersion.toString(), - ]); + return this.serialize_run( + "UPDATE options SET value = ? WHERE name = 'schema_version'", + currentSchemaVersion.toString() + ); } async _run_migrations(dbVersion: number) { @@ -194,14 +205,14 @@ class SqliteMessageStorage implements SearchableMessageStorage { const to_execute = necessaryMigrations(dbVersion); for (const stmt of to_execute.map((m) => m.stmts).flat()) { - await this.serialize_run(stmt, []); + await this.serialize_run(stmt); } await this.update_version_in_db(); } async run_pragmas() { - await this.serialize_run("PRAGMA foreign_keys = ON;", []); + await this.serialize_run("PRAGMA foreign_keys = ON;"); } async run_migrations() { @@ -213,7 +224,7 @@ class SqliteMessageStorage implements SearchableMessageStorage { return; // nothing to do } - await this.serialize_run("BEGIN EXCLUSIVE TRANSACTION", []); + await this.serialize_run("BEGIN EXCLUSIVE TRANSACTION"); try { if (version === 0) { @@ -224,12 +235,17 @@ class SqliteMessageStorage implements SearchableMessageStorage { await this.insert_rollback_since(version); } catch (err) { - await this.serialize_run("ROLLBACK", []); + await this.serialize_run("ROLLBACK"); throw err; } - await this.serialize_run("COMMIT", []); - await this.serialize_run("VACUUM", []); + await this.serialize_run("COMMIT"); + await this.serialize_run("VACUUM"); + } + + // helper method that vacuums the db, meant to be used by migration related cli commands + async vacuum() { + await this.serialize_run("VACUUM"); } async close() { @@ -282,7 +298,7 @@ class SqliteMessageStorage implements SearchableMessageStorage { } async delete_migrations_older_than(version: number) { - return this.serialize_run("delete from migrations where migrations.version > ?", [version]); + return this.serialize_run("delete from migrations where migrations.version > ?", version); } async _downgrade_to(version: number) { @@ -300,7 +316,7 @@ class SqliteMessageStorage implements SearchableMessageStorage { for (const rollback of _rollbacks) { for (const stmt of rollback.stmts) { - await this.serialize_run(stmt, []); + await this.serialize_run(stmt); } } @@ -315,18 +331,18 @@ class SqliteMessageStorage implements SearchableMessageStorage { throw Error(`${version} is not a valid version to downgrade to`); } - await this.serialize_run("BEGIN EXCLUSIVE TRANSACTION", []); + await this.serialize_run("BEGIN EXCLUSIVE TRANSACTION"); let new_version: number; try { new_version = await this._downgrade_to(version); } catch (err) { - await this.serialize_run("ROLLBACK", []); + await this.serialize_run("ROLLBACK"); throw err; } - await this.serialize_run("COMMIT", []); + await this.serialize_run("COMMIT"); return new_version; } @@ -354,7 +370,9 @@ class SqliteMessageStorage implements SearchableMessageStorage { `insert into rollback_steps (migration_id, step, statement) values (?, ?, ?)`, - [migration.id, step, stmt] + migration.id, + step, + stmt ); step++; } @@ -381,13 +399,12 @@ class SqliteMessageStorage implements SearchableMessageStorage { await this.serialize_run( "INSERT INTO messages(network, channel, time, type, msg) VALUES(?, ?, ?, ?, ?)", - [ - network.uuid, - channel.name.toLowerCase(), - msg.time.getTime(), - msg.type, - JSON.stringify(clonedMsg), - ] + + network.uuid, + channel.name.toLowerCase(), + msg.time.getTime(), + msg.type, + JSON.stringify(clonedMsg) ); } @@ -398,10 +415,11 @@ class SqliteMessageStorage implements SearchableMessageStorage { return; } - await this.serialize_run("DELETE FROM messages WHERE network = ? AND channel = ?", [ + await this.serialize_run( + "DELETE FROM messages WHERE network = ? AND channel = ?", network.uuid, - channel.name.toLowerCase(), - ]); + channel.name.toLowerCase() + ); } async getMessages( @@ -477,20 +495,47 @@ class SqliteMessageStorage implements SearchableMessageStorage { }; } + async deleteMessages(req: DeletionRequest): Promise { + await this.initDone.promise; + let sql = "delete from messages where id in (select id from messages where\n"; + + // We roughly get a timestamp from N days before. + // We don't adjust for daylight savings time or other weird time jumps + const millisecondsInDay = 24 * 60 * 60 * 1000; + const deleteBefore = Date.now() - req.olderThanDays * millisecondsInDay; + sql += `time <= ${deleteBefore}\n`; + + let typeClause = ""; + + if (req.messageTypes !== null) { + typeClause = `type in (${req.messageTypes.map((type) => `'${type}'`).join(",")})\n`; + } + + if (typeClause) { + sql += `and ${typeClause}`; + } + + sql += "order by time asc\n"; + sql += `limit ${req.limit}\n`; + sql += ")"; + + return this.serialize_run(sql); + } + canProvideMessages() { return this.isEnabled; } - private serialize_run(stmt: string, params: any[]): Promise { + private serialize_run(stmt: string, ...params: any[]): Promise { return new Promise((resolve, reject) => { this.database.serialize(() => { - this.database.run(stmt, params, (err) => { + this.database.run(stmt, params, function (err) { if (err) { reject(err); return; } - resolve(); + resolve(this.changes); // number of affected rows, `this` is re-bound by sqlite3 }); }); }); diff --git a/server/plugins/messageStorage/types.d.ts b/server/plugins/messageStorage/types.d.ts index cc305224..7e17ba54 100644 --- a/server/plugins/messageStorage/types.d.ts +++ b/server/plugins/messageStorage/types.d.ts @@ -4,6 +4,13 @@ import {Channel} from "../../models/channel"; import {Message} from "../../models/message"; import {Network} from "../../models/network"; import Client from "../../client"; +import type {MessageType} from "../../models/msg"; + +export type DeletionRequest = { + olderThanDays: number; + messageTypes: MessageType[] | null; // null means no restriction + limit: number; // -1 means unlimited +}; interface MessageStorage { isEnabled: boolean; diff --git a/server/storageCleaner.ts b/server/storageCleaner.ts new file mode 100644 index 00000000..cad486cc --- /dev/null +++ b/server/storageCleaner.ts @@ -0,0 +1,148 @@ +import SqliteMessageStorage from "./plugins/messageStorage/sqlite"; +import {MessageType} from "./models/msg"; +import Config from "./config"; +import {DeletionRequest} from "./plugins/messageStorage/types"; +import log from "./log"; + +const status_types = [ + MessageType.AWAY, + MessageType.BACK, + MessageType.INVITE, + MessageType.JOIN, + MessageType.KICK, + MessageType.MODE, + MessageType.MODE_CHANNEL, + MessageType.MODE_USER, + MessageType.NICK, + MessageType.PART, + MessageType.QUIT, + MessageType.CTCP, // not technically a status, but generally those are only of interest temporarily + MessageType.CTCP_REQUEST, + MessageType.CHGHOST, + MessageType.TOPIC, + MessageType.TOPIC_SET_BY, +]; + +export class StorageCleaner { + db: SqliteMessageStorage; + olderThanDays: number; + messageTypes: MessageType[] | null; + limit: number; + ticker?: ReturnType; + errCount: number; + isStopped: boolean; + + constructor(db: SqliteMessageStorage) { + this.errCount = 0; + this.isStopped = true; + this.db = db; + this.limit = 200; + const policy = Config.values.storagePolicy; + this.olderThanDays = policy.maxAgeDays; + + switch (policy.deletionPolicy) { + case "statusOnly": + this.messageTypes = status_types; + break; + case "everything": + this.messageTypes = null; + break; + default: + // exhaustive switch guard, blows up when user specifies a invalid policy enum + this.messageTypes = assertNoBadPolicy(policy.deletionPolicy); + } + } + + private genDeletionRequest(): DeletionRequest { + return { + limit: this.limit, + messageTypes: this.messageTypes, + olderThanDays: this.olderThanDays, + }; + } + + async runDeletesNoLimit(): Promise { + if (!Config.values.storagePolicy.enabled) { + // this is meant to be used by cli tools, so we guard against this + throw new Error("storage policy is disabled"); + } + + const req = this.genDeletionRequest(); + req.limit = -1; // unlimited + const num_deleted = await this.db.deleteMessages(req); + return num_deleted; + } + + private async runDeletes() { + if (this.isStopped) { + return; + } + + if (!this.db.isEnabled) { + // TODO: remove this once the server is intelligent enough to wait for init + this.schedule(30 * 1000); + return; + } + + const req = this.genDeletionRequest(); + + let num_deleted = 0; + + try { + num_deleted = await this.db.deleteMessages(req); + this.errCount = 0; // reset when it works + } catch (err: any) { + this.errCount++; + log.error("can't clean messages", err.message); + + if (this.errCount === 2) { + log.error("Cleaning failed too many times, will not retry"); + this.stop(); + return; + } + } + + // need to recheck here as the field may have changed since the await + if (this.isStopped) { + return; + } + + if (num_deleted < req.limit) { + this.schedule(5 * 60 * 1000); + } else { + this.schedule(5000); // give others a chance to execute queries + } + } + + private schedule(ms: number) { + const self = this; + + this.ticker = setTimeout(() => { + self.runDeletes().catch((err) => { + log.error("storageCleaner: unexpected failure"); + throw err; + }); + }, ms); + } + + start() { + this.isStopped = false; + this.schedule(0); + } + + stop() { + this.isStopped = true; + + if (!this.ticker) { + return; + } + + clearTimeout(this.ticker); + } +} + +function assertNoBadPolicy(_: never): never { + throw new Error( + `Invalid deletion policy "${Config.values.storagePolicy.deletionPolicy}" in the \`storagePolicy\` object, fix your config.` + ); +} diff --git a/test/plugins/sqlite.ts b/test/plugins/sqlite.ts index 400d3c9a..e2af20be 100644 --- a/test/plugins/sqlite.ts +++ b/test/plugins/sqlite.ts @@ -12,6 +12,7 @@ import MessageStorage, { rollbacks, } from "../../server/plugins/messageStorage/sqlite"; import sqlite3 from "sqlite3"; +import {DeletionRequest} from "../../server/plugins/messageStorage/types"; const orig_schema = [ // Schema version #1 @@ -127,6 +128,112 @@ describe("SQLite migrations", function () { }); }); +describe("SQLite unit tests", function () { + let store: MessageStorage; + + beforeEach(async function () { + store = new MessageStorage("testUser"); + await store._enable(":memory:"); + store.initDone.resolve(); + }); + + afterEach(async function () { + await store.close(); + }); + + it("deletes messages when asked to", async function () { + const baseDate = new Date(); + + const net = {uuid: "testnet"} as any; + const chan = {name: "#channel"} as any; + + for (let i = 0; i < 14; ++i) { + await store.index( + net, + chan, + new Msg({ + time: dateAddDays(baseDate, -i), + text: `msg ${i}`, + }) + ); + } + + const limit = 1; + const delReq: DeletionRequest = { + messageTypes: [MessageType.MESSAGE], + limit: limit, + olderThanDays: 2, + }; + + let deleted = await store.deleteMessages(delReq); + expect(deleted).to.equal(limit, "number of deleted messages doesn't match"); + + let id = 0; + let messages = await store.getMessages(net, chan, () => id++); + expect(messages.find((m) => m.text === "msg 13")).to.be.undefined; // oldest gets deleted first + + // let's test if it properly cleans now + delReq.limit = 100; + deleted = await store.deleteMessages(delReq); + expect(deleted).to.equal(11, "number of deleted messages doesn't match"); + messages = await store.getMessages(net, chan, () => id++); + expect(messages.map((m) => m.text)).to.have.ordered.members(["msg 1", "msg 0"]); + }); + + it("deletes only the types it should", async function () { + const baseDate = new Date(); + + const net = {uuid: "testnet"} as any; + const chan = {name: "#channel"} as any; + + for (let i = 0; i < 6; ++i) { + await store.index( + net, + chan, + new Msg({ + time: dateAddDays(baseDate, -i), + text: `msg ${i}`, + type: [ + MessageType.ACTION, + MessageType.AWAY, + MessageType.JOIN, + MessageType.PART, + MessageType.KICK, + MessageType.MESSAGE, + ][i], + }) + ); + } + + const delReq: DeletionRequest = { + messageTypes: [MessageType.ACTION, MessageType.JOIN, MessageType.KICK], + limit: 100, // effectively no limit + olderThanDays: 0, + }; + + let deleted = await store.deleteMessages(delReq); + expect(deleted).to.equal(3, "number of deleted messages doesn't match"); + + let id = 0; + let messages = await store.getMessages(net, chan, () => id++); + expect(messages.map((m) => m.type)).to.have.ordered.members([ + MessageType.MESSAGE, + MessageType.PART, + MessageType.AWAY, + ]); + + delReq.messageTypes = [ + MessageType.JOIN, // this is not in the remaining set, just here as a dummy + MessageType.PART, + MessageType.MESSAGE, + ]; + deleted = await store.deleteMessages(delReq); + expect(deleted).to.equal(2, "number of deleted messages doesn't match"); + messages = await store.getMessages(net, chan, () => id++); + expect(messages.map((m) => m.type)).to.have.ordered.members([MessageType.AWAY]); + }); +}); + describe("SQLite Message Storage", function () { // Increase timeout due to unpredictable I/O on CI services this.timeout(util.isRunningOnCI() ? 25000 : 5000); @@ -373,3 +480,9 @@ describe("SQLite Message Storage", function () { expect(fs.existsSync(expectedPath)).to.be.true; }); }); + +function dateAddDays(date: Date, days: number) { + const ret = new Date(date.valueOf()); + ret.setDate(date.getDate() + days); + return ret; +}