diff --git a/config.json b/config.json index 44f4048..a747fbb 100644 --- a/config.json +++ b/config.json @@ -1,4 +1,6 @@ { + "MatrixURL": "http://localhost:8080", + "MatrixAdminToken": "changeme", "DatabaseType": "postgres", "DatabaseConnectionString": "host=localhost port=5432 user=synapse_user password=changeme database=synapse sslmode=disable" } \ No newline at end of file diff --git a/db_model.go b/db_model.go index 859d739..a8e3347 100644 --- a/db_model.go +++ b/db_model.go @@ -2,6 +2,7 @@ package main import ( "database/sql" + "fmt" "log" errors "git.sequentialread.com/forest/pkg-errors" @@ -25,6 +26,18 @@ type StateGroupsStateRow struct { EventId string } +type DeleteStateGroupsStateStatus struct { + StateGroupsDeleted int64 + RowsDeleted int64 + Errors int64 +} + +type DBTableSize struct { + Schema string + Name string + Bytes int64 +} + func initDatabase(config *Config) *DBModel { db, err := sql.Open(config.DatabaseType, config.DatabaseConnectionString) @@ -56,7 +69,7 @@ func (model *DBModel) StateGroupsStateStream() (*StateGroupsStateStream, error) } toReturn := StateGroupsStateStream{ EstimatedCount: estimatedCount, - Channel: make(chan StateGroupsStateRow, 10000), + Channel: make(chan StateGroupsStateRow, 50000), } go func(rows *sql.Rows, channel chan StateGroupsStateRow) { @@ -86,3 +99,141 @@ func (model *DBModel) StateGroupsStateStream() (*StateGroupsStateStream, error) return &toReturn, nil } + +func (model *DBModel) GetStateGroupsForRoom(roomId string) (stateGroupIds []int64, err error) { + + rows, err := model.DB.Query("SELECT id from state_groups where room_id = %s", roomId) + if err != nil { + return nil, errors.Wrap(err, "could not select from state_groups by room_id") + } + + stateGroupIds = []int64{} + for rows.Next() { + var stateGroupId int64 + + err := rows.Scan(&stateGroupId) + if err != nil { + log.Printf("error scanning a state_group id: %s \n", err) + } else { + stateGroupIds = append(stateGroupIds, stateGroupId) + } + } + + return stateGroupIds, nil +} + +func (model *DBModel) DeleteStateGroupsForRoom(roomId string) (int64, error) { + + rowsDeleted := int64(0) + + // state_group_edges + result, err := model.DB.Exec( + "DELETE FROM state_group_edges where state_group in (SELECT id from state_groups where room_id = %s);", roomId, + ) + if err != nil { + return -1, errors.Wrap(err, "could not delete state_group_edges by room_id") + } + affected, err := result.RowsAffected() + if err != nil { + return -1, errors.Wrap(err, "could not get # of rows affected for delete state_group_edges by room_id") + } + rowsDeleted += int64(affected) + + // event_to_state_groups + result, err = model.DB.Exec( + "DELETE FROM event_to_state_groups where state_group in (SELECT id from state_groups where room_id = %s);", roomId, + ) + if err != nil { + return -1, errors.Wrap(err, "could not delete event_to_state_groups by room_id") + } + affected, err = result.RowsAffected() + if err != nil { + return -1, errors.Wrap(err, "could not get # of rows affected for delete event_to_state_groups by room_id") + } + rowsDeleted += int64(affected) + + // state_groups + result, err = model.DB.Exec( + "DELETE FROM state_groups where room_id = %s;", roomId, + ) + if err != nil { + return -1, errors.Wrap(err, "could not delete state_groups by room_id") + } + affected, err = result.RowsAffected() + if err != nil { + return -1, errors.Wrap(err, "could not get # of rows affected for delete state_groups by room_id") + } + rowsDeleted += int64(affected) + + return rowsDeleted, nil +} + +// TODO this maybe should be parallelized to reduce back-and-forth? (latency-to-db issues) +// But operating on a sorted list of stateGroup IDs seems to work pretty well +// I'll leave it as-is for now +func (model *DBModel) DeleteStateGroupsState(stateGroupIds []int64, startAt int) chan DeleteStateGroupsStateStatus { + + toReturn := make(chan DeleteStateGroupsStateStatus, 100) + + go func(stateGroupIds []int64, startAt int, channel chan DeleteStateGroupsStateStatus) { + var rowsDeleted int64 = 0 + var errorCount int64 = 0 + for i := startAt; i < len(stateGroupIds); i++ { + result, err := model.DB.Exec( + "DELETE FROM state_groups_state where state_group = %s;", stateGroupIds[i], + ) + if err != nil { + fmt.Println(errors.Wrap(err, "could not delete from state_groups_state by state_group")) + errorCount += 1 + continue + } + affected, err := result.RowsAffected() + if err != nil { + fmt.Println(errors.Wrap(err, "could not get # of rows affected for delete from state_groups_state by state_group")) + errorCount += 1 + continue + } + rowsDeleted += affected + channel <- DeleteStateGroupsStateStatus{ + StateGroupsDeleted: int64(i) + 1, + RowsDeleted: rowsDeleted, + Errors: errorCount, + } + } + }(stateGroupIds, startAt, toReturn) + + return toReturn +} + +// https://dataedo.com/kb/query/postgresql/list-of-tables-by-their-size +func (model *DBModel) GetDBTableSizes(roomId string) (tables []DBTableSize, err error) { + + rows, err := model.DB.Query( + `select schemaname as table_schema, relname as table_name, pg_relation_size(relid) as data_size + from pg_catalog.pg_statio_user_tables + `, + ) + if err != nil { + return nil, errors.Wrap(err, "could not get table sizes as bytes") + } + + tables = []DBTableSize{} + for rows.Next() { + var schema string + var name string + var bytez int64 + + err := rows.Scan(&schema, &name, &bytez) + if err != nil { + log.Printf("error scanning a table size row: %s \n", err) + } else { + tables = append(tables, DBTableSize{ + Schema: schema, + Name: name, + Bytes: bytez, + }) + } + } + + return tables, nil +} diff --git a/disk_space.go b/disk_space.go new file mode 100644 index 0000000..7448a35 --- /dev/null +++ b/disk_space.go @@ -0,0 +1,64 @@ +package main + +import ( + "os" + "path/filepath" + + "golang.org/x/sys/unix" +) + +func GetAvaliableDiskSpace(path string) (int64, int64, error) { + + var stat unix.Statfs_t + + err := unix.Statfs(path, &stat) + + if err != nil { + return -1, -1, err + } + + // Available blocks * size per block = available space in bytes + return int64(stat.Bavail * uint64(stat.Bsize)), int64(stat.Blocks * uint64(stat.Bsize)), nil +} + +func GetTotalFilesizeWithinFolder(path string) (int64, error) { + info, err := os.Lstat(path) + if err != nil { + return -1, err + } + return getTotalFilesizeWithinFolderRecurse(path, info) +} + +func getTotalFilesizeWithinFolderRecurse(currentPath string, info os.FileInfo) (int64, error) { + + size := info.Size() + if info.Mode().IsRegular() { + return size, nil + } + + if info.IsDir() { + dir, err := os.Open(currentPath) + if err != nil { + return -1, err + } + defer dir.Close() + + fis, err := dir.Readdir(-1) + if err != nil { + return -1, err + } + for _, fi := range fis { + if fi.Name() != "." && fi.Name() != ".." { + continue + } + subfolderSize, err := getTotalFilesizeWithinFolderRecurse(filepath.Join(currentPath, fi.Name()), fi) + if err != nil { + return -1, err + } else { + size += subfolderSize + } + } + } + + return size, nil +} diff --git a/frontend.go b/frontend.go new file mode 100644 index 0000000..139597f --- /dev/null +++ b/frontend.go @@ -0,0 +1,2 @@ + + diff --git a/go.mod b/go.mod index 1fbd9a5..791c9a9 100644 --- a/go.mod +++ b/go.mod @@ -7,4 +7,5 @@ require ( git.sequentialread.com/forest/pkg-errors v0.9.2 // indirect github.com/lib/pq v1.10.7 // indirect github.com/texttheater/golang-levenshtein/levenshtein v0.0.0-20200805054039-cae8b0eaed6c // indirect + golang.org/x/sys v0.4.0 // indirect ) diff --git a/go.sum b/go.sum index 500ba4c..5ef6c8e 100644 --- a/go.sum +++ b/go.sum @@ -6,3 +6,5 @@ github.com/lib/pq v1.10.7 h1:p7ZhMD+KsSRozJr34udlUrhboJwWAgCg34+/ZZNvZZw= github.com/lib/pq v1.10.7/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/texttheater/golang-levenshtein/levenshtein v0.0.0-20200805054039-cae8b0eaed6c h1:HelZ2kAFadG0La9d+4htN4HzQ68Bm2iM9qKMSMES6xg= github.com/texttheater/golang-levenshtein/levenshtein v0.0.0-20200805054039-cae8b0eaed6c/go.mod h1:JlzghshsemAMDGZLytTFY8C1JQxQPhnatWqNwUXjggo= +golang.org/x/sys v0.4.0 h1:Zr2JFtRQNX3BCZ8YtxRE9hNJYC8J6I1MVbMg6owUp18= +golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/main.go b/main.go index 897ff48..cb131f4 100644 --- a/main.go +++ b/main.go @@ -3,6 +3,7 @@ package main import ( "encoding/json" "log" + "os" "reflect" "time" @@ -10,6 +11,8 @@ import ( ) type Config struct { + MatrixURL string + MatrixAdminToken string DatabaseType string DatabaseConnectionString string } @@ -50,9 +53,13 @@ func main() { output, err := json.MarshalIndent(rowCountByRoom, "", " ") if err != nil { - log.Fatalf("Can't display output because json.MarshalIndent returned %+v\n", err) + log.Printf("Can't save rooms.json because json.MarshalIndent returned %+v\n", err) } - log.Println(string(output)) + err = os.WriteFile("./rooms.json", output, 0755) + + if err != nil { + log.Printf("Can't save rooms.json because os.WriteFile returned %+v\n", err) + } } diff --git a/matrix_admin_service.go b/matrix_admin_service.go new file mode 100644 index 0000000..0076d2d --- /dev/null +++ b/matrix_admin_service.go @@ -0,0 +1,189 @@ +package main + +import ( + "bytes" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "strings" + "time" + + errors "git.sequentialread.com/forest/pkg-errors" +) + +type MatrixAdmin struct { + Client http.Client + URL string + Token string +} + +type DeleteRoomRequest struct { + Block bool `json:"block"` + ForcePurge bool `json:"force_purge"` + Purge bool `json:"purge"` + Message string `json:"message"` +} + +type DeleteRoomResponse struct { + DeleteId string `json:"delete_id"` +} + +type RoomDeletionStatusResponse struct { + Results []RoomDeletionStatus `json:"results"` +} +type RoomDeletionStatus struct { + DeleteId string `json:"delete_id"` + Status string `json:"status"` + Error string `json:"error"` + ShutdownRoom ShutdownRoom `json:"shutdown_room"` +} +type ShutdownRoom struct { + KickedUsers []string `json:"kicked_users"` + FailedToKickUsers []string `json:"failed_to_kick_users"` + LocalAliases []string `json:"local_aliases"` + NewRoomId string `json:"new_room_id"` +} + +func initMatrixAdmin(config *Config) *MatrixAdmin { + + return &MatrixAdmin{ + Client: http.Client{ + Timeout: 10 * time.Second, + }, + URL: config.MatrixURL, + Token: config.MatrixAdminToken, + } +} + +// curl -H "Content-Type: application/json" -X DELETE "localhost:8008/_synapse/admin/v2/rooms/$roomid?access_token=xxxxxxxxx" \ +// --data '{ "block": false, "force_purge": true, "purge": true, "message": "This room is being cleaned, stand by..." }' + +func (admin *MatrixAdmin) DeleteRoom(roomId string) error { + + deleteRequestBodyObject := DeleteRoomRequest{ + Block: false, + ForcePurge: true, + Purge: true, + Message: "This room is being cleaned, stand by...", + } + deleteRequestBody, err := json.Marshal(deleteRequestBodyObject) + if err != nil { + return errors.Wrapf(err, "matrixAdmin.DeleteRoom('%s') cannot serialize deleteRequestBodyObject as JSON", roomId) + } + deleteURLWithoutToken := fmt.Sprintf("%s/_synapse/admin/v2/rooms/%s?access_token=", admin.URL, roomId) + deleteURL := fmt.Sprintf("%s%s", deleteURLWithoutToken, admin.Token) + deleteRequest, err := http.NewRequest("DELETE", deleteURL, bytes.NewBuffer(deleteRequestBody)) + + if err != nil { + return errors.Wrapf(err, "matrixAdmin.DeleteRoom('%s') cannot create deleteRequest", roomId) + } + + deleteResponse, err := admin.Client.Do(deleteRequest) + + if err != nil { + return errors.New(fmt.Sprintf( + "HTTP DELETE %sxxxxxxx: %s", + deleteURLWithoutToken, err.Error(), + )) + } + + if deleteResponse.StatusCode >= 300 { + responseBodyString := "read error" + responseBody, err := ioutil.ReadAll(deleteResponse.Body) + if err == nil { + responseBodyString = string(responseBody) + } + + return errors.New(fmt.Sprintf( + "HTTP DELETE %sxxxxxxx: HTTP %d: %s", + deleteURLWithoutToken, deleteResponse.StatusCode, responseBodyString, + )) + } + + return nil +} + +func (admin *MatrixAdmin) GetDeleteRoomStatus(roomId string) (string, []string, error) { + + statusURLWithoutToken := fmt.Sprintf("%s/_synapse/admin/v2/rooms/%s/delete_status?access_token=", admin.URL, roomId) + statusURL := fmt.Sprintf("%s%s", statusURLWithoutToken, admin.Token) + + statusResponse, err := admin.Client.Get(statusURL) + + if err != nil { + return "", nil, errors.New(fmt.Sprintf("HTTP GET %sxxxxxxx: %s", statusURLWithoutToken, err.Error())) + } + + if statusResponse.StatusCode >= 300 { + responseBodyString := "read error" + responseBody, err := ioutil.ReadAll(statusResponse.Body) + if err == nil { + responseBodyString = string(responseBody) + } + + return "", nil, errors.New(fmt.Sprintf( + "HTTP GET %sxxxxxxx: HTTP %d: %s", + statusURLWithoutToken, statusResponse.StatusCode, responseBodyString, + )) + } + + responseBody, err := ioutil.ReadAll(statusResponse.Body) + if err != nil { + return "", nil, errors.New(fmt.Sprintf("HTTP GET %sxxxxxxx: read error: %s", statusURLWithoutToken, err.Error())) + } + var responseObject RoomDeletionStatusResponse + err = json.Unmarshal(responseBody, &responseObject) + if err != nil { + return "", nil, errors.New(fmt.Sprintf("HTTP GET %sxxxxxxx: json parse error: %s", statusURLWithoutToken, err.Error())) + } + + users := map[string]bool{} + errorsSet := map[string]bool{} + mostCompleteStatus := "" + + for _, result := range responseObject.Results { + for _, userid := range result.ShutdownRoom.KickedUsers { + users[userid] = true + } + for _, userid := range result.ShutdownRoom.FailedToKickUsers { + users[userid] = true + } + if result.Error != "" { + errorsSet[result.Error] = true + } + if result.Status == "shutting_down" { + if mostCompleteStatus == "" { + mostCompleteStatus = result.Status + } + } else if result.Status == "purging" { + if mostCompleteStatus == "" || mostCompleteStatus == "shutting_down" { + mostCompleteStatus = result.Status + } + } else if result.Status == "failed" { + if mostCompleteStatus == "" || mostCompleteStatus == "shutting_down" || mostCompleteStatus == "purging" { + mostCompleteStatus = result.Status + } + } else if result.Status == "complete" { + if mostCompleteStatus == "" || mostCompleteStatus == "shutting_down" || mostCompleteStatus == "purging" || mostCompleteStatus == "failed" { + mostCompleteStatus = result.Status + } + } + + } + usersSlice := []string{} + for userid := range users { + usersSlice = append(usersSlice, userid) + } + + if mostCompleteStatus == "failed" { + errorsSlice := []string{} + for errString := range errorsSet { + errorsSlice = append(errorsSlice, errString) + } + + return "", nil, errors.New(fmt.Sprintf("room deletion failed: \n%s", strings.Join(errorsSlice, "\n"))) + } + + return mostCompleteStatus, usersSlice, nil +} diff --git a/readme/roomsize-chartjs.html b/readme/roomsize-chartjs.html index ed65b7b..7a5ca0e 100644 --- a/readme/roomsize-chartjs.html +++ b/readme/roomsize-chartjs.html @@ -121,7 +121,8 @@ const row = rowString.split("|").map(raw => raw.trim()); return { - roomid: row[0].slice(15, row[0].length), + //roomid: row[0].slice(15, row[0].length), + roomid: row[0], numRows: Number(row[1]), } }); @@ -130,15 +131,15 @@ return b.numRows - a.numRows; }); - // // this was pulled from tablesize-chartjs.html row count for state_groups_state - // const totalRowCount = 1104920600; + // this was pulled from tablesize-chartjs.html row count for state_groups_state + const totalRowCount = 1162750829; - // const otherRoomsRowCount = totalRowCount - rooms.reduce((accumulator, room) => accumulator + room.numRows, 0) + const otherRoomsRowCount = totalRowCount - rooms.reduce((accumulator, room) => accumulator + room.numRows, 0) - // rooms.push({ - // roomid: "others", - // numRows: otherRoomsRowCount - // }) + rooms.push({ + roomid: "others", + numRows: otherRoomsRowCount + }) new Chart(ctx, { @@ -148,8 +149,8 @@ datasets: [{ label: 'filesize %', - //data: rooms.map(room => Math.round((room.numRows/totalRowCount)*100)), - data: rooms.map(room => room.numRows), + data: rooms.map(room => (room.numRows/totalRowCount)*100), + //data: rooms.map(room => room.numRows), borderWidth: 2 }] },