client: Switch to using go.uber.org/zap for logging.

This commit is contained in:
Joachim Bauch 2024-09-15 19:22:58 +02:00
commit 4771d5d893
No known key found for this signature in database
GPG key ID: 77C1D22D53E15F02

View file

@ -28,7 +28,6 @@ import (
"flag"
"fmt"
"io"
"log"
pseudorand "math/rand"
"net"
"net/http"
@ -46,6 +45,7 @@ import (
"github.com/gorilla/securecookie"
"github.com/gorilla/websocket"
"github.com/mailru/easyjson"
"go.uber.org/zap"
signaling "github.com/strukturag/nextcloud-spreed-signaling"
)
@ -81,6 +81,8 @@ const (
)
type Stats struct {
log *zap.Logger
numRecvMessages atomic.Uint64
numSentMessages atomic.Uint64
resetRecvMessages uint64
@ -107,10 +109,13 @@ func (s *Stats) Log() {
sentMessages := totalSentMessages - s.resetSentMessages
totalRecvMessages := s.numRecvMessages.Load()
recvMessages := totalRecvMessages - s.resetRecvMessages
log.Printf("Stats: sent=%d (%d/sec), recv=%d (%d/sec), delta=%d",
totalSentMessages, sentMessages/perSec,
totalRecvMessages, recvMessages/perSec,
totalSentMessages-totalRecvMessages)
s.log.Info("Stats updated",
zap.Uint64("sent", totalSentMessages),
zap.Uint64("sentspeed", sentMessages/perSec),
zap.Uint64("recv", totalRecvMessages),
zap.Uint64("recvspeed", recvMessages/perSec),
zap.Uint64("delta", totalSentMessages-totalRecvMessages),
)
s.reset(now)
}
@ -119,6 +124,7 @@ type MessagePayload struct {
}
type SignalingClient struct {
log *zap.Logger
readyWg *sync.WaitGroup
cookie *securecookie.SecureCookie
@ -135,13 +141,14 @@ type SignalingClient struct {
userId string
}
func NewSignalingClient(cookie *securecookie.SecureCookie, url string, stats *Stats, readyWg *sync.WaitGroup, doneWg *sync.WaitGroup) (*SignalingClient, error) {
func NewSignalingClient(log *zap.Logger, cookie *securecookie.SecureCookie, url string, stats *Stats, readyWg *sync.WaitGroup, doneWg *sync.WaitGroup) (*SignalingClient, error) {
conn, _, err := websocket.DefaultDialer.Dial(url, nil)
if err != nil {
return nil, err
}
client := &SignalingClient{
log: log,
readyWg: readyWg,
cookie: cookie,
@ -204,13 +211,19 @@ func (c *SignalingClient) processMessage(message *signaling.ServerMessage) {
case "message":
c.processMessageMessage(message)
case "bye":
log.Printf("Received bye: %+v", message.Bye)
c.log.Error("Received bye",
zap.Any("bye", message.Bye),
)
c.Close()
case "error":
log.Printf("Received error: %+v", message.Error)
c.log.Error("Received error",
zap.Any("error", message.Error),
)
c.Close()
default:
log.Printf("Unsupported message type: %+v", *message)
c.log.Warn("Unsupported message type",
zap.Stringer("message", message),
)
}
}
@ -236,7 +249,10 @@ func (c *SignalingClient) processHelloMessage(message *signaling.ServerMessage)
c.privateSessionId = message.Hello.ResumeId
c.publicSessionId = c.privateToPublicSessionId(c.privateSessionId)
c.userId = message.Hello.UserId
log.Printf("Registered as %s (userid %s)", c.privateSessionId, c.userId)
c.log.Info("Registered",
zap.String("privateid", c.privateSessionId),
zap.String("userid", c.userId),
)
c.readyWg.Done()
}
@ -249,14 +265,18 @@ func (c *SignalingClient) PublicSessionId() string {
func (c *SignalingClient) processMessageMessage(message *signaling.ServerMessage) {
var msg MessagePayload
if err := json.Unmarshal(message.Message.Data, &msg); err != nil {
log.Println("Error in unmarshal", err)
c.log.Error("Error in unmarshal",
zap.Error(err),
)
return
}
now := time.Now()
duration := now.Sub(msg.Now)
if duration > messageReportDuration {
log.Printf("Message took %s", duration)
c.log.Warn("Message took too long",
zap.Duration("duration", duration),
)
}
}
@ -283,13 +303,17 @@ func (c *SignalingClient) readPump() {
websocket.CloseNormalClosure,
websocket.CloseGoingAway,
websocket.CloseNoStatusReceived) {
log.Printf("Error: %v", err)
c.log.Error("Error reading",
zap.Error(err),
)
}
break
}
if messageType != websocket.TextMessage {
log.Println("Unsupported message type", messageType)
c.log.Error("Unsupported message type",
zap.Int("type", messageType),
)
break
}
@ -297,7 +321,9 @@ func (c *SignalingClient) readPump() {
if _, err := decodeBuffer.ReadFrom(reader); err != nil {
c.lock.Lock()
if c.conn != nil {
log.Println("Error reading message", err)
c.log.Error("Error reading message",
zap.Error(err),
)
}
c.lock.Unlock()
break
@ -305,7 +331,9 @@ func (c *SignalingClient) readPump() {
var message signaling.ServerMessage
if err := message.UnmarshalJSON(decodeBuffer.Bytes()); err != nil {
log.Printf("Error: %v", err)
c.log.Error("Error unmarshalling",
zap.Error(err),
)
break
}
@ -327,7 +355,10 @@ func (c *SignalingClient) writeInternal(message *signaling.ClientMessage) bool {
return false
}
log.Println("Could not send message", message, err)
c.log.Error("Could not send message",
zap.Stringer("message", message),
zap.Error(err),
)
// TODO(jojo): Differentiate between JSON encode errors and websocket errors.
closeData = websocket.FormatCloseMessage(websocket.CloseInternalServerErr, "")
goto close
@ -413,29 +444,33 @@ func (c *SignalingClient) SendMessages(clients []*SignalingClient) {
}
}
func registerAuthHandler(router *mux.Router) {
func registerAuthHandler(log *zap.Logger, router *mux.Router) {
router.HandleFunc("/auth", func(w http.ResponseWriter, r *http.Request) {
body, err := io.ReadAll(r.Body)
if err != nil {
log.Println("Error reading body:", err)
log.Error("Error reading body",
zap.Error(err),
)
return
}
rnd := r.Header.Get(signaling.HeaderBackendSignalingRandom)
checksum := r.Header.Get(signaling.HeaderBackendSignalingChecksum)
if rnd == "" || checksum == "" {
log.Println("No checksum headers found")
log.Error("No checksum headers found")
return
}
if verify := signaling.CalculateBackendChecksum(rnd, body, backendSecret); verify != checksum {
log.Println("Backend checksum verification failed")
log.Error("Backend checksum verification failed")
return
}
var request signaling.BackendClientRequest
if err := request.UnmarshalJSON(body); err != nil {
log.Println(err)
log.Error("Error unmarshalling",
zap.Error(err),
)
return
}
@ -449,7 +484,9 @@ func registerAuthHandler(router *mux.Router) {
data, err := response.MarshalJSON()
if err != nil {
log.Println(err)
log.Error("Error marshalling response message",
zap.Error(err),
)
return
}
@ -467,7 +504,9 @@ func registerAuthHandler(router *mux.Router) {
jsonpayload, err := payload.MarshalJSON()
if err != nil {
log.Println(err)
log.Error("Error marshalling payload",
zap.Error(err),
)
return
}
@ -477,10 +516,12 @@ func registerAuthHandler(router *mux.Router) {
})
}
func getLocalIP() string {
func getLocalIP(log *zap.Logger) string {
interfaces, err := net.InterfaceAddrs()
if err != nil {
log.Fatal(err)
log.Fatal("Error getting interfaces",
zap.Error(err),
)
}
for _, intf := range interfaces {
switch t := intf.(type) {
@ -508,11 +549,14 @@ func reverseSessionId(s string) (string, error) {
func main() {
flag.Parse()
log.SetFlags(0)
log := zap.Must(zap.NewDevelopment())
config, err := goconf.ReadConfigFile(*config)
if err != nil {
log.Fatal("Could not read configuration: ", err)
log.Fatal("Could not read configuration",
zap.Error(err),
)
}
secret, _ := config.GetString("backend", "secret")
@ -523,7 +567,9 @@ func main() {
case 32:
case 64:
default:
log.Printf("WARNING: The sessions hash key should be 32 or 64 bytes but is %d bytes", len(hashKey))
log.Warn("The sessions hash key should be 32 or 64 bytes",
zap.Int("len", len(hashKey)),
)
}
blockKey, _ := config.GetString("sessions", "blockkey")
@ -535,24 +581,30 @@ func main() {
case 24:
case 32:
default:
log.Fatalf("The sessions block key must be 16, 24 or 32 bytes but is %d bytes", len(blockKey))
log.Fatal("The sessions block key must be 16, 24 or 32 bytes",
zap.Int("len", len(blockKey)),
)
}
cookie := securecookie.New([]byte(hashKey), blockBytes).MaxAge(0)
cpus := runtime.NumCPU()
runtime.GOMAXPROCS(cpus)
log.Printf("Using a maximum of %d CPUs", cpus)
log.Debug("Using number of CPUs",
zap.Int("cpus", cpus),
)
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt)
r := mux.NewRouter()
registerAuthHandler(r)
registerAuthHandler(log, r)
localIP := getLocalIP()
localIP := getLocalIP(log)
listener, err := net.Listen("tcp", localIP+":0")
if err != nil {
log.Fatal(err)
log.Fatal("Error starting listener",
zap.Error(err),
)
}
server := http.Server{
@ -562,7 +614,9 @@ func main() {
server.Serve(listener) // nolint
}()
backendUrl := "http://" + listener.Addr().String()
log.Println("Backend server running on", backendUrl)
log.Info("Backend server running",
zap.String("url", backendUrl),
)
urls := make([]url.URL, 0)
urlstrings := make([]string, 0)
@ -575,24 +629,34 @@ func main() {
urls = append(urls, u)
urlstrings = append(urlstrings, u.String())
}
log.Printf("Connecting to %s", urlstrings)
log.Info("Connecting",
zap.Strings("urls", urlstrings),
)
clients := make([]*SignalingClient, 0)
stats := &Stats{}
if *maxClients < 2 {
log.Fatalf("Need at least 2 clients, got %d", *maxClients)
stats := &Stats{
log: log,
}
log.Printf("Starting %d clients", *maxClients)
if *maxClients < 2 {
log.Fatal("Need at least 2 clients",
zap.Int("count", *maxClients),
)
}
log.Info("Starting clients",
zap.Int("count", *maxClients),
)
var doneWg sync.WaitGroup
var readyWg sync.WaitGroup
for i := 0; i < *maxClients; i++ {
client, err := NewSignalingClient(cookie, urls[i%len(urls)].String(), stats, &readyWg, &doneWg)
client, err := NewSignalingClient(log, cookie, urls[i%len(urls)].String(), stats, &readyWg, &doneWg)
if err != nil {
log.Fatal(err)
log.Fatal("Error creating signaling client",
zap.Error(err),
)
}
defer client.Close()
readyWg.Add(1)
@ -612,10 +676,10 @@ func main() {
clients = append(clients, client)
}
log.Println("Clients created")
log.Info("Clients created")
readyWg.Wait()
log.Println("All connections established")
log.Info("All connections established")
for _, c := range clients {
doneWg.Add(1)
@ -632,14 +696,14 @@ loop:
for {
select {
case <-interrupt:
log.Println("Interrupted")
log.Info("Interrupted")
break loop
case <-report.C:
stats.Log()
}
}
log.Println("Waiting for clients to terminate ...")
log.Info("Waiting for clients to terminate ...")
for _, c := range clients {
c.Close()
}