mirror of
https://github.com/thelounge/thelounge.git
synced 2024-05-18 06:06:36 +02:00
3e7255ff20
We want primary keys to never get re-used to so that we can implement jump to messages / context fetching etc in the future. This isn't hooked up yet at all to the rest of the code, only the schema is changed
390 lines
9.7 KiB
TypeScript
390 lines
9.7 KiB
TypeScript
import type {Database} from "sqlite3";
|
|
|
|
import log from "../../log";
|
|
import path from "path";
|
|
import fs from "fs/promises";
|
|
import Config from "../../config";
|
|
import Msg, {Message} from "../../models/msg";
|
|
import Chan, {Channel} from "../../models/chan";
|
|
import Helper from "../../helper";
|
|
import type {SearchResponse, SearchQuery, SearchableMessageStorage} from "./types";
|
|
import Network from "../../models/network";
|
|
|
|
// TODO; type
|
|
let sqlite3: any;
|
|
|
|
try {
|
|
sqlite3 = require("sqlite3");
|
|
} catch (e: any) {
|
|
Config.values.messageStorage = Config.values.messageStorage.filter((item) => item !== "sqlite");
|
|
|
|
log.error(
|
|
"Unable to load sqlite3 module. See https://github.com/mapbox/node-sqlite3/wiki/Binaries"
|
|
);
|
|
}
|
|
|
|
type Migration = {version: number; stmts: string[]};
|
|
|
|
export const currentSchemaVersion = 1672236339873; // use `new Date().getTime()`
|
|
|
|
// Desired schema, adapt to the newest version and add migrations to the array below
|
|
const schema = [
|
|
"CREATE TABLE IF NOT EXISTS options (name TEXT, value TEXT, CONSTRAINT name_unique UNIQUE (name))",
|
|
"CREATE TABLE IF NOT EXISTS messages (id INTEGER PRIMARY KEY AUTOINCREMENT, network TEXT, channel TEXT, time INTEGER, type TEXT, msg TEXT)",
|
|
"CREATE INDEX IF NOT EXISTS network_channel ON messages (network, channel)",
|
|
"CREATE INDEX IF NOT EXISTS time ON messages (time)",
|
|
];
|
|
|
|
// the migrations will be executed in an exclusive transaction as a whole
|
|
// add new migrations to the end, with the version being the new 'currentSchemaVersion'
|
|
export const migrations: Migration[] = [
|
|
{
|
|
version: 1672236339873,
|
|
stmts: [
|
|
"CREATE TABLE messages_new (id INTEGER PRIMARY KEY AUTOINCREMENT, network TEXT, channel TEXT, time INTEGER, type TEXT, msg TEXT);",
|
|
"INSERT INTO messages_new(network, channel, time, type, msg) select network, channel, time, type, msg from messages order by time asc;",
|
|
"DROP TABLE messages;",
|
|
"ALTER TABLE messages_new RENAME TO messages;",
|
|
"CREATE INDEX network_channel ON messages (network, channel);",
|
|
"CREATE INDEX time ON messages (time);",
|
|
],
|
|
},
|
|
];
|
|
|
|
class Deferred {
|
|
resolve!: () => void;
|
|
promise: Promise<void>;
|
|
|
|
constructor() {
|
|
this.promise = new Promise((resolve) => {
|
|
this.resolve = resolve;
|
|
});
|
|
}
|
|
}
|
|
|
|
class SqliteMessageStorage implements SearchableMessageStorage {
|
|
isEnabled: boolean;
|
|
database!: Database;
|
|
initDone: Deferred;
|
|
userName: string;
|
|
|
|
constructor(userName: string) {
|
|
this.userName = userName;
|
|
this.isEnabled = false;
|
|
this.initDone = new Deferred();
|
|
}
|
|
|
|
async _enable() {
|
|
const logsPath = Config.getUserLogsPath();
|
|
const sqlitePath = path.join(logsPath, `${this.userName}.sqlite3`);
|
|
|
|
try {
|
|
await fs.mkdir(logsPath, {recursive: true});
|
|
} catch (e) {
|
|
throw Helper.catch_to_error("Unable to create logs directory", e);
|
|
}
|
|
|
|
this.isEnabled = true;
|
|
|
|
this.database = new sqlite3.Database(sqlitePath);
|
|
|
|
try {
|
|
await this.run_migrations();
|
|
} catch (e) {
|
|
this.isEnabled = false;
|
|
throw Helper.catch_to_error("Migration failed", e);
|
|
}
|
|
}
|
|
|
|
async enable() {
|
|
try {
|
|
await this._enable();
|
|
} finally {
|
|
this.initDone.resolve(); // unblock the instance methods
|
|
}
|
|
}
|
|
|
|
async setup_new_db() {
|
|
for (const stmt of schema) {
|
|
await this.serialize_run(stmt, []);
|
|
}
|
|
|
|
await this.serialize_run("INSERT INTO options (name, value) VALUES ('schema_version', ?)", [
|
|
currentSchemaVersion.toString(),
|
|
]);
|
|
}
|
|
|
|
async current_version(): Promise<number> {
|
|
const have_options = await this.serialize_get(
|
|
"select 1 from sqlite_master where type = 'table' and name = 'options'"
|
|
);
|
|
|
|
if (!have_options) {
|
|
return 0;
|
|
}
|
|
|
|
const version = await this.serialize_get(
|
|
"SELECT value FROM options WHERE name = 'schema_version'"
|
|
);
|
|
|
|
if (version === undefined) {
|
|
// technically shouldn't happen, means something created a schema but didn't populate it
|
|
// we'll try our best to recover
|
|
return 0;
|
|
}
|
|
|
|
const storedSchemaVersion = parseInt(version.value, 10);
|
|
return storedSchemaVersion;
|
|
}
|
|
|
|
async _run_migrations(dbVersion: number) {
|
|
log.info(
|
|
`sqlite messages schema version is out of date (${dbVersion} < ${currentSchemaVersion}). Running migrations.`
|
|
);
|
|
|
|
const to_execute = necessaryMigrations(dbVersion);
|
|
|
|
for (const stmt of to_execute.map((m) => m.stmts).flat()) {
|
|
await this.serialize_run(stmt, []);
|
|
}
|
|
|
|
await this.serialize_run("UPDATE options SET value = ? WHERE name = 'schema_version'", [
|
|
currentSchemaVersion.toString(),
|
|
]);
|
|
}
|
|
|
|
async run_migrations() {
|
|
const version = await this.current_version();
|
|
|
|
if (version > currentSchemaVersion) {
|
|
throw `sqlite messages schema version is higher than expected (${version} > ${currentSchemaVersion}). Is The Lounge out of date?`;
|
|
} else if (version === currentSchemaVersion) {
|
|
return; // nothing to do
|
|
}
|
|
|
|
await this.serialize_run("BEGIN EXCLUSIVE TRANSACTION", []);
|
|
|
|
try {
|
|
if (version === 0) {
|
|
await this.setup_new_db();
|
|
} else {
|
|
await this._run_migrations(version);
|
|
}
|
|
} catch (err) {
|
|
await this.serialize_run("ROLLBACK", []);
|
|
throw err;
|
|
}
|
|
|
|
await this.serialize_run("COMMIT", []);
|
|
await this.serialize_run("VACUUM", []);
|
|
}
|
|
|
|
async close() {
|
|
if (!this.isEnabled) {
|
|
return;
|
|
}
|
|
|
|
this.isEnabled = false;
|
|
|
|
return new Promise<void>((resolve, reject) => {
|
|
this.database.close((err) => {
|
|
if (err) {
|
|
reject(`Failed to close sqlite database: ${err.message}`);
|
|
return;
|
|
}
|
|
|
|
resolve();
|
|
});
|
|
});
|
|
}
|
|
|
|
async index(network: Network, channel: Chan, msg: Msg) {
|
|
await this.initDone.promise;
|
|
|
|
if (!this.isEnabled) {
|
|
return;
|
|
}
|
|
|
|
const clonedMsg = Object.keys(msg).reduce((newMsg, prop) => {
|
|
// id is regenerated when messages are retrieved
|
|
// previews are not stored because storage is cleared on lounge restart
|
|
// type and time are stored in a separate column
|
|
if (prop !== "id" && prop !== "previews" && prop !== "type" && prop !== "time") {
|
|
newMsg[prop] = msg[prop];
|
|
}
|
|
|
|
return newMsg;
|
|
}, {});
|
|
|
|
await this.serialize_run(
|
|
"INSERT INTO messages(network, channel, time, type, msg) VALUES(?, ?, ?, ?, ?)",
|
|
[
|
|
network.uuid,
|
|
channel.name.toLowerCase(),
|
|
msg.time.getTime(),
|
|
msg.type,
|
|
JSON.stringify(clonedMsg),
|
|
]
|
|
);
|
|
}
|
|
|
|
async deleteChannel(network: Network, channel: Channel) {
|
|
await this.initDone.promise;
|
|
|
|
if (!this.isEnabled) {
|
|
return;
|
|
}
|
|
|
|
await this.serialize_run("DELETE FROM messages WHERE network = ? AND channel = ?", [
|
|
network.uuid,
|
|
channel.name.toLowerCase(),
|
|
]);
|
|
}
|
|
|
|
async getMessages(
|
|
network: Network,
|
|
channel: Channel,
|
|
nextID: () => number
|
|
): Promise<Message[]> {
|
|
await this.initDone.promise;
|
|
|
|
if (!this.isEnabled || Config.values.maxHistory === 0) {
|
|
return [];
|
|
}
|
|
|
|
// If unlimited history is specified, load 100k messages
|
|
const limit = Config.values.maxHistory < 0 ? 100000 : Config.values.maxHistory;
|
|
|
|
const rows = await this.serialize_fetchall(
|
|
"SELECT msg, type, time FROM messages WHERE network = ? AND channel = ? ORDER BY time DESC LIMIT ?",
|
|
network.uuid,
|
|
channel.name.toLowerCase(),
|
|
limit
|
|
);
|
|
|
|
return rows.reverse().map((row: any): Message => {
|
|
const msg = JSON.parse(row.msg);
|
|
msg.time = row.time;
|
|
msg.type = row.type;
|
|
|
|
const newMsg = new Msg(msg);
|
|
newMsg.id = nextID();
|
|
|
|
return newMsg;
|
|
});
|
|
}
|
|
|
|
async search(query: SearchQuery): Promise<SearchResponse> {
|
|
await this.initDone.promise;
|
|
|
|
if (!this.isEnabled) {
|
|
// this should never be hit as messageProvider is checked in client.search()
|
|
throw new Error(
|
|
"search called but sqlite provider not enabled. This is a programming error"
|
|
);
|
|
}
|
|
|
|
// Using the '@' character to escape '%' and '_' in patterns.
|
|
const escapedSearchTerm = query.searchTerm.replace(/([%_@])/g, "@$1");
|
|
|
|
let select =
|
|
'SELECT msg, type, time, network, channel FROM messages WHERE type = "message" AND json_extract(msg, "$.text") LIKE ? ESCAPE \'@\'';
|
|
const params: any[] = [`%${escapedSearchTerm}%`];
|
|
|
|
if (query.networkUuid) {
|
|
select += " AND network = ? ";
|
|
params.push(query.networkUuid);
|
|
}
|
|
|
|
if (query.channelName) {
|
|
select += " AND channel = ? ";
|
|
params.push(query.channelName.toLowerCase());
|
|
}
|
|
|
|
const maxResults = 100;
|
|
|
|
select += " ORDER BY time DESC LIMIT ? OFFSET ? ";
|
|
params.push(maxResults);
|
|
params.push(query.offset);
|
|
|
|
const rows = await this.serialize_fetchall(select, ...params);
|
|
return {
|
|
...query,
|
|
results: parseSearchRowsToMessages(query.offset, rows).reverse(),
|
|
};
|
|
}
|
|
|
|
canProvideMessages() {
|
|
return this.isEnabled;
|
|
}
|
|
|
|
private serialize_run(stmt: string, params: any[]): Promise<void> {
|
|
return new Promise((resolve, reject) => {
|
|
this.database.serialize(() => {
|
|
this.database.run(stmt, params, (err) => {
|
|
if (err) {
|
|
reject(err);
|
|
return;
|
|
}
|
|
|
|
resolve();
|
|
});
|
|
});
|
|
});
|
|
}
|
|
|
|
private serialize_fetchall(stmt: string, ...params: any[]): Promise<any[]> {
|
|
return new Promise((resolve, reject) => {
|
|
this.database.serialize(() => {
|
|
this.database.all(stmt, params, (err, rows) => {
|
|
if (err) {
|
|
reject(err);
|
|
return;
|
|
}
|
|
|
|
resolve(rows);
|
|
});
|
|
});
|
|
});
|
|
}
|
|
|
|
private serialize_get(stmt: string, ...params: any[]): Promise<any> {
|
|
return new Promise((resolve, reject) => {
|
|
this.database.serialize(() => {
|
|
this.database.get(stmt, params, (err, row) => {
|
|
if (err) {
|
|
reject(err);
|
|
return;
|
|
}
|
|
|
|
resolve(row);
|
|
});
|
|
});
|
|
});
|
|
}
|
|
}
|
|
|
|
// TODO: type any
|
|
function parseSearchRowsToMessages(id: number, rows: any[]) {
|
|
const messages: Msg[] = [];
|
|
|
|
for (const row of rows) {
|
|
const msg = JSON.parse(row.msg);
|
|
msg.time = row.time;
|
|
msg.type = row.type;
|
|
msg.networkUuid = row.network;
|
|
msg.channelName = row.channel;
|
|
msg.id = id;
|
|
messages.push(new Msg(msg));
|
|
id += 1;
|
|
}
|
|
|
|
return messages;
|
|
}
|
|
|
|
export function necessaryMigrations(since: number): Migration[] {
|
|
return migrations.filter((m) => m.version > since);
|
|
}
|
|
|
|
export default SqliteMessageStorage;
|