diff --git a/server/plugins/messageStorage/sqlite.ts b/server/plugins/messageStorage/sqlite.ts index 349d2ec2..e0a32859 100644 --- a/server/plugins/messageStorage/sqlite.ts +++ b/server/plugins/messageStorage/sqlite.ts @@ -24,31 +24,73 @@ try { } type Migration = {version: number; stmts: string[]}; +type Rollback = {version: number; rollback_forbidden?: boolean; stmts: string[]}; -export const currentSchemaVersion = 1672236339873; // use `new Date().getTime()` +export const currentSchemaVersion = 1679743888000; // use `new Date().getTime()` // Desired schema, adapt to the newest version and add migrations to the array below const schema = [ - "CREATE TABLE IF NOT EXISTS options (name TEXT, value TEXT, CONSTRAINT name_unique UNIQUE (name))", - "CREATE TABLE IF NOT EXISTS messages (id INTEGER PRIMARY KEY AUTOINCREMENT, network TEXT, channel TEXT, time INTEGER, type TEXT, msg TEXT)", - "CREATE INDEX IF NOT EXISTS network_channel ON messages (network, channel)", - "CREATE INDEX IF NOT EXISTS time ON messages (time)", + "CREATE TABLE options (name TEXT, value TEXT, CONSTRAINT name_unique UNIQUE (name))", + "CREATE TABLE messages (id INTEGER PRIMARY KEY AUTOINCREMENT, network TEXT, channel TEXT, time INTEGER, type TEXT, msg TEXT)", + `CREATE TABLE migrations ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + version INTEGER NOT NULL UNIQUE, + rollback_forbidden INTEGER DEFAULT 0 NOT NULL + )`, + `CREATE TABLE rollback_steps ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + migration_id INTEGER NOT NULL REFERENCES migrations ON DELETE CASCADE, + step INTEGER NOT NULL, + statement TEXT NOT NULL + )`, + "CREATE INDEX network_channel ON messages (network, channel)", + "CREATE INDEX time ON messages (time)", ]; // the migrations will be executed in an exclusive transaction as a whole // add new migrations to the end, with the version being the new 'currentSchemaVersion' +// write a corresponding down migration into rollbacks export const migrations: Migration[] = [ { version: 1672236339873, stmts: [ - "CREATE TABLE messages_new (id INTEGER PRIMARY KEY AUTOINCREMENT, network TEXT, channel TEXT, time INTEGER, type TEXT, msg TEXT);", - "INSERT INTO messages_new(network, channel, time, type, msg) select network, channel, time, type, msg from messages order by time asc;", - "DROP TABLE messages;", - "ALTER TABLE messages_new RENAME TO messages;", - "CREATE INDEX network_channel ON messages (network, channel);", - "CREATE INDEX time ON messages (time);", + "CREATE TABLE messages_new (id INTEGER PRIMARY KEY AUTOINCREMENT, network TEXT, channel TEXT, time INTEGER, type TEXT, msg TEXT)", + "INSERT INTO messages_new(network, channel, time, type, msg) select network, channel, time, type, msg from messages order by time asc", + "DROP TABLE messages", + "ALTER TABLE messages_new RENAME TO messages", + "CREATE INDEX network_channel ON messages (network, channel)", + "CREATE INDEX time ON messages (time)", ], }, + { + version: 1679743888000, + stmts: [ + `CREATE TABLE IF NOT EXISTS migrations ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + version INTEGER NOT NULL UNIQUE, + rollback_forbidden INTEGER DEFAULT 0 NOT NULL + )`, + `CREATE TABLE IF NOT EXISTS rollback_steps ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + migration_id INTEGER NOT NULL REFERENCES migrations ON DELETE CASCADE, + step INTEGER NOT NULL, + statement TEXT NOT NULL + )`, + ], + }, +]; + +// down migrations need to restore the state of the prior version. +// rollback can be disallowed by adding rollback_forbidden: true to it +export const rollbacks: Rollback[] = [ + { + version: 1672236339873, + stmts: [], // changes aren't visible, left empty on purpose + }, + { + version: 1679743888000, + stmts: [], // here we can't drop the tables, as we use them in the code, so just leave those in + }, ]; class Deferred { @@ -89,6 +131,7 @@ class SqliteMessageStorage implements SearchableMessageStorage { this.database = new sqlite3.Database(sqlitePath); try { + await this.run_pragmas(); // must be done outside of a transaction await this.run_migrations(); } catch (e) { this.isEnabled = false; @@ -137,6 +180,12 @@ class SqliteMessageStorage implements SearchableMessageStorage { return storedSchemaVersion; } + async update_version_in_db() { + return this.serialize_run("UPDATE options SET value = ? WHERE name = 'schema_version'", [ + currentSchemaVersion.toString(), + ]); + } + async _run_migrations(dbVersion: number) { log.info( `sqlite messages schema version is out of date (${dbVersion} < ${currentSchemaVersion}). Running migrations.` @@ -148,9 +197,11 @@ class SqliteMessageStorage implements SearchableMessageStorage { await this.serialize_run(stmt, []); } - await this.serialize_run("UPDATE options SET value = ? WHERE name = 'schema_version'", [ - currentSchemaVersion.toString(), - ]); + await this.update_version_in_db(); + } + + async run_pragmas() { + await this.serialize_run("PRAGMA foreign_keys = ON;", []); } async run_migrations() { @@ -170,6 +221,8 @@ class SqliteMessageStorage implements SearchableMessageStorage { } else { await this._run_migrations(version); } + + await this.insert_rollback_since(version); } catch (err) { await this.serialize_run("ROLLBACK", []); throw err; @@ -198,6 +251,116 @@ class SqliteMessageStorage implements SearchableMessageStorage { }); } + async fetch_rollbacks(since_version: number) { + const res = await this.serialize_fetchall( + `select version, rollback_forbidden, statement + from rollback_steps + join migrations on migrations.id=rollback_steps.migration_id + where version > ? + order by version desc, step asc`, + since_version + ); + const result: Rollback[] = []; + + // convert to Rollback[] + // requires ordering in the sql statement + for (const raw of res) { + const last = result.at(-1); + + if (!last || raw.version !== last.version) { + result.push({ + version: raw.version, + rollback_forbidden: Boolean(raw.rollback_forbidden), + stmts: [raw.statement], + }); + } else { + last.stmts.push(raw.statment); + } + } + + return result; + } + + async delete_migrations_older_than(version: number) { + return this.serialize_run("delete from migrations where migrations.version > ?", [version]); + } + + async _downgrade_to(version: number) { + const _rollbacks = await this.fetch_rollbacks(version); + + if (_rollbacks.length === 0) { + return version; + } + + const forbidden = _rollbacks.find((item) => item.rollback_forbidden); + + if (forbidden) { + throw Error(`can't downgrade past ${forbidden.version}`); + } + + for (const rollback of _rollbacks) { + for (const stmt of rollback.stmts) { + await this.serialize_run(stmt, []); + } + } + + await this.delete_migrations_older_than(version); + await this.update_version_in_db(); + + return _rollbacks.at(-1)!.version; // assert valid due to length guard above + } + + async downgrade_to(version: number) { + if (version <= 0) { + throw Error(`${version} is not a valid version to downgrade to`); + } + + await this.serialize_run("BEGIN EXCLUSIVE TRANSACTION", []); + + let new_version: number; + + try { + new_version = await this._downgrade_to(version); + } catch (err) { + await this.serialize_run("ROLLBACK", []); + throw err; + } + + await this.serialize_run("COMMIT", []); + return new_version; + } + + async downgrade() { + const res = await this.downgrade_to(currentSchemaVersion); + return res; + } + + async insert_rollback_since(version: number) { + const missing = newRollbacks(version); + + for (const rollback of missing) { + const migration = await this.serialize_get( + `insert into migrations + (version, rollback_forbidden) + values (?, ?) + returning id`, + rollback.version, + rollback.rollback_forbidden || 0 + ); + + for (const stmt of rollback.stmts) { + let step = 0; + await this.serialize_run( + `insert into rollback_steps + (migration_id, step, statement) + values (?, ?, ?)`, + [migration.id, step, stmt] + ); + step++; + } + } + } + async index(network: Network, channel: Chan, msg: Msg) { await this.initDone.promise; @@ -386,4 +549,8 @@ export function necessaryMigrations(since: number): Migration[] { return migrations.filter((m) => m.version > since); } +export function newRollbacks(since: number): Rollback[] { + return rollbacks.filter((r) => r.version > since); +} + export default SqliteMessageStorage; diff --git a/test/plugins/sqlite.ts b/test/plugins/sqlite.ts index e2723872..1ce745cd 100644 --- a/test/plugins/sqlite.ts +++ b/test/plugins/sqlite.ts @@ -9,8 +9,8 @@ import MessageStorage, { currentSchemaVersion, migrations, necessaryMigrations, + rollbacks, } from "../../server/plugins/messageStorage/sqlite"; -import Client from "../../server/client"; import sqlite3 from "sqlite3"; const orig_schema = [ @@ -89,7 +89,14 @@ describe("SQLite migrations", function () { db.close(done); }); - it("has working migrations", async function () { + it("has a down migration for every migration", function () { + expect(migrations.length).to.eq(rollbacks.length); + expect(migrations.map((m) => m.version)).to.have.ordered.members( + rollbacks.map((r) => r.version) + ); + }); + + it("has working up-migrations", async function () { const to_execute = necessaryMigrations(v1_schema_version); expect(to_execute.length).to.eq(migrations.length); await serialize_run("BEGIN EXCLUSIVE TRANSACTION"); @@ -100,6 +107,24 @@ describe("SQLite migrations", function () { await serialize_run("COMMIT TRANSACTION"); }); + + it("has working down-migrations", async function () { + await serialize_run("BEGIN EXCLUSIVE TRANSACTION"); + + for (const rollback of rollbacks.reverse()) { + if (rollback.rollback_forbidden) { + throw Error( + "Try to write a down migration, if you really can't, flip this to a break" + ); + } + + for (const stmt of rollback.stmts) { + await serialize_run(stmt); + } + } + + await serialize_run("COMMIT TRANSACTION"); + }); }); describe("SQLite Message Storage", function () { @@ -110,6 +135,36 @@ describe("SQLite Message Storage", function () { const expectedPath = path.join(Config.getHomePath(), "logs", "testUser.sqlite3"); let store: MessageStorage; + function db_get_one(stmt: string, ...params: any[]): Promise { + return new Promise((resolve, reject) => { + store.database.serialize(() => { + store.database.get(stmt, params, (err, row) => { + if (err) { + reject(err); + return; + } + + resolve(row); + }); + }); + }); + } + + function db_get_mult(stmt: string, ...params: any[]): Promise { + return new Promise((resolve, reject) => { + store.database.serialize(() => { + store.database.all(stmt, params, (err, rows) => { + if (err) { + reject(err); + return; + } + + resolve(rows); + }); + }); + }); + } + before(function (done) { store = new MessageStorage("testUser"); @@ -143,16 +198,17 @@ describe("SQLite Message Storage", function () { store.isEnabled = true; }); - it("should insert schema version to options table", function (done) { - store.database.get( - "SELECT value FROM options WHERE name = 'schema_version'", - (err, row) => { - expect(err).to.be.null; - // compared as string because it's returned as such from the database - expect(row.value).to.equal(currentSchemaVersion.toString()); - done(); - } + it("should insert schema version to options table", async function () { + const row = await db_get_one("SELECT value FROM options WHERE name = 'schema_version'"); + expect(row.value).to.equal(currentSchemaVersion.toString()); + }); + + it("should insert migrations", async function () { + const row = await db_get_one( + "SELECT id, version FROM migrations WHERE version = ?", + currentSchemaVersion ); + expect(row).to.not.be.undefined; }); it("should store a message", async function () { @@ -299,6 +355,19 @@ describe("SQLite Message Storage", function () { } }); + it("should be able to downgrade", async function () { + for (const rollback of rollbacks.reverse()) { + if (rollback.rollback_forbidden) { + throw Error( + "Try to write a down migration, if you really can't, flip this to a break" + ); + } + + const new_version = await store.downgrade_to(rollback.version); + expect(new_version).to.equal(rollback.version); + } + }); + it("should close database", async function () { await store.close(); expect(fs.existsSync(expectedPath)).to.be.true;