diff --git a/server/plugins/messageStorage/sqlite.ts b/server/plugins/messageStorage/sqlite.ts index 7441687d..e0a32859 100644 --- a/server/plugins/messageStorage/sqlite.ts +++ b/server/plugins/messageStorage/sqlite.ts @@ -23,14 +23,74 @@ try { ); } -const currentSchemaVersion = 1520239200; +type Migration = {version: number; stmts: string[]}; +type Rollback = {version: number; rollback_forbidden?: boolean; stmts: string[]}; +export const currentSchemaVersion = 1679743888000; // use `new Date().getTime()` + +// Desired schema, adapt to the newest version and add migrations to the array below const schema = [ - // Schema version #1 - "CREATE TABLE IF NOT EXISTS options (name TEXT, value TEXT, CONSTRAINT name_unique UNIQUE (name))", - "CREATE TABLE IF NOT EXISTS messages (network TEXT, channel TEXT, time INTEGER, type TEXT, msg TEXT)", - "CREATE INDEX IF NOT EXISTS network_channel ON messages (network, channel)", - "CREATE INDEX IF NOT EXISTS time ON messages (time)", + "CREATE TABLE options (name TEXT, value TEXT, CONSTRAINT name_unique UNIQUE (name))", + "CREATE TABLE messages (id INTEGER PRIMARY KEY AUTOINCREMENT, network TEXT, channel TEXT, time INTEGER, type TEXT, msg TEXT)", + `CREATE TABLE migrations ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + version INTEGER NOT NULL UNIQUE, + rollback_forbidden INTEGER DEFAULT 0 NOT NULL + )`, + `CREATE TABLE rollback_steps ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + migration_id INTEGER NOT NULL REFERENCES migrations ON DELETE CASCADE, + step INTEGER NOT NULL, + statement TEXT NOT NULL + )`, + "CREATE INDEX network_channel ON messages (network, channel)", + "CREATE INDEX time ON messages (time)", +]; + +// the migrations will be executed in an exclusive transaction as a whole +// add new migrations to the end, with the version being the new 'currentSchemaVersion' +// write a corresponding down migration into rollbacks +export const migrations: Migration[] = [ + { + version: 1672236339873, + stmts: [ + "CREATE TABLE messages_new (id INTEGER PRIMARY KEY AUTOINCREMENT, network TEXT, channel TEXT, time INTEGER, type TEXT, msg TEXT)", + "INSERT INTO messages_new(network, channel, time, type, msg) select network, channel, time, type, msg from messages order by time asc", + "DROP TABLE messages", + "ALTER TABLE messages_new RENAME TO messages", + "CREATE INDEX network_channel ON messages (network, channel)", + "CREATE INDEX time ON messages (time)", + ], + }, + { + version: 1679743888000, + stmts: [ + `CREATE TABLE IF NOT EXISTS migrations ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + version INTEGER NOT NULL UNIQUE, + rollback_forbidden INTEGER DEFAULT 0 NOT NULL + )`, + `CREATE TABLE IF NOT EXISTS rollback_steps ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + migration_id INTEGER NOT NULL REFERENCES migrations ON DELETE CASCADE, + step INTEGER NOT NULL, + statement TEXT NOT NULL + )`, + ], + }, +]; + +// down migrations need to restore the state of the prior version. +// rollback can be disallowed by adding rollback_forbidden: true to it +export const rollbacks: Rollback[] = [ + { + version: 1672236339873, + stmts: [], // changes aren't visible, left empty on purpose + }, + { + version: 1679743888000, + stmts: [], // here we can't drop the tables, as we use them in the code, so just leave those in + }, ]; class Deferred { @@ -71,6 +131,7 @@ class SqliteMessageStorage implements SearchableMessageStorage { this.database = new sqlite3.Database(sqlitePath); try { + await this.run_pragmas(); // must be done outside of a transaction await this.run_migrations(); } catch (e) { this.isEnabled = false; @@ -86,41 +147,89 @@ class SqliteMessageStorage implements SearchableMessageStorage { } } - async run_migrations() { + async setup_new_db() { for (const stmt of schema) { await this.serialize_run(stmt, []); } + await this.serialize_run("INSERT INTO options (name, value) VALUES ('schema_version', ?)", [ + currentSchemaVersion.toString(), + ]); + } + + async current_version(): Promise { + const have_options = await this.serialize_get( + "select 1 from sqlite_master where type = 'table' and name = 'options'" + ); + + if (!have_options) { + return 0; + } + const version = await this.serialize_get( "SELECT value FROM options WHERE name = 'schema_version'" ); if (version === undefined) { - // new table - await this.serialize_run( - "INSERT INTO options (name, value) VALUES ('schema_version', ?)", - [currentSchemaVersion] - ); - return; + // technically shouldn't happen, means something created a schema but didn't populate it + // we'll try our best to recover + return 0; } const storedSchemaVersion = parseInt(version.value, 10); + return storedSchemaVersion; + } - if (storedSchemaVersion === currentSchemaVersion) { - return; - } - - if (storedSchemaVersion > currentSchemaVersion) { - throw `sqlite messages schema version is higher than expected (${storedSchemaVersion} > ${currentSchemaVersion}). Is The Lounge out of date?`; - } + async update_version_in_db() { + return this.serialize_run("UPDATE options SET value = ? WHERE name = 'schema_version'", [ + currentSchemaVersion.toString(), + ]); + } + async _run_migrations(dbVersion: number) { log.info( - `sqlite messages schema version is out of date (${storedSchemaVersion} < ${currentSchemaVersion}). Running migrations if any.` + `sqlite messages schema version is out of date (${dbVersion} < ${currentSchemaVersion}). Running migrations.` ); - await this.serialize_run("UPDATE options SET value = ? WHERE name = 'schema_version'", [ - currentSchemaVersion, - ]); + const to_execute = necessaryMigrations(dbVersion); + + for (const stmt of to_execute.map((m) => m.stmts).flat()) { + await this.serialize_run(stmt, []); + } + + await this.update_version_in_db(); + } + + async run_pragmas() { + await this.serialize_run("PRAGMA foreign_keys = ON;", []); + } + + async run_migrations() { + const version = await this.current_version(); + + if (version > currentSchemaVersion) { + throw `sqlite messages schema version is higher than expected (${version} > ${currentSchemaVersion}). Is The Lounge out of date?`; + } else if (version === currentSchemaVersion) { + return; // nothing to do + } + + await this.serialize_run("BEGIN EXCLUSIVE TRANSACTION", []); + + try { + if (version === 0) { + await this.setup_new_db(); + } else { + await this._run_migrations(version); + } + + await this.insert_rollback_since(version); + } catch (err) { + await this.serialize_run("ROLLBACK", []); + throw err; + } + + await this.serialize_run("COMMIT", []); + await this.serialize_run("VACUUM", []); } async close() { @@ -142,6 +251,116 @@ class SqliteMessageStorage implements SearchableMessageStorage { }); } + async fetch_rollbacks(since_version: number) { + const res = await this.serialize_fetchall( + `select version, rollback_forbidden, statement + from rollback_steps + join migrations on migrations.id=rollback_steps.migration_id + where version > ? + order by version desc, step asc`, + since_version + ); + const result: Rollback[] = []; + + // convert to Rollback[] + // requires ordering in the sql statement + for (const raw of res) { + const last = result.at(-1); + + if (!last || raw.version !== last.version) { + result.push({ + version: raw.version, + rollback_forbidden: Boolean(raw.rollback_forbidden), + stmts: [raw.statement], + }); + } else { + last.stmts.push(raw.statment); + } + } + + return result; + } + + async delete_migrations_older_than(version: number) { + return this.serialize_run("delete from migrations where migrations.version > ?", [version]); + } + + async _downgrade_to(version: number) { + const _rollbacks = await this.fetch_rollbacks(version); + + if (_rollbacks.length === 0) { + return version; + } + + const forbidden = _rollbacks.find((item) => item.rollback_forbidden); + + if (forbidden) { + throw Error(`can't downgrade past ${forbidden.version}`); + } + + for (const rollback of _rollbacks) { + for (const stmt of rollback.stmts) { + await this.serialize_run(stmt, []); + } + } + + await this.delete_migrations_older_than(version); + await this.update_version_in_db(); + + return _rollbacks.at(-1)!.version; // assert valid due to length guard above + } + + async downgrade_to(version: number) { + if (version <= 0) { + throw Error(`${version} is not a valid version to downgrade to`); + } + + await this.serialize_run("BEGIN EXCLUSIVE TRANSACTION", []); + + let new_version: number; + + try { + new_version = await this._downgrade_to(version); + } catch (err) { + await this.serialize_run("ROLLBACK", []); + throw err; + } + + await this.serialize_run("COMMIT", []); + return new_version; + } + + async downgrade() { + const res = await this.downgrade_to(currentSchemaVersion); + return res; + } + + async insert_rollback_since(version: number) { + const missing = newRollbacks(version); + + for (const rollback of missing) { + const migration = await this.serialize_get( + `insert into migrations + (version, rollback_forbidden) + values (?, ?) + returning id`, + rollback.version, + rollback.rollback_forbidden || 0 + ); + + for (const stmt of rollback.stmts) { + let step = 0; + await this.serialize_run( + `insert into rollback_steps + (migration_id, step, statement) + values (?, ?, ?)`, + [migration.id, step, stmt] + ); + step++; + } + } + } + async index(network: Network, channel: Chan, msg: Msg) { await this.initDone.promise; @@ -326,4 +545,12 @@ function parseSearchRowsToMessages(id: number, rows: any[]) { return messages; } +export function necessaryMigrations(since: number): Migration[] { + return migrations.filter((m) => m.version > since); +} + +export function newRollbacks(since: number): Rollback[] { + return rollbacks.filter((r) => r.version > since); +} + export default SqliteMessageStorage; diff --git a/test/plugins/sqlite.ts b/test/plugins/sqlite.ts index f83ca58d..1ce745cd 100644 --- a/test/plugins/sqlite.ts +++ b/test/plugins/sqlite.ts @@ -5,7 +5,127 @@ import {expect} from "chai"; import util from "../util"; import Msg, {MessageType} from "../../server/models/msg"; import Config from "../../server/config"; -import MessageStorage from "../../server/plugins/messageStorage/sqlite"; +import MessageStorage, { + currentSchemaVersion, + migrations, + necessaryMigrations, + rollbacks, +} from "../../server/plugins/messageStorage/sqlite"; +import sqlite3 from "sqlite3"; + +const orig_schema = [ + // Schema version #1 + // DO NOT CHANGE THIS IN ANY WAY, it's needed to properly test migrations + "CREATE TABLE IF NOT EXISTS options (name TEXT, value TEXT, CONSTRAINT name_unique UNIQUE (name))", + "CREATE TABLE IF NOT EXISTS messages (network TEXT, channel TEXT, time INTEGER, type TEXT, msg TEXT)", + "CREATE INDEX IF NOT EXISTS network_channel ON messages (network, channel)", + "CREATE INDEX IF NOT EXISTS time ON messages (time)", +]; + +const v1_schema_version = 1520239200; + +const v1_dummy_messages = [ + { + network: "8f650427-79a2-4950-b8af-94088b61b37c", + channel: "##linux", + time: 1594845354280, + type: "message", + msg: '{"from":{"mode":"","nick":"rascul"},"text":"db on a flash drive doesn\'t sound very nice though","self":false,"highlight":false,"users":[]}', + }, + { + network: "8f650427-79a2-4950-b8af-94088b61b37c", + channel: "##linux", + time: 1594845357234, + type: "message", + msg: '{"from":{"mode":"","nick":"GrandPa-G"},"text":"that\'s the point of changing to make sure.","self":false,"highlight":false,"users":[]}', + }, + { + network: "8f650427-79a2-4950-b8af-94088b61b37c", + channel: "#pleroma-dev", + time: 1594845358464, + type: "message", + msg: '{"from":{"mode":"@","nick":"rinpatch"},"text":"it\'s complicated","self":false,"highlight":false,"users":[]}', + }, +]; + +describe("SQLite migrations", function () { + let db: sqlite3.Database; + + function serialize_run(stmt: string, ...params: any[]): Promise { + return new Promise((resolve, reject) => { + db.serialize(() => { + db.run(stmt, params, (err) => { + if (err) { + reject(err); + return; + } + + resolve(); + }); + }); + }); + } + + before(async function () { + db = new sqlite3.Database(":memory:"); + + for (const stmt of orig_schema) { + await serialize_run(stmt); + } + + for (const msg of v1_dummy_messages) { + await serialize_run( + "INSERT INTO messages(network, channel, time, type, msg) VALUES(?, ?, ?, ?, ?)", + msg.network, + msg.channel, + msg.time, + msg.type, + msg.msg + ); + } + }); + + after(function (done) { + db.close(done); + }); + + it("has a down migration for every migration", function () { + expect(migrations.length).to.eq(rollbacks.length); + expect(migrations.map((m) => m.version)).to.have.ordered.members( + rollbacks.map((r) => r.version) + ); + }); + + it("has working up-migrations", async function () { + const to_execute = necessaryMigrations(v1_schema_version); + expect(to_execute.length).to.eq(migrations.length); + await serialize_run("BEGIN EXCLUSIVE TRANSACTION"); + + for (const stmt of to_execute.map((m) => m.stmts).flat()) { + await serialize_run(stmt); + } + + await serialize_run("COMMIT TRANSACTION"); + }); + + it("has working down-migrations", async function () { + await serialize_run("BEGIN EXCLUSIVE TRANSACTION"); + + for (const rollback of rollbacks.reverse()) { + if (rollback.rollback_forbidden) { + throw Error( + "Try to write a down migration, if you really can't, flip this to a break" + ); + } + + for (const stmt of rollback.stmts) { + await serialize_run(stmt); + } + } + + await serialize_run("COMMIT TRANSACTION"); + }); +}); describe("SQLite Message Storage", function () { // Increase timeout due to unpredictable I/O on CI services @@ -15,6 +135,36 @@ describe("SQLite Message Storage", function () { const expectedPath = path.join(Config.getHomePath(), "logs", "testUser.sqlite3"); let store: MessageStorage; + function db_get_one(stmt: string, ...params: any[]): Promise { + return new Promise((resolve, reject) => { + store.database.serialize(() => { + store.database.get(stmt, params, (err, row) => { + if (err) { + reject(err); + return; + } + + resolve(row); + }); + }); + }); + } + + function db_get_mult(stmt: string, ...params: any[]): Promise { + return new Promise((resolve, reject) => { + store.database.serialize(() => { + store.database.all(stmt, params, (err, rows) => { + if (err) { + reject(err); + return; + } + + resolve(rows); + }); + }); + }); + } + before(function (done) { store = new MessageStorage("testUser"); @@ -48,42 +198,17 @@ describe("SQLite Message Storage", function () { store.isEnabled = true; }); - it("should create tables", function (done) { - store.database.all( - "SELECT name, tbl_name, sql FROM sqlite_master WHERE type = 'table'", - (err, row) => { - expect(err).to.be.null; - expect(row).to.deep.equal([ - { - name: "options", - tbl_name: "options", - sql: "CREATE TABLE options (name TEXT, value TEXT, CONSTRAINT name_unique UNIQUE (name))", - }, - { - name: "messages", - tbl_name: "messages", - sql: "CREATE TABLE messages (network TEXT, channel TEXT, time INTEGER, type TEXT, msg TEXT)", - }, - ]); - - done(); - } - ); + it("should insert schema version to options table", async function () { + const row = await db_get_one("SELECT value FROM options WHERE name = 'schema_version'"); + expect(row.value).to.equal(currentSchemaVersion.toString()); }); - it("should insert schema version to options table", function (done) { - store.database.get( - "SELECT value FROM options WHERE name = 'schema_version'", - (err, row: {value: string}) => { - expect(err).to.be.null; - - // Should be sqlite.currentSchemaVersion, - // compared as string because it's returned as such from the database - expect(row.value).to.equal("1520239200"); - - done(); - } + it("should insert migrations", async function () { + const row = await db_get_one( + "SELECT id, version FROM migrations WHERE version = ?", + currentSchemaVersion ); + expect(row).to.not.be.undefined; }); it("should store a message", async function () { @@ -230,6 +355,19 @@ describe("SQLite Message Storage", function () { } }); + it("should be able to downgrade", async function () { + for (const rollback of rollbacks.reverse()) { + if (rollback.rollback_forbidden) { + throw Error( + "Try to write a down migration, if you really can't, flip this to a break" + ); + } + + const new_version = await store.downgrade_to(rollback.version); + expect(new_version).to.equal(rollback.version); + } + }); + it("should close database", async function () { await store.close(); expect(fs.existsSync(expectedPath)).to.be.true;