diff --git a/backend_server.go b/backend_server.go index 85804f3..65e99f4 100644 --- a/backend_server.go +++ b/backend_server.go @@ -41,6 +41,7 @@ import ( "github.com/dlintw/goconf" "github.com/gorilla/mux" + "github.com/prometheus/client_golang/prometheus/promhttp" ) const ( @@ -153,6 +154,9 @@ func (b *BackendServer) Start(r *mux.Router) error { s.HandleFunc("/room/{roomid}", b.setComonHeaders(b.parseRequestBody(b.roomHandler))).Methods("POST") s.HandleFunc("/stats", b.setComonHeaders(b.validateStatsRequest(b.statsHandler))).Methods("GET") + // Expose prometheus metrics at "/metrics". + r.HandleFunc("/metrics", b.setComonHeaders(b.validateStatsRequest(b.metricsHandler))).Methods("GET") + // Provide a REST service to get TURN credentials. // See https://tools.ietf.org/html/draft-uberti-behave-turn-rest-00 r.HandleFunc("/turn/credentials", b.setComonHeaders(b.getTurnCredentials)).Methods("GET") @@ -632,3 +636,7 @@ func (b *BackendServer) statsHandler(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) w.Write(statsData) // nolint } + +func (b *BackendServer) metricsHandler(w http.ResponseWriter, r *http.Request) { + promhttp.Handler().ServeHTTP(w, r) +} diff --git a/mcu_janus.go b/mcu_janus.go index 1f2c1a9..6278855 100644 --- a/mcu_janus.go +++ b/mcu_janus.go @@ -465,7 +465,7 @@ func (c *mcuJanusClient) Close(ctx context.Context) { func (c *mcuJanusClient) SendMessage(ctx context.Context, message *MessageClientMessage, data *MessageClientMessageData, callback func(error, map[string]interface{})) { } -func (c *mcuJanusClient) closeClient(ctx context.Context) { +func (c *mcuJanusClient) closeClient(ctx context.Context) bool { if handle := c.handle; handle != nil { c.handle = nil c.closeChan <- true @@ -474,7 +474,10 @@ func (c *mcuJanusClient) closeClient(ctx context.Context) { log.Println("Could not detach client", handle.Id, err) } } + return true } + + return false } func (c *mcuJanusClient) run(handle *JanusHandle, closeChan chan bool) { @@ 
-726,6 +729,8 @@ func (m *mcuJanus) NewPublisher(ctx context.Context, listener McuListener, id st m.publishers[id+"|"+streamType] = client m.publisherCreated.Notify(id + "|" + streamType) m.mu.Unlock() + statsPublishersCurrent.WithLabelValues(streamType).Inc() + statsPublishersTotal.WithLabelValues(streamType).Inc() return client, nil } @@ -809,12 +814,14 @@ func (p *mcuJanusPublisher) Close(ctx context.Context) { p.mu.Unlock() if notify { + statsPublishersCurrent.WithLabelValues(p.streamType).Dec() p.mcu.unregisterClient(p) p.listener.PublisherClosed(p) } } func (p *mcuJanusPublisher) SendMessage(ctx context.Context, message *MessageClientMessage, data *MessageClientMessageData, callback func(error, map[string]interface{})) { + statsMcuMessagesTotal.WithLabelValues(data.Type).Inc() jsep_msg := data.Payload switch data.Type { case "offer": @@ -925,6 +932,8 @@ func (m *mcuJanus) NewSubscriber(ctx context.Context, listener McuListener, publ client.mcuJanusClient.handleSlowLink = client.handleSlowLink m.registerClient(client) go client.run(handle, client.closeChan) + statsSubscribersCurrent.WithLabelValues(streamType).Inc() + statsSubscribersTotal.WithLabelValues(streamType).Inc() return client, nil } @@ -992,9 +1001,12 @@ func (p *mcuJanusSubscriber) NotifyReconnected() { func (p *mcuJanusSubscriber) Close(ctx context.Context) { p.mu.Lock() - p.closeClient(ctx) + closed := p.closeClient(ctx) p.mu.Unlock() + if closed { + statsSubscribersCurrent.WithLabelValues(p.streamType).Dec() + } p.mcu.unregisterClient(p) p.listener.SubscriberClosed(p) } @@ -1009,6 +1021,7 @@ func (p *mcuJanusSubscriber) joinRoom(ctx context.Context, callback func(error, waiter := p.mcu.publisherConnected.NewWaiter(p.publisher + "|" + p.streamType) defer p.mcu.publisherConnected.Release(waiter) + loggedNotPublishingYet := false retry: join_msg := map[string]interface{}{ "request": "join", @@ -1063,6 +1076,11 @@ retry: log.Printf("Publisher %s not sending yet for %s, wait and retry to join room %d 
as subscriber", p.publisher, p.streamType, p.roomId) } + if !loggedNotPublishingYet { + loggedNotPublishingYet = true + statsWaitingForPublisherTotal.WithLabelValues(p.streamType).Inc() + } + if err := waiter.Wait(ctx); err != nil { callback(err, nil) return @@ -1082,6 +1100,7 @@ retry: } func (p *mcuJanusSubscriber) SendMessage(ctx context.Context, message *MessageClientMessage, data *MessageClientMessageData, callback func(error, map[string]interface{})) { + statsMcuMessagesTotal.WithLabelValues(data.Type).Inc() jsep_msg := data.Payload switch data.Type { case "requestoffer": diff --git a/mcu_proxy.go b/mcu_proxy.go index 4aa09cf..beb91a7 100644 --- a/mcu_proxy.go +++ b/mcu_proxy.go @@ -278,6 +278,7 @@ type mcuProxyConnection struct { reconnectTimer *time.Timer shutdownScheduled uint32 closeScheduled uint32 + trackClose uint32 helloMsgId string sessionId string @@ -500,6 +501,9 @@ func (c *mcuProxyConnection) close() { if c.conn != nil { c.conn.Close() c.conn = nil + if atomic.CompareAndSwapUint32(&c.trackClose, 1, 0) { + statsConnectedProxyBackendsCurrent.WithLabelValues(c.Country()).Dec() + } } } @@ -538,8 +542,8 @@ func (c *mcuProxyConnection) closeIfEmpty() bool { func (c *mcuProxyConnection) scheduleReconnect() { if err := c.sendClose(); err != nil && err != ErrNotConnected { log.Printf("Could not send close message to %s: %s", c.url, err) - c.close() } + c.close() interval := atomic.LoadInt64(&c.reconnectInterval) c.reconnectTimer.Reset(time.Duration(interval)) @@ -600,7 +604,10 @@ func (c *mcuProxyConnection) removePublisher(publisher *mcuProxyPublisher) { c.publishersLock.Lock() defer c.publishersLock.Unlock() - delete(c.publishers, publisher.proxyId) + if _, found := c.publishers[publisher.proxyId]; found { + delete(c.publishers, publisher.proxyId) + statsPublishersCurrent.WithLabelValues(publisher.StreamType()).Dec() + } delete(c.publisherIds, publisher.id+"|"+publisher.StreamType()) if len(c.publishers) == 0 && atomic.LoadUint32(&c.closeScheduled) != 0 
{ @@ -629,7 +636,10 @@ func (c *mcuProxyConnection) removeSubscriber(subscriber *mcuProxySubscriber) { c.subscribersLock.Lock() defer c.subscribersLock.Unlock() - delete(c.subscribers, subscriber.proxyId) + if _, found := c.subscribers[subscriber.proxyId]; found { + delete(c.subscribers, subscriber.proxyId) + statsSubscribersCurrent.WithLabelValues(subscriber.StreamType()).Dec() + } if len(c.subscribers) == 0 && atomic.LoadUint32(&c.closeScheduled) != 0 { go c.closeIfEmpty() @@ -708,6 +718,9 @@ func (c *mcuProxyConnection) processMessage(msg *ProxyServerMessage) { } else { log.Printf("Received session %s from %s", c.sessionId, c.url) } + if atomic.CompareAndSwapUint32(&c.trackClose, 0, 1) { + statsConnectedProxyBackendsCurrent.WithLabelValues(c.Country()).Inc() + } default: log.Printf("Received unsupported hello response %+v from %s, reconnecting", msg, c.url) c.scheduleReconnect() @@ -775,6 +788,7 @@ func (c *mcuProxyConnection) processEvent(msg *ProxyServerMessage) { log.Printf("Load of %s now at %d", c.url, event.Load) } atomic.StoreInt64(&c.load, event.Load) + statsProxyBackendLoadCurrent.WithLabelValues(c.url.String()).Set(float64(event.Load)) return case "shutdown-scheduled": log.Printf("Proxy %s is scheduled to shutdown", c.url) @@ -926,6 +940,8 @@ func (c *mcuProxyConnection) newPublisher(ctx context.Context, listener McuListe c.publishers[proxyId] = publisher c.publisherIds[id+"|"+streamType] = proxyId c.publishersLock.Unlock() + statsPublishersCurrent.WithLabelValues(streamType).Inc() + statsPublishersTotal.WithLabelValues(streamType).Inc() return publisher, nil } @@ -958,6 +974,8 @@ func (c *mcuProxyConnection) newSubscriber(ctx context.Context, listener McuList c.subscribersLock.Lock() c.subscribers[proxyId] = subscriber c.subscribersLock.Unlock() + statsSubscribersCurrent.WithLabelValues(streamType).Inc() + statsSubscribersTotal.WithLabelValues(streamType).Inc() return subscriber, nil } @@ -1692,6 +1710,7 @@ func (m *mcuProxy) NewPublisher(ctx 
context.Context, listener McuListener, id st return publisher, nil } + statsProxyNobackendAvailableTotal.WithLabelValues(streamType).Inc() return nil, fmt.Errorf("No MCU connection available") } @@ -1709,6 +1728,7 @@ func (m *mcuProxy) getPublisherConnection(ctx context.Context, publisher string, conn = m.publishers[publisher+"|"+streamType] if conn != nil { + // Publisher was created while waiting for lock. return conn } @@ -1716,6 +1736,7 @@ func (m *mcuProxy) getPublisherConnection(ctx context.Context, publisher string, id := m.addWaiter(ch) defer m.removeWaiter(id) + statsWaitingForPublisherTotal.WithLabelValues(streamType).Inc() for { m.mu.Unlock() select { diff --git a/mcu_stats_prometheus.go b/mcu_stats_prometheus.go new file mode 100644 index 0000000..fc76229 --- /dev/null +++ b/mcu_stats_prometheus.go @@ -0,0 +1,117 @@ +/** + * Standalone signaling server for the Nextcloud Spreed app. + * Copyright (C) 2021 struktur AG + * + * @author Joachim Bauch + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see http://www.gnu.org/licenses/.
+ */ +package signaling + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +var ( + statsPublishersCurrent = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "signaling", + Subsystem: "mcu", + Name: "publishers", + Help: "The current number of publishers", + }, []string{"type"}) + statsPublishersTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "signaling", + Subsystem: "mcu", + Name: "publishers_total", + Help: "The total number of created publishers", + }, []string{"type"}) + statsSubscribersCurrent = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "signaling", + Subsystem: "mcu", + Name: "subscribers", + Help: "The current number of subscribers", + }, []string{"type"}) + statsSubscribersTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "signaling", + Subsystem: "mcu", + Name: "subscribers_total", + Help: "The total number of created subscribers", + }, []string{"type"}) + statsWaitingForPublisherTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "signaling", + Subsystem: "mcu", + Name: "nopublisher_total", + Help: "The total number of subscribe requests where no publisher exists", + }, []string{"type"}) + statsMcuMessagesTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "signaling", + Subsystem: "mcu", + Name: "messages_total", + Help: "The total number of MCU messages", + }, []string{"type"}) + + commonMcuStats = []prometheus.Collector{ + statsPublishersCurrent, + statsPublishersTotal, + statsSubscribersCurrent, + statsSubscribersTotal, + statsWaitingForPublisherTotal, + statsMcuMessagesTotal, + } + + statsConnectedProxyBackendsCurrent = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "signaling", + Subsystem: "mcu", + Name: "backend_connections", + Help: "Current number of connections to signaling proxy backends", + }, []string{"country"}) + statsProxyBackendLoadCurrent = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: 
"signaling", + Subsystem: "mcu", + Name: "backend_load", + Help: "Current load of signaling proxy backends", + }, []string{"url"}) + statsProxyNobackendAvailableTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "signaling", + Subsystem: "mcu", + Name: "no_backend_available_total", + Help: "Total number of publishing requests where no backend was available", + }, []string{"type"}) + + proxyMcuStats = []prometheus.Collector{ + statsConnectedProxyBackendsCurrent, + statsProxyBackendLoadCurrent, + statsProxyNobackendAvailableTotal, + } +) + +func RegisterJanusMcuStats() { + registerAll(commonMcuStats...) +} + +func UnregisterJanusMcuStats() { + unregisterAll(commonMcuStats...) +} + +func RegisterProxyMcuStats() { + registerAll(commonMcuStats...) + registerAll(proxyMcuStats...) +} + +func UnregisterProxyMcuStats() { + unregisterAll(commonMcuStats...) + unregisterAll(proxyMcuStats...) +} diff --git a/proxy/proxy_server.go b/proxy/proxy_server.go index 58e2030..91e9cdb 100644 --- a/proxy/proxy_server.go +++ b/proxy/proxy_server.go @@ -43,6 +43,7 @@ import ( "github.com/gorilla/mux" "github.com/gorilla/securecookie" "github.com/gorilla/websocket" + "github.com/prometheus/client_golang/prometheus/promhttp" "gopkg.in/dgrijalva/jwt-go.v3" @@ -205,6 +206,7 @@ func NewProxyServer(r *mux.Router, version string, config *goconf.ConfigFile) (* r.HandleFunc("/proxy", result.setCommonHeaders(result.proxyHandler)).Methods("GET") r.HandleFunc("/stats", result.setCommonHeaders(result.validateStatsRequest(result.statsHandler))).Methods("GET") + r.HandleFunc("/metrics", result.setCommonHeaders(result.validateStatsRequest(result.metricsHandler))).Methods("GET") return result, nil } @@ -236,6 +238,9 @@ func (s *ProxyServer) Start(config *goconf.ConfigFile) error { switch mcuType { case signaling.McuTypeJanus: mcu, err = signaling.NewMcuJanus(s.url, config) + if err == nil { + signaling.RegisterJanusMcuStats() + } default: return fmt.Errorf("Unsupported MCU type: %s", 
mcuType) } @@ -555,6 +560,7 @@ func (s *ProxyServer) processMessage(client *ProxyClient, data []byte) { } else { s.sendCurrentLoad(session) } + statsSessionsResumedTotal.Inc() } else { var err error if session, err = s.NewSession(message.Hello); err != nil { @@ -619,6 +625,9 @@ func (i *emptyInitiator) Country() string { func (s *ProxyServer) processCommand(ctx context.Context, client *ProxyClient, session *ProxySession, message *signaling.ProxyClientMessage) { cmd := message.Command + + statsCommandMessagesTotal.WithLabelValues(cmd.Type).Inc() + switch cmd.Type { case "create-publisher": if atomic.LoadUint32(&s.shutdownScheduled) != 0 { @@ -650,6 +659,8 @@ func (s *ProxyServer) processCommand(ctx context.Context, client *ProxyClient, s }, } session.sendMessage(response) + statsPublishersCurrent.WithLabelValues(cmd.StreamType).Inc() + statsPublishersTotal.WithLabelValues(cmd.StreamType).Inc() case "create-subscriber": id := uuid.New().String() publisherId := cmd.PublisherId @@ -676,6 +687,8 @@ func (s *ProxyServer) processCommand(ctx context.Context, client *ProxyClient, s }, } session.sendMessage(response) + statsSubscribersCurrent.WithLabelValues(cmd.StreamType).Inc() + statsSubscribersTotal.WithLabelValues(cmd.StreamType).Inc() case "delete-publisher": client := s.GetClient(cmd.ClientId) if client == nil { @@ -688,7 +701,9 @@ func (s *ProxyServer) processCommand(ctx context.Context, client *ProxyClient, s return } - s.DeleteClient(cmd.ClientId, client) + if s.DeleteClient(cmd.ClientId, client) { + statsPublishersCurrent.WithLabelValues(client.StreamType()).Dec() + } go func() { log.Printf("Closing %s publisher %s as %s", client.StreamType(), client.Id(), cmd.ClientId) @@ -721,7 +736,9 @@ func (s *ProxyServer) processCommand(ctx context.Context, client *ProxyClient, s return } - s.DeleteClient(cmd.ClientId, client) + if s.DeleteClient(cmd.ClientId, client) { + statsSubscribersCurrent.WithLabelValues(client.StreamType()).Dec() + } go func() { log.Printf("Closing 
%s subscriber %s as %s", client.StreamType(), client.Id(), cmd.ClientId) @@ -750,6 +767,8 @@ func (s *ProxyServer) processPayload(ctx context.Context, client *ProxyClient, s return } + statsPayloadMessagesTotal.WithLabelValues(payload.Type).Inc() + var mcuData *signaling.MessageClientMessageData switch payload.Type { case "offer": @@ -811,42 +830,50 @@ func (s *ProxyServer) NewSession(hello *signaling.HelloProxyClientMessage) (*Pro log.Printf("Hello: %+v", hello) } + reason := "auth-failed" token, err := jwt.ParseWithClaims(hello.Token, &signaling.TokenClaims{}, func(token *jwt.Token) (interface{}, error) { // Don't forget to validate the alg is what you expect: if _, ok := token.Method.(*jwt.SigningMethodRSA); !ok { log.Printf("Unexpected signing method: %v", token.Header["alg"]) + reason = "unsupported-signing-method" return nil, fmt.Errorf("Unexpected signing method: %v", token.Header["alg"]) } claims, ok := token.Claims.(*signaling.TokenClaims) if !ok { log.Printf("Unsupported claims type: %+v", token.Claims) + reason = "unsupported-claims" return nil, fmt.Errorf("Unsupported claims type") } tokenKey, err := s.tokens.Get(claims.Issuer) if err != nil { log.Printf("Could not get token for %s: %s", claims.Issuer, err) + reason = "missing-issuer" return nil, err } if tokenKey == nil || tokenKey.key == nil { log.Printf("Issuer %s is not supported", claims.Issuer) + reason = "unsupported-issuer" return nil, fmt.Errorf("No key found for issuer") } return tokenKey.key, nil }) if err != nil { + statsTokenErrorsTotal.WithLabelValues(reason).Inc() return nil, TokenAuthFailed } claims, ok := token.Claims.(*signaling.TokenClaims) if !ok || !token.Valid { + statsTokenErrorsTotal.WithLabelValues("auth-failed").Inc() return nil, TokenAuthFailed } minIssuedAt := time.Now().Add(-maxTokenAge) if issuedAt := time.Unix(claims.IssuedAt, 0); issuedAt.Before(minIssuedAt) { + statsTokenErrorsTotal.WithLabelValues("expired").Inc() return nil, TokenExpired } @@ -868,6 +895,8 @@ func (s 
*ProxyServer) NewSession(hello *signaling.HelloProxyClientMessage) (*Pro log.Printf("Created session %s for %+v", encoded, claims) session := NewProxySession(s, sid, encoded) s.StoreSession(sid, session) + statsSessionsCurrent.Inc() + statsSessionsTotal.Inc() return session, nil } @@ -906,6 +935,7 @@ func (s *ProxyServer) DeleteSession(id uint64) { func (s *ProxyServer) deleteSessionLocked(id uint64) { delete(s.sessions, id) + statsSessionsCurrent.Dec() } func (s *ProxyServer) StoreClient(id string, client signaling.McuClient) { @@ -915,9 +945,13 @@ func (s *ProxyServer) StoreClient(id string, client signaling.McuClient) { s.clientIds[client.Id()] = id } -func (s *ProxyServer) DeleteClient(id string, client signaling.McuClient) { +func (s *ProxyServer) DeleteClient(id string, client signaling.McuClient) bool { s.clientsLock.Lock() defer s.clientsLock.Unlock() + if _, found := s.clients[id]; !found { + return false + } + delete(s.clients, id) delete(s.clientIds, client.Id()) @@ -926,6 +960,7 @@ func (s *ProxyServer) DeleteClient(id string, client signaling.McuClient) { s.shutdownChannel <- true }() } + return true } func (s *ProxyServer) GetClientCount() int64 { @@ -986,3 +1021,8 @@ func (s *ProxyServer) statsHandler(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) w.Write(statsData) // nolint } + +func (s *ProxyServer) metricsHandler(w http.ResponseWriter, r *http.Request) { + // Expose prometheus metrics at "/metrics". 
+ promhttp.Handler().ServeHTTP(w, r) +} diff --git a/proxy/proxy_session.go b/proxy/proxy_session.go index 2a6b0c6..98f1f19 100644 --- a/proxy/proxy_session.go +++ b/proxy/proxy_session.go @@ -28,7 +28,7 @@ import ( "sync/atomic" "time" - "github.com/strukturag/nextcloud-spreed-signaling" + signaling "github.com/strukturag/nextcloud-spreed-signaling" ) const ( @@ -169,7 +169,9 @@ func (s *ProxySession) OnIceCompleted(client signaling.McuClient) { func (s *ProxySession) PublisherClosed(publisher signaling.McuPublisher) { if id := s.DeletePublisher(publisher); id != "" { - s.proxy.DeleteClient(id, publisher) + if s.proxy.DeleteClient(id, publisher) { + statsPublishersCurrent.WithLabelValues(publisher.StreamType()).Dec() + } msg := &signaling.ProxyServerMessage{ Type: "event", @@ -184,7 +186,9 @@ func (s *ProxySession) PublisherClosed(publisher signaling.McuPublisher) { func (s *ProxySession) SubscriberClosed(subscriber signaling.McuSubscriber) { if id := s.DeleteSubscriber(subscriber); id != "" { - s.proxy.DeleteClient(id, subscriber) + if s.proxy.DeleteClient(id, subscriber) { + statsSubscribersCurrent.WithLabelValues(subscriber.StreamType()).Dec() + } msg := &signaling.ProxyServerMessage{ Type: "event", diff --git a/proxy/proxy_stats_prometheus.go b/proxy/proxy_stats_prometheus.go new file mode 100644 index 0000000..054ec2b --- /dev/null +++ b/proxy/proxy_stats_prometheus.go @@ -0,0 +1,102 @@ +/** + * Standalone signaling server for the Nextcloud Spreed app. + * Copyright (C) 2021 struktur AG + * + * @author Joachim Bauch + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. 
+ * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see http://www.gnu.org/licenses/. + */ +package main + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +var ( + statsSessionsCurrent = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "signaling", + Subsystem: "proxy", + Name: "sessions", + Help: "The current number of sessions", + }) + statsSessionsTotal = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "signaling", + Subsystem: "proxy", + Name: "sessions_total", + Help: "The total number of created sessions", + }) + statsSessionsResumedTotal = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "signaling", + Subsystem: "proxy", + Name: "sessions_resumed_total", + Help: "The total number of resumed sessions", + }) + statsPublishersCurrent = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "signaling", + Subsystem: "proxy", + Name: "publishers", + Help: "The current number of publishers", + }, []string{"type"}) + statsPublishersTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "signaling", + Subsystem: "proxy", + Name: "publishers_total", + Help: "The total number of created publishers", + }, []string{"type"}) + statsSubscribersCurrent = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "signaling", + Subsystem: "proxy", + Name: "subscribers", + Help: "The current number of subscribers", + }, []string{"type"}) + statsSubscribersTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "signaling", + Subsystem: "proxy", + Name: "subscribers_total", + Help: "The total number of created subscribers", + }, []string{"type"}) + statsCommandMessagesTotal = 
prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "signaling", + Subsystem: "proxy", + Name: "command_messages_total", + Help: "The total number of command messages", + }, []string{"type"}) + statsPayloadMessagesTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "signaling", + Subsystem: "proxy", + Name: "payload_messages_total", + Help: "The total number of payload messages", + }, []string{"type"}) + statsTokenErrorsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "signaling", + Subsystem: "proxy", + Name: "token_errors_total", + Help: "The total number of token errors", + }, []string{"reason"}) +) + +func init() { + prometheus.MustRegister(statsSessionsCurrent) + prometheus.MustRegister(statsSessionsTotal) + prometheus.MustRegister(statsSessionsResumedTotal) + prometheus.MustRegister(statsPublishersCurrent) + prometheus.MustRegister(statsPublishersTotal) + prometheus.MustRegister(statsSubscribersCurrent) + prometheus.MustRegister(statsSubscribersTotal) + prometheus.MustRegister(statsCommandMessagesTotal) + prometheus.MustRegister(statsPayloadMessagesTotal) + prometheus.MustRegister(statsTokenErrorsTotal) +} diff --git a/server/main.go b/server/main.go index 5568048..1b1b52c 100644 --- a/server/main.go +++ b/server/main.go @@ -141,6 +141,8 @@ func main() { runtime.GOMAXPROCS(cpus) log.Printf("Using a maximum of %d CPUs", cpus) + signaling.RegisterStats() + natsUrl, _ := config.GetString("nats", "url") if natsUrl == "" { natsUrl = nats.DefaultURL @@ -176,8 +178,12 @@ func main() { switch mcuType { case signaling.McuTypeJanus: mcu, err = signaling.NewMcuJanus(mcuUrl, config) + signaling.UnregisterProxyMcuStats() + signaling.RegisterJanusMcuStats() case signaling.McuTypeProxy: mcu, err = signaling.NewMcuProxy(config) + signaling.UnregisterJanusMcuStats() + signaling.RegisterProxyMcuStats() default: log.Fatal("Unsupported MCU type: ", mcuType) } diff --git a/stats_prometheus.go b/stats_prometheus.go new file mode 
100644 index 0000000..93af6af --- /dev/null +++ b/stats_prometheus.go @@ -0,0 +1,59 @@ +/** + * Standalone signaling server for the Nextcloud Spreed app. + * Copyright (C) 2021 struktur AG + * + * @author Joachim Bauch + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see http://www.gnu.org/licenses/. + */ +package signaling + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +var ( + statsMessagesTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "signaling", + Subsystem: "server", + Name: "messages_total", + Help: "The total number of signaling messages", + }, []string{"type"}) + + signalingStats = []prometheus.Collector{ + statsMessagesTotal, + } +) + +func registerAll(cs ...prometheus.Collector) { + for _, c := range cs { + if err := prometheus.DefaultRegisterer.Register(c); err != nil { + if _, ok := err.(prometheus.AlreadyRegisteredError); !ok { + panic(err) + } + } + } +} + +func unregisterAll(cs ...prometheus.Collector) { + for _, c := range cs { + prometheus.Unregister(c) + } +} + +func RegisterStats() { + registerAll(signalingStats...) +}