adding models / services

This commit is contained in:
forest 2023-01-06 18:13:51 -06:00
parent b1553165c0
commit 0699f8e01e
9 changed files with 432 additions and 13 deletions

View file

@ -1,4 +1,6 @@
{
"MatrixURL": "http://localhost:8080",
"MatrixAdminToken": "changeme",
"DatabaseType": "postgres",
"DatabaseConnectionString": "host=localhost port=5432 user=synapse_user password=changeme database=synapse sslmode=disable"
}

View file

@ -2,6 +2,7 @@ package main
import (
"database/sql"
"fmt"
"log"
errors "git.sequentialread.com/forest/pkg-errors"
@ -25,6 +26,18 @@ type StateGroupsStateRow struct {
EventId string
}
type DeleteStateGroupsStateStatus struct {
StateGroupsDeleted int64
RowsDeleted int64
Errors int64
}
type DBTableSize struct {
Schema string
Name string
Bytes int64
}
func initDatabase(config *Config) *DBModel {
db, err := sql.Open(config.DatabaseType, config.DatabaseConnectionString)
@ -56,7 +69,7 @@ func (model *DBModel) StateGroupsStateStream() (*StateGroupsStateStream, error)
}
toReturn := StateGroupsStateStream{
EstimatedCount: estimatedCount,
Channel: make(chan StateGroupsStateRow, 10000),
Channel: make(chan StateGroupsStateRow, 50000),
}
go func(rows *sql.Rows, channel chan StateGroupsStateRow) {
@ -86,3 +99,141 @@ func (model *DBModel) StateGroupsStateStream() (*StateGroupsStateStream, error)
return &toReturn, nil
}
func (model *DBModel) GetStateGroupsForRoom(roomId string) (stateGroupIds []int64, err error) {
rows, err := model.DB.Query("SELECT id from state_groups where room_id = %s", roomId)
if err != nil {
return nil, errors.Wrap(err, "could not select from state_groups by room_id")
}
stateGroupIds = []int64{}
for rows.Next() {
var stateGroupId int64
err := rows.Scan(&stateGroupId)
if err != nil {
log.Printf("error scanning a state_group id: %s \n", err)
} else {
stateGroupIds = append(stateGroupIds, stateGroupId)
}
}
return stateGroupIds, nil
}
func (model *DBModel) DeleteStateGroupsForRoom(roomId string) (int64, error) {
rowsDeleted := int64(0)
// state_group_edges
result, err := model.DB.Exec(
"DELETE FROM state_group_edges where state_group in (SELECT id from state_groups where room_id = %s);", roomId,
)
if err != nil {
return -1, errors.Wrap(err, "could not delete state_group_edges by room_id")
}
affected, err := result.RowsAffected()
if err != nil {
return -1, errors.Wrap(err, "could not get # of rows affected for delete state_group_edges by room_id")
}
rowsDeleted += int64(affected)
// event_to_state_groups
result, err = model.DB.Exec(
"DELETE FROM event_to_state_groups where state_group in (SELECT id from state_groups where room_id = %s);", roomId,
)
if err != nil {
return -1, errors.Wrap(err, "could not delete event_to_state_groups by room_id")
}
affected, err = result.RowsAffected()
if err != nil {
return -1, errors.Wrap(err, "could not get # of rows affected for delete event_to_state_groups by room_id")
}
rowsDeleted += int64(affected)
// state_groups
result, err = model.DB.Exec(
"DELETE FROM state_groups where room_id = %s;", roomId,
)
if err != nil {
return -1, errors.Wrap(err, "could not delete state_groups by room_id")
}
affected, err = result.RowsAffected()
if err != nil {
return -1, errors.Wrap(err, "could not get # of rows affected for delete state_groups by room_id")
}
rowsDeleted += int64(affected)
return rowsDeleted, nil
}
// TODO this maybe should be parallelized to reduce back-and-forth? (latency-to-db issues)
// But operating on a sorted list of stateGroup IDs seems to work pretty well
// I'll leave it as-is for now
func (model *DBModel) DeleteStateGroupsState(stateGroupIds []int64, startAt int) chan DeleteStateGroupsStateStatus {
toReturn := make(chan DeleteStateGroupsStateStatus, 100)
go func(stateGroupIds []int64, startAt int, channel chan DeleteStateGroupsStateStatus) {
var rowsDeleted int64 = 0
var errorCount int64 = 0
for i := startAt; i < len(stateGroupIds); i++ {
result, err := model.DB.Exec(
"DELETE FROM state_groups_state where state_group = %s;", stateGroupIds[i],
)
if err != nil {
fmt.Println(errors.Wrap(err, "could not delete from state_groups_state by state_group"))
errorCount += 1
continue
}
affected, err := result.RowsAffected()
if err != nil {
fmt.Println(errors.Wrap(err, "could not get # of rows affected for delete from state_groups_state by state_group"))
errorCount += 1
continue
}
rowsDeleted += affected
channel <- DeleteStateGroupsStateStatus{
StateGroupsDeleted: int64(i) + 1,
RowsDeleted: rowsDeleted,
Errors: errorCount,
}
}
}(stateGroupIds, startAt, toReturn)
return toReturn
}
// https://dataedo.com/kb/query/postgresql/list-of-tables-by-their-size
func (model *DBModel) GetDBTableSizes(roomId string) (tables []DBTableSize, err error) {
rows, err := model.DB.Query(
`select schemaname as table_schema, relname as table_name, pg_relation_size(relid) as data_size
from pg_catalog.pg_statio_user_tables
`,
)
if err != nil {
return nil, errors.Wrap(err, "could not get table sizes as bytes")
}
tables = []DBTableSize{}
for rows.Next() {
var schema string
var name string
var bytez int64
err := rows.Scan(&schema, &name, &bytez)
if err != nil {
log.Printf("error scanning a table size row: %s \n", err)
} else {
tables = append(tables, DBTableSize{
Schema: schema,
Name: name,
Bytes: bytez,
})
}
}
return tables, nil
}

64
disk_space.go Normal file
View file

@ -0,0 +1,64 @@
package main
import (
"os"
"path/filepath"
"golang.org/x/sys/unix"
)
func GetAvaliableDiskSpace(path string) (int64, int64, error) {
var stat unix.Statfs_t
err := unix.Statfs(path, &stat)
if err != nil {
return -1, -1, err
}
// Available blocks * size per block = available space in bytes
return int64(stat.Bavail * uint64(stat.Bsize)), int64(stat.Blocks * uint64(stat.Bsize)), nil
}
func GetTotalFilesizeWithinFolder(path string) (int64, error) {
info, err := os.Lstat(path)
if err != nil {
return -1, err
}
return getTotalFilesizeWithinFolderRecurse(path, info)
}
func getTotalFilesizeWithinFolderRecurse(currentPath string, info os.FileInfo) (int64, error) {
size := info.Size()
if info.Mode().IsRegular() {
return size, nil
}
if info.IsDir() {
dir, err := os.Open(currentPath)
if err != nil {
return -1, err
}
defer dir.Close()
fis, err := dir.Readdir(-1)
if err != nil {
return -1, err
}
for _, fi := range fis {
if fi.Name() != "." && fi.Name() != ".." {
continue
}
subfolderSize, err := getTotalFilesizeWithinFolderRecurse(filepath.Join(currentPath, fi.Name()), fi)
if err != nil {
return -1, err
} else {
size += subfolderSize
}
}
}
return size, nil
}

2
frontend.go Normal file
View file

@ -0,0 +1,2 @@

1
go.mod
View file

@ -7,4 +7,5 @@ require (
git.sequentialread.com/forest/pkg-errors v0.9.2 // indirect
github.com/lib/pq v1.10.7 // indirect
github.com/texttheater/golang-levenshtein/levenshtein v0.0.0-20200805054039-cae8b0eaed6c // indirect
golang.org/x/sys v0.4.0 // indirect
)

2
go.sum
View file

@ -6,3 +6,5 @@ github.com/lib/pq v1.10.7 h1:p7ZhMD+KsSRozJr34udlUrhboJwWAgCg34+/ZZNvZZw=
github.com/lib/pq v1.10.7/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/texttheater/golang-levenshtein/levenshtein v0.0.0-20200805054039-cae8b0eaed6c h1:HelZ2kAFadG0La9d+4htN4HzQ68Bm2iM9qKMSMES6xg=
github.com/texttheater/golang-levenshtein/levenshtein v0.0.0-20200805054039-cae8b0eaed6c/go.mod h1:JlzghshsemAMDGZLytTFY8C1JQxQPhnatWqNwUXjggo=
golang.org/x/sys v0.4.0 h1:Zr2JFtRQNX3BCZ8YtxRE9hNJYC8J6I1MVbMg6owUp18=
golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=

11
main.go
View file

@ -3,6 +3,7 @@ package main
import (
"encoding/json"
"log"
"os"
"reflect"
"time"
@ -10,6 +11,8 @@ import (
)
type Config struct {
MatrixURL string
MatrixAdminToken string
DatabaseType string
DatabaseConnectionString string
}
@ -50,9 +53,13 @@ func main() {
output, err := json.MarshalIndent(rowCountByRoom, "", " ")
if err != nil {
log.Fatalf("Can't display output because json.MarshalIndent returned %+v\n", err)
log.Printf("Can't save rooms.json because json.MarshalIndent returned %+v\n", err)
}
log.Println(string(output))
err = os.WriteFile("./rooms.json", output, 0755)
if err != nil {
log.Printf("Can't save rooms.json because os.WriteFile returned %+v\n", err)
}
}

189
matrix_admin_service.go Normal file
View file

@ -0,0 +1,189 @@
package main
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"strings"
"time"
errors "git.sequentialread.com/forest/pkg-errors"
)
type MatrixAdmin struct {
Client http.Client
URL string
Token string
}
type DeleteRoomRequest struct {
Block bool `json:"block"`
ForcePurge bool `json:"force_purge"`
Purge bool `json:"purge"`
Message string `json:"message"`
}
type DeleteRoomResponse struct {
DeleteId string `json:"delete_id"`
}
type RoomDeletionStatusResponse struct {
Results []RoomDeletionStatus `json:"results"`
}
type RoomDeletionStatus struct {
DeleteId string `json:"delete_id"`
Status string `json:"status"`
Error string `json:"error"`
ShutdownRoom ShutdownRoom `json:"shutdown_room"`
}
type ShutdownRoom struct {
KickedUsers []string `json:"kicked_users"`
FailedToKickUsers []string `json:"failed_to_kick_users"`
LocalAliases []string `json:"local_aliases"`
NewRoomId string `json:"new_room_id"`
}
func initMatrixAdmin(config *Config) *MatrixAdmin {
return &MatrixAdmin{
Client: http.Client{
Timeout: 10 * time.Second,
},
URL: config.MatrixURL,
Token: config.MatrixAdminToken,
}
}
// curl -H "Content-Type: application/json" -X DELETE "localhost:8008/_synapse/admin/v2/rooms/$roomid?access_token=xxxxxxxxx" \
// --data '{ "block": false, "force_purge": true, "purge": true, "message": "This room is being cleaned, stand by..." }'
func (admin *MatrixAdmin) DeleteRoom(roomId string) error {
deleteRequestBodyObject := DeleteRoomRequest{
Block: false,
ForcePurge: true,
Purge: true,
Message: "This room is being cleaned, stand by...",
}
deleteRequestBody, err := json.Marshal(deleteRequestBodyObject)
if err != nil {
return errors.Wrapf(err, "matrixAdmin.DeleteRoom('%s') cannot serialize deleteRequestBodyObject as JSON", roomId)
}
deleteURLWithoutToken := fmt.Sprintf("%s/_synapse/admin/v2/rooms/%s?access_token=", admin.URL, roomId)
deleteURL := fmt.Sprintf("%s%s", deleteURLWithoutToken, admin.Token)
deleteRequest, err := http.NewRequest("DELETE", deleteURL, bytes.NewBuffer(deleteRequestBody))
if err != nil {
return errors.Wrapf(err, "matrixAdmin.DeleteRoom('%s') cannot create deleteRequest", roomId)
}
deleteResponse, err := admin.Client.Do(deleteRequest)
if err != nil {
return errors.New(fmt.Sprintf(
"HTTP DELETE %sxxxxxxx: %s",
deleteURLWithoutToken, err.Error(),
))
}
if deleteResponse.StatusCode >= 300 {
responseBodyString := "read error"
responseBody, err := ioutil.ReadAll(deleteResponse.Body)
if err == nil {
responseBodyString = string(responseBody)
}
return errors.New(fmt.Sprintf(
"HTTP DELETE %sxxxxxxx: HTTP %d: %s",
deleteURLWithoutToken, deleteResponse.StatusCode, responseBodyString,
))
}
return nil
}
func (admin *MatrixAdmin) GetDeleteRoomStatus(roomId string) (string, []string, error) {
statusURLWithoutToken := fmt.Sprintf("%s/_synapse/admin/v2/rooms/%s/delete_status?access_token=", admin.URL, roomId)
statusURL := fmt.Sprintf("%s%s", statusURLWithoutToken, admin.Token)
statusResponse, err := admin.Client.Get(statusURL)
if err != nil {
return "", nil, errors.New(fmt.Sprintf("HTTP GET %sxxxxxxx: %s", statusURLWithoutToken, err.Error()))
}
if statusResponse.StatusCode >= 300 {
responseBodyString := "read error"
responseBody, err := ioutil.ReadAll(statusResponse.Body)
if err == nil {
responseBodyString = string(responseBody)
}
return "", nil, errors.New(fmt.Sprintf(
"HTTP GET %sxxxxxxx: HTTP %d: %s",
statusURLWithoutToken, statusResponse.StatusCode, responseBodyString,
))
}
responseBody, err := ioutil.ReadAll(statusResponse.Body)
if err != nil {
return "", nil, errors.New(fmt.Sprintf("HTTP GET %sxxxxxxx: read error: %s", statusURLWithoutToken, err.Error()))
}
var responseObject RoomDeletionStatusResponse
err = json.Unmarshal(responseBody, &responseObject)
if err != nil {
return "", nil, errors.New(fmt.Sprintf("HTTP GET %sxxxxxxx: json parse error: %s", statusURLWithoutToken, err.Error()))
}
users := map[string]bool{}
errorsSet := map[string]bool{}
mostCompleteStatus := ""
for _, result := range responseObject.Results {
for _, userid := range result.ShutdownRoom.KickedUsers {
users[userid] = true
}
for _, userid := range result.ShutdownRoom.FailedToKickUsers {
users[userid] = true
}
if result.Error != "" {
errorsSet[result.Error] = true
}
if result.Status == "shutting_down" {
if mostCompleteStatus == "" {
mostCompleteStatus = result.Status
}
} else if result.Status == "purging" {
if mostCompleteStatus == "" || mostCompleteStatus == "shutting_down" {
mostCompleteStatus = result.Status
}
} else if result.Status == "failed" {
if mostCompleteStatus == "" || mostCompleteStatus == "shutting_down" || mostCompleteStatus == "purging" {
mostCompleteStatus = result.Status
}
} else if result.Status == "complete" {
if mostCompleteStatus == "" || mostCompleteStatus == "shutting_down" || mostCompleteStatus == "purging" || mostCompleteStatus == "failed" {
mostCompleteStatus = result.Status
}
}
}
usersSlice := []string{}
for userid := range users {
usersSlice = append(usersSlice, userid)
}
if mostCompleteStatus == "failed" {
errorsSlice := []string{}
for errString := range errorsSet {
errorsSlice = append(errorsSlice, errString)
}
return "", nil, errors.New(fmt.Sprintf("room deletion failed: \n%s", strings.Join(errorsSlice, "\n")))
}
return mostCompleteStatus, usersSlice, nil
}

View file

@ -121,7 +121,8 @@
const row = rowString.split("|").map(raw => raw.trim());
return {
roomid: row[0].slice(15, row[0].length),
//roomid: row[0].slice(15, row[0].length),
roomid: row[0],
numRows: Number(row[1]),
}
});
@ -130,15 +131,15 @@
return b.numRows - a.numRows;
});
// // this was pulled from tablesize-chartjs.html row count for state_groups_state
// const totalRowCount = 1104920600;
// this was pulled from tablesize-chartjs.html row count for state_groups_state
const totalRowCount = 1162750829;
// const otherRoomsRowCount = totalRowCount - rooms.reduce((accumulator, room) => accumulator + room.numRows, 0)
const otherRoomsRowCount = totalRowCount - rooms.reduce((accumulator, room) => accumulator + room.numRows, 0)
// rooms.push({
// roomid: "others",
// numRows: otherRoomsRowCount
// })
rooms.push({
roomid: "others",
numRows: otherRoomsRowCount
})
new Chart(ctx, {
@ -148,8 +149,8 @@
datasets: [{
label: 'filesize %',
//data: rooms.map(room => Math.round((room.numRows/totalRowCount)*100)),
data: rooms.map(room => room.numRows),
data: rooms.map(room => (room.numRows/totalRowCount)*100),
//data: rooms.map(room => room.numRows),
borderWidth: 2
}]
},