Add prometheus metrics.

This commit is contained in:
Joachim Bauch 2021-04-20 17:12:28 +02:00
parent 300edddc5a
commit c01caa94e6
No known key found for this signature in database
GPG Key ID: 77C1D22D53E15F02
9 changed files with 387 additions and 11 deletions

View File

@ -41,6 +41,7 @@ import (
"github.com/dlintw/goconf" "github.com/dlintw/goconf"
"github.com/gorilla/mux" "github.com/gorilla/mux"
"github.com/prometheus/client_golang/prometheus/promhttp"
) )
const ( const (
@ -153,6 +154,9 @@ func (b *BackendServer) Start(r *mux.Router) error {
s.HandleFunc("/room/{roomid}", b.setComonHeaders(b.parseRequestBody(b.roomHandler))).Methods("POST") s.HandleFunc("/room/{roomid}", b.setComonHeaders(b.parseRequestBody(b.roomHandler))).Methods("POST")
s.HandleFunc("/stats", b.setComonHeaders(b.validateStatsRequest(b.statsHandler))).Methods("GET") s.HandleFunc("/stats", b.setComonHeaders(b.validateStatsRequest(b.statsHandler))).Methods("GET")
// Expose prometheus metrics at "/metrics".
r.HandleFunc("/metrics", b.setComonHeaders(b.validateStatsRequest(b.metricsHandler))).Methods("GET")
// Provide a REST service to get TURN credentials. // Provide a REST service to get TURN credentials.
// See https://tools.ietf.org/html/draft-uberti-behave-turn-rest-00 // See https://tools.ietf.org/html/draft-uberti-behave-turn-rest-00
r.HandleFunc("/turn/credentials", b.setComonHeaders(b.getTurnCredentials)).Methods("GET") r.HandleFunc("/turn/credentials", b.setComonHeaders(b.getTurnCredentials)).Methods("GET")
@ -632,3 +636,7 @@ func (b *BackendServer) statsHandler(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
w.Write(statsData) // nolint w.Write(statsData) // nolint
} }
// metricsHandler answers a request with the Prometheus metrics exposition by
// delegating to the handler obtained from promhttp.Handler.
func (b *BackendServer) metricsHandler(w http.ResponseWriter, r *http.Request) {
	handler := promhttp.Handler()
	handler.ServeHTTP(w, r)
}

View File

@ -465,7 +465,7 @@ func (c *mcuJanusClient) Close(ctx context.Context) {
func (c *mcuJanusClient) SendMessage(ctx context.Context, message *MessageClientMessage, data *MessageClientMessageData, callback func(error, map[string]interface{})) { func (c *mcuJanusClient) SendMessage(ctx context.Context, message *MessageClientMessage, data *MessageClientMessageData, callback func(error, map[string]interface{})) {
} }
func (c *mcuJanusClient) closeClient(ctx context.Context) { func (c *mcuJanusClient) closeClient(ctx context.Context) bool {
if handle := c.handle; handle != nil { if handle := c.handle; handle != nil {
c.handle = nil c.handle = nil
c.closeChan <- true c.closeChan <- true
@ -474,7 +474,10 @@ func (c *mcuJanusClient) closeClient(ctx context.Context) {
log.Println("Could not detach client", handle.Id, err) log.Println("Could not detach client", handle.Id, err)
} }
} }
return true
} }
return false
} }
func (c *mcuJanusClient) run(handle *JanusHandle, closeChan chan bool) { func (c *mcuJanusClient) run(handle *JanusHandle, closeChan chan bool) {
@ -726,6 +729,8 @@ func (m *mcuJanus) NewPublisher(ctx context.Context, listener McuListener, id st
m.publishers[id+"|"+streamType] = client m.publishers[id+"|"+streamType] = client
m.publisherCreated.Notify(id + "|" + streamType) m.publisherCreated.Notify(id + "|" + streamType)
m.mu.Unlock() m.mu.Unlock()
statsPublishersCurrent.WithLabelValues(streamType).Inc()
statsPublishersTotal.WithLabelValues(streamType).Inc()
return client, nil return client, nil
} }
@ -809,12 +814,14 @@ func (p *mcuJanusPublisher) Close(ctx context.Context) {
p.mu.Unlock() p.mu.Unlock()
if notify { if notify {
statsPublishersCurrent.WithLabelValues(p.streamType).Dec()
p.mcu.unregisterClient(p) p.mcu.unregisterClient(p)
p.listener.PublisherClosed(p) p.listener.PublisherClosed(p)
} }
} }
func (p *mcuJanusPublisher) SendMessage(ctx context.Context, message *MessageClientMessage, data *MessageClientMessageData, callback func(error, map[string]interface{})) { func (p *mcuJanusPublisher) SendMessage(ctx context.Context, message *MessageClientMessage, data *MessageClientMessageData, callback func(error, map[string]interface{})) {
statsMcuMessagesTotal.WithLabelValues(data.Type).Inc()
jsep_msg := data.Payload jsep_msg := data.Payload
switch data.Type { switch data.Type {
case "offer": case "offer":
@ -925,6 +932,8 @@ func (m *mcuJanus) NewSubscriber(ctx context.Context, listener McuListener, publ
client.mcuJanusClient.handleSlowLink = client.handleSlowLink client.mcuJanusClient.handleSlowLink = client.handleSlowLink
m.registerClient(client) m.registerClient(client)
go client.run(handle, client.closeChan) go client.run(handle, client.closeChan)
statsSubscribersCurrent.WithLabelValues(streamType).Inc()
statsSubscribersTotal.WithLabelValues(streamType).Inc()
return client, nil return client, nil
} }
@ -992,9 +1001,12 @@ func (p *mcuJanusSubscriber) NotifyReconnected() {
func (p *mcuJanusSubscriber) Close(ctx context.Context) { func (p *mcuJanusSubscriber) Close(ctx context.Context) {
p.mu.Lock() p.mu.Lock()
p.closeClient(ctx) closed := p.closeClient(ctx)
p.mu.Unlock() p.mu.Unlock()
if closed {
statsSubscribersCurrent.WithLabelValues(p.streamType).Dec()
}
p.mcu.unregisterClient(p) p.mcu.unregisterClient(p)
p.listener.SubscriberClosed(p) p.listener.SubscriberClosed(p)
} }
@ -1009,6 +1021,7 @@ func (p *mcuJanusSubscriber) joinRoom(ctx context.Context, callback func(error,
waiter := p.mcu.publisherConnected.NewWaiter(p.publisher + "|" + p.streamType) waiter := p.mcu.publisherConnected.NewWaiter(p.publisher + "|" + p.streamType)
defer p.mcu.publisherConnected.Release(waiter) defer p.mcu.publisherConnected.Release(waiter)
loggedNotPublishingYet := false
retry: retry:
join_msg := map[string]interface{}{ join_msg := map[string]interface{}{
"request": "join", "request": "join",
@ -1063,6 +1076,11 @@ retry:
log.Printf("Publisher %s not sending yet for %s, wait and retry to join room %d as subscriber", p.publisher, p.streamType, p.roomId) log.Printf("Publisher %s not sending yet for %s, wait and retry to join room %d as subscriber", p.publisher, p.streamType, p.roomId)
} }
if !loggedNotPublishingYet {
loggedNotPublishingYet = true
statsWaitingForPublisherTotal.WithLabelValues(p.streamType).Inc()
}
if err := waiter.Wait(ctx); err != nil { if err := waiter.Wait(ctx); err != nil {
callback(err, nil) callback(err, nil)
return return
@ -1082,6 +1100,7 @@ retry:
} }
func (p *mcuJanusSubscriber) SendMessage(ctx context.Context, message *MessageClientMessage, data *MessageClientMessageData, callback func(error, map[string]interface{})) { func (p *mcuJanusSubscriber) SendMessage(ctx context.Context, message *MessageClientMessage, data *MessageClientMessageData, callback func(error, map[string]interface{})) {
statsMcuMessagesTotal.WithLabelValues(data.Type).Inc()
jsep_msg := data.Payload jsep_msg := data.Payload
switch data.Type { switch data.Type {
case "requestoffer": case "requestoffer":

View File

@ -278,6 +278,7 @@ type mcuProxyConnection struct {
reconnectTimer *time.Timer reconnectTimer *time.Timer
shutdownScheduled uint32 shutdownScheduled uint32
closeScheduled uint32 closeScheduled uint32
trackClose uint32
helloMsgId string helloMsgId string
sessionId string sessionId string
@ -500,6 +501,9 @@ func (c *mcuProxyConnection) close() {
if c.conn != nil { if c.conn != nil {
c.conn.Close() c.conn.Close()
c.conn = nil c.conn = nil
if atomic.CompareAndSwapUint32(&c.trackClose, 1, 0) {
statsConnectedProxyBackendsCurrent.WithLabelValues(c.Country()).Dec()
}
} }
} }
@ -538,8 +542,8 @@ func (c *mcuProxyConnection) closeIfEmpty() bool {
func (c *mcuProxyConnection) scheduleReconnect() { func (c *mcuProxyConnection) scheduleReconnect() {
if err := c.sendClose(); err != nil && err != ErrNotConnected { if err := c.sendClose(); err != nil && err != ErrNotConnected {
log.Printf("Could not send close message to %s: %s", c.url, err) log.Printf("Could not send close message to %s: %s", c.url, err)
c.close()
} }
c.close()
interval := atomic.LoadInt64(&c.reconnectInterval) interval := atomic.LoadInt64(&c.reconnectInterval)
c.reconnectTimer.Reset(time.Duration(interval)) c.reconnectTimer.Reset(time.Duration(interval))
@ -600,7 +604,10 @@ func (c *mcuProxyConnection) removePublisher(publisher *mcuProxyPublisher) {
c.publishersLock.Lock() c.publishersLock.Lock()
defer c.publishersLock.Unlock() defer c.publishersLock.Unlock()
delete(c.publishers, publisher.proxyId) if _, found := c.publishers[publisher.proxyId]; found {
delete(c.publishers, publisher.proxyId)
statsPublishersCurrent.WithLabelValues(publisher.StreamType()).Dec()
}
delete(c.publisherIds, publisher.id+"|"+publisher.StreamType()) delete(c.publisherIds, publisher.id+"|"+publisher.StreamType())
if len(c.publishers) == 0 && atomic.LoadUint32(&c.closeScheduled) != 0 { if len(c.publishers) == 0 && atomic.LoadUint32(&c.closeScheduled) != 0 {
@ -629,7 +636,10 @@ func (c *mcuProxyConnection) removeSubscriber(subscriber *mcuProxySubscriber) {
c.subscribersLock.Lock() c.subscribersLock.Lock()
defer c.subscribersLock.Unlock() defer c.subscribersLock.Unlock()
delete(c.subscribers, subscriber.proxyId) if _, found := c.subscribers[subscriber.proxyId]; found {
delete(c.subscribers, subscriber.proxyId)
statsSubscribersCurrent.WithLabelValues(subscriber.StreamType()).Dec()
}
if len(c.subscribers) == 0 && atomic.LoadUint32(&c.closeScheduled) != 0 { if len(c.subscribers) == 0 && atomic.LoadUint32(&c.closeScheduled) != 0 {
go c.closeIfEmpty() go c.closeIfEmpty()
@ -708,6 +718,9 @@ func (c *mcuProxyConnection) processMessage(msg *ProxyServerMessage) {
} else { } else {
log.Printf("Received session %s from %s", c.sessionId, c.url) log.Printf("Received session %s from %s", c.sessionId, c.url)
} }
if atomic.CompareAndSwapUint32(&c.trackClose, 0, 1) {
statsConnectedProxyBackendsCurrent.WithLabelValues(c.Country()).Inc()
}
default: default:
log.Printf("Received unsupported hello response %+v from %s, reconnecting", msg, c.url) log.Printf("Received unsupported hello response %+v from %s, reconnecting", msg, c.url)
c.scheduleReconnect() c.scheduleReconnect()
@ -775,6 +788,7 @@ func (c *mcuProxyConnection) processEvent(msg *ProxyServerMessage) {
log.Printf("Load of %s now at %d", c.url, event.Load) log.Printf("Load of %s now at %d", c.url, event.Load)
} }
atomic.StoreInt64(&c.load, event.Load) atomic.StoreInt64(&c.load, event.Load)
statsProxyBackendLoadCurrent.WithLabelValues(c.url.String()).Set(float64(event.Load))
return return
case "shutdown-scheduled": case "shutdown-scheduled":
log.Printf("Proxy %s is scheduled to shutdown", c.url) log.Printf("Proxy %s is scheduled to shutdown", c.url)
@ -926,6 +940,8 @@ func (c *mcuProxyConnection) newPublisher(ctx context.Context, listener McuListe
c.publishers[proxyId] = publisher c.publishers[proxyId] = publisher
c.publisherIds[id+"|"+streamType] = proxyId c.publisherIds[id+"|"+streamType] = proxyId
c.publishersLock.Unlock() c.publishersLock.Unlock()
statsPublishersCurrent.WithLabelValues(streamType).Inc()
statsPublishersTotal.WithLabelValues(streamType).Inc()
return publisher, nil return publisher, nil
} }
@ -958,6 +974,8 @@ func (c *mcuProxyConnection) newSubscriber(ctx context.Context, listener McuList
c.subscribersLock.Lock() c.subscribersLock.Lock()
c.subscribers[proxyId] = subscriber c.subscribers[proxyId] = subscriber
c.subscribersLock.Unlock() c.subscribersLock.Unlock()
statsSubscribersCurrent.WithLabelValues(streamType).Inc()
statsSubscribersTotal.WithLabelValues(streamType).Inc()
return subscriber, nil return subscriber, nil
} }
@ -1692,6 +1710,7 @@ func (m *mcuProxy) NewPublisher(ctx context.Context, listener McuListener, id st
return publisher, nil return publisher, nil
} }
statsProxyNobackendAvailableTotal.WithLabelValues(streamType).Inc()
return nil, fmt.Errorf("No MCU connection available") return nil, fmt.Errorf("No MCU connection available")
} }
@ -1709,6 +1728,7 @@ func (m *mcuProxy) getPublisherConnection(ctx context.Context, publisher string,
conn = m.publishers[publisher+"|"+streamType] conn = m.publishers[publisher+"|"+streamType]
if conn != nil { if conn != nil {
// Publisher was created while waiting for lock.
return conn return conn
} }
@ -1716,6 +1736,7 @@ func (m *mcuProxy) getPublisherConnection(ctx context.Context, publisher string,
id := m.addWaiter(ch) id := m.addWaiter(ch)
defer m.removeWaiter(id) defer m.removeWaiter(id)
statsWaitingForPublisherTotal.WithLabelValues(streamType).Inc()
for { for {
m.mu.Unlock() m.mu.Unlock()
select { select {

117
mcu_stats_prometheus.go Normal file
View File

@ -0,0 +1,117 @@
/**
* Standalone signaling server for the Nextcloud Spreed app.
* Copyright (C) 2021 struktur AG
*
* @author Joachim Bauch <bauch@struktur.de>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package signaling
import (
"github.com/prometheus/client_golang/prometheus"
)
// Metrics shared by every MCU implementation. Each vector carries a single
// "type" label.
var (
	// statsPublishersCurrent tracks the current number of publishers.
	statsPublishersCurrent = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Namespace: "signaling",
			Subsystem: "mcu",
			Name:      "publishers",
			Help:      "The current number of publishers",
		},
		[]string{"type"},
	)

	// statsPublishersTotal counts all publishers that were created.
	statsPublishersTotal = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: "signaling",
			Subsystem: "mcu",
			Name:      "publishers_total",
			Help:      "The total number of created publishers",
		},
		[]string{"type"},
	)

	// statsSubscribersCurrent tracks the current number of subscribers.
	statsSubscribersCurrent = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Namespace: "signaling",
			Subsystem: "mcu",
			Name:      "subscribers",
			Help:      "The current number of subscribers",
		},
		[]string{"type"},
	)

	// statsSubscribersTotal counts all subscribers that were created.
	statsSubscribersTotal = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: "signaling",
			Subsystem: "mcu",
			Name:      "subscribers_total",
			Help:      "The total number of created subscribers",
		},
		[]string{"type"},
	)

	// statsWaitingForPublisherTotal counts subscribe requests that found no
	// publisher.
	statsWaitingForPublisherTotal = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: "signaling",
			Subsystem: "mcu",
			Name:      "nopublisher_total",
			Help:      "The total number of subscribe requests where no publisher exists",
		},
		[]string{"type"},
	)

	// statsMcuMessagesTotal counts MCU messages.
	statsMcuMessagesTotal = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: "signaling",
			Subsystem: "mcu",
			Name:      "messages_total",
			Help:      "The total number of MCU messages",
		},
		[]string{"type"},
	)
)

// commonMcuStats lists the collectors that are registered for both the Janus
// and the proxy MCU.
var commonMcuStats = []prometheus.Collector{
	statsPublishersCurrent,
	statsPublishersTotal,
	statsSubscribersCurrent,
	statsSubscribersTotal,
	statsWaitingForPublisherTotal,
	statsMcuMessagesTotal,
}

// Metrics that are only registered for the proxy MCU.
var (
	// statsConnectedProxyBackendsCurrent tracks the current number of
	// connections to signaling proxy backends, labeled by "country".
	statsConnectedProxyBackendsCurrent = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Namespace: "signaling",
			Subsystem: "mcu",
			Name:      "backend_connections",
			Help:      "Current number of connections to signaling proxy backends",
		},
		[]string{"country"},
	)

	// statsProxyBackendLoadCurrent holds the current load of signaling proxy
	// backends, labeled by "url".
	statsProxyBackendLoadCurrent = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Namespace: "signaling",
			Subsystem: "mcu",
			Name:      "backend_load",
			Help:      "Current load of signaling proxy backends",
		},
		[]string{"url"},
	)

	// statsProxyNobackendAvailableTotal counts publishing requests for which
	// no backend was available, labeled by "type".
	statsProxyNobackendAvailableTotal = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: "signaling",
			Subsystem: "mcu",
			Name:      "no_backend_available_total",
			Help:      "Total number of publishing requests where no backend was available",
		},
		[]string{"type"},
	)
)

// proxyMcuStats lists the collectors that are registered in addition to
// commonMcuStats when the proxy MCU is used.
var proxyMcuStats = []prometheus.Collector{
	statsConnectedProxyBackendsCurrent,
	statsProxyBackendLoadCurrent,
	statsProxyNobackendAvailableTotal,
}
func RegisterJanusMcuStats() {
registerAll(commonMcuStats...)
}
func UnregisterJanusMcuStats() {
unregisterAll(commonMcuStats...)
}
func RegisterProxyMcuStats() {
registerAll(commonMcuStats...)
registerAll(proxyMcuStats...)
}
func UnregisterProxyMcuStats() {
unregisterAll(commonMcuStats...)
unregisterAll(proxyMcuStats...)
}

View File

@ -43,6 +43,7 @@ import (
"github.com/gorilla/mux" "github.com/gorilla/mux"
"github.com/gorilla/securecookie" "github.com/gorilla/securecookie"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
"github.com/prometheus/client_golang/prometheus/promhttp"
"gopkg.in/dgrijalva/jwt-go.v3" "gopkg.in/dgrijalva/jwt-go.v3"
@ -205,6 +206,7 @@ func NewProxyServer(r *mux.Router, version string, config *goconf.ConfigFile) (*
r.HandleFunc("/proxy", result.setCommonHeaders(result.proxyHandler)).Methods("GET") r.HandleFunc("/proxy", result.setCommonHeaders(result.proxyHandler)).Methods("GET")
r.HandleFunc("/stats", result.setCommonHeaders(result.validateStatsRequest(result.statsHandler))).Methods("GET") r.HandleFunc("/stats", result.setCommonHeaders(result.validateStatsRequest(result.statsHandler))).Methods("GET")
r.HandleFunc("/metrics", result.setCommonHeaders(result.validateStatsRequest(result.metricsHandler))).Methods("GET")
return result, nil return result, nil
} }
@ -236,6 +238,9 @@ func (s *ProxyServer) Start(config *goconf.ConfigFile) error {
switch mcuType { switch mcuType {
case signaling.McuTypeJanus: case signaling.McuTypeJanus:
mcu, err = signaling.NewMcuJanus(s.url, config) mcu, err = signaling.NewMcuJanus(s.url, config)
if err == nil {
signaling.RegisterJanusMcuStats()
}
default: default:
return fmt.Errorf("Unsupported MCU type: %s", mcuType) return fmt.Errorf("Unsupported MCU type: %s", mcuType)
} }
@ -555,6 +560,7 @@ func (s *ProxyServer) processMessage(client *ProxyClient, data []byte) {
} else { } else {
s.sendCurrentLoad(session) s.sendCurrentLoad(session)
} }
statsSessionsResumedTotal.Inc()
} else { } else {
var err error var err error
if session, err = s.NewSession(message.Hello); err != nil { if session, err = s.NewSession(message.Hello); err != nil {
@ -619,6 +625,9 @@ func (i *emptyInitiator) Country() string {
func (s *ProxyServer) processCommand(ctx context.Context, client *ProxyClient, session *ProxySession, message *signaling.ProxyClientMessage) { func (s *ProxyServer) processCommand(ctx context.Context, client *ProxyClient, session *ProxySession, message *signaling.ProxyClientMessage) {
cmd := message.Command cmd := message.Command
statsCommandMessagesTotal.WithLabelValues(cmd.Type).Inc()
switch cmd.Type { switch cmd.Type {
case "create-publisher": case "create-publisher":
if atomic.LoadUint32(&s.shutdownScheduled) != 0 { if atomic.LoadUint32(&s.shutdownScheduled) != 0 {
@ -650,6 +659,8 @@ func (s *ProxyServer) processCommand(ctx context.Context, client *ProxyClient, s
}, },
} }
session.sendMessage(response) session.sendMessage(response)
statsPublishersCurrent.WithLabelValues(cmd.StreamType).Inc()
statsPublishersTotal.WithLabelValues(cmd.StreamType).Inc()
case "create-subscriber": case "create-subscriber":
id := uuid.New().String() id := uuid.New().String()
publisherId := cmd.PublisherId publisherId := cmd.PublisherId
@ -676,6 +687,8 @@ func (s *ProxyServer) processCommand(ctx context.Context, client *ProxyClient, s
}, },
} }
session.sendMessage(response) session.sendMessage(response)
statsSubscribersCurrent.WithLabelValues(cmd.StreamType).Inc()
statsSubscribersTotal.WithLabelValues(cmd.StreamType).Inc()
case "delete-publisher": case "delete-publisher":
client := s.GetClient(cmd.ClientId) client := s.GetClient(cmd.ClientId)
if client == nil { if client == nil {
@ -688,7 +701,9 @@ func (s *ProxyServer) processCommand(ctx context.Context, client *ProxyClient, s
return return
} }
s.DeleteClient(cmd.ClientId, client) if s.DeleteClient(cmd.ClientId, client) {
statsPublishersCurrent.WithLabelValues(client.StreamType()).Dec()
}
go func() { go func() {
log.Printf("Closing %s publisher %s as %s", client.StreamType(), client.Id(), cmd.ClientId) log.Printf("Closing %s publisher %s as %s", client.StreamType(), client.Id(), cmd.ClientId)
@ -721,7 +736,9 @@ func (s *ProxyServer) processCommand(ctx context.Context, client *ProxyClient, s
return return
} }
s.DeleteClient(cmd.ClientId, client) if s.DeleteClient(cmd.ClientId, client) {
statsSubscribersCurrent.WithLabelValues(client.StreamType()).Dec()
}
go func() { go func() {
log.Printf("Closing %s subscriber %s as %s", client.StreamType(), client.Id(), cmd.ClientId) log.Printf("Closing %s subscriber %s as %s", client.StreamType(), client.Id(), cmd.ClientId)
@ -750,6 +767,8 @@ func (s *ProxyServer) processPayload(ctx context.Context, client *ProxyClient, s
return return
} }
statsPayloadMessagesTotal.WithLabelValues(payload.Type).Inc()
var mcuData *signaling.MessageClientMessageData var mcuData *signaling.MessageClientMessageData
switch payload.Type { switch payload.Type {
case "offer": case "offer":
@ -811,42 +830,50 @@ func (s *ProxyServer) NewSession(hello *signaling.HelloProxyClientMessage) (*Pro
log.Printf("Hello: %+v", hello) log.Printf("Hello: %+v", hello)
} }
reason := "auth-failed"
token, err := jwt.ParseWithClaims(hello.Token, &signaling.TokenClaims{}, func(token *jwt.Token) (interface{}, error) { token, err := jwt.ParseWithClaims(hello.Token, &signaling.TokenClaims{}, func(token *jwt.Token) (interface{}, error) {
// Don't forget to validate the alg is what you expect: // Don't forget to validate the alg is what you expect:
if _, ok := token.Method.(*jwt.SigningMethodRSA); !ok { if _, ok := token.Method.(*jwt.SigningMethodRSA); !ok {
log.Printf("Unexpected signing method: %v", token.Header["alg"]) log.Printf("Unexpected signing method: %v", token.Header["alg"])
reason = "unsupported-signing-method"
return nil, fmt.Errorf("Unexpected signing method: %v", token.Header["alg"]) return nil, fmt.Errorf("Unexpected signing method: %v", token.Header["alg"])
} }
claims, ok := token.Claims.(*signaling.TokenClaims) claims, ok := token.Claims.(*signaling.TokenClaims)
if !ok { if !ok {
log.Printf("Unsupported claims type: %+v", token.Claims) log.Printf("Unsupported claims type: %+v", token.Claims)
reason = "unsupported-claims"
return nil, fmt.Errorf("Unsupported claims type") return nil, fmt.Errorf("Unsupported claims type")
} }
tokenKey, err := s.tokens.Get(claims.Issuer) tokenKey, err := s.tokens.Get(claims.Issuer)
if err != nil { if err != nil {
log.Printf("Could not get token for %s: %s", claims.Issuer, err) log.Printf("Could not get token for %s: %s", claims.Issuer, err)
reason = "missing-issuer"
return nil, err return nil, err
} }
if tokenKey == nil || tokenKey.key == nil { if tokenKey == nil || tokenKey.key == nil {
log.Printf("Issuer %s is not supported", claims.Issuer) log.Printf("Issuer %s is not supported", claims.Issuer)
reason = "unsupported-issuer"
return nil, fmt.Errorf("No key found for issuer") return nil, fmt.Errorf("No key found for issuer")
} }
return tokenKey.key, nil return tokenKey.key, nil
}) })
if err != nil { if err != nil {
statsTokenErrorsTotal.WithLabelValues(reason).Inc()
return nil, TokenAuthFailed return nil, TokenAuthFailed
} }
claims, ok := token.Claims.(*signaling.TokenClaims) claims, ok := token.Claims.(*signaling.TokenClaims)
if !ok || !token.Valid { if !ok || !token.Valid {
statsTokenErrorsTotal.WithLabelValues("auth-failed").Inc()
return nil, TokenAuthFailed return nil, TokenAuthFailed
} }
minIssuedAt := time.Now().Add(-maxTokenAge) minIssuedAt := time.Now().Add(-maxTokenAge)
if issuedAt := time.Unix(claims.IssuedAt, 0); issuedAt.Before(minIssuedAt) { if issuedAt := time.Unix(claims.IssuedAt, 0); issuedAt.Before(minIssuedAt) {
statsTokenErrorsTotal.WithLabelValues("expired").Inc()
return nil, TokenExpired return nil, TokenExpired
} }
@ -868,6 +895,8 @@ func (s *ProxyServer) NewSession(hello *signaling.HelloProxyClientMessage) (*Pro
log.Printf("Created session %s for %+v", encoded, claims) log.Printf("Created session %s for %+v", encoded, claims)
session := NewProxySession(s, sid, encoded) session := NewProxySession(s, sid, encoded)
s.StoreSession(sid, session) s.StoreSession(sid, session)
statsSessionsCurrent.Inc()
statsSessionsTotal.Inc()
return session, nil return session, nil
} }
@ -906,6 +935,7 @@ func (s *ProxyServer) DeleteSession(id uint64) {
func (s *ProxyServer) deleteSessionLocked(id uint64) { func (s *ProxyServer) deleteSessionLocked(id uint64) {
delete(s.sessions, id) delete(s.sessions, id)
statsSessionsCurrent.Dec()
} }
func (s *ProxyServer) StoreClient(id string, client signaling.McuClient) { func (s *ProxyServer) StoreClient(id string, client signaling.McuClient) {
@ -915,9 +945,13 @@ func (s *ProxyServer) StoreClient(id string, client signaling.McuClient) {
s.clientIds[client.Id()] = id s.clientIds[client.Id()] = id
} }
func (s *ProxyServer) DeleteClient(id string, client signaling.McuClient) { func (s *ProxyServer) DeleteClient(id string, client signaling.McuClient) bool {
s.clientsLock.Lock() s.clientsLock.Lock()
defer s.clientsLock.Unlock() defer s.clientsLock.Unlock()
if _, found := s.clients[id]; !found {
return false
}
delete(s.clients, id) delete(s.clients, id)
delete(s.clientIds, client.Id()) delete(s.clientIds, client.Id())
@ -926,6 +960,7 @@ func (s *ProxyServer) DeleteClient(id string, client signaling.McuClient) {
s.shutdownChannel <- true s.shutdownChannel <- true
}() }()
} }
return true
} }
func (s *ProxyServer) GetClientCount() int64 { func (s *ProxyServer) GetClientCount() int64 {
@ -986,3 +1021,8 @@ func (s *ProxyServer) statsHandler(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
w.Write(statsData) // nolint w.Write(statsData) // nolint
} }
// metricsHandler exposes the Prometheus metrics at "/metrics" by delegating
// the request to the handler obtained from promhttp.Handler.
func (s *ProxyServer) metricsHandler(w http.ResponseWriter, r *http.Request) {
	handler := promhttp.Handler()
	handler.ServeHTTP(w, r)
}

View File

@ -28,7 +28,7 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/strukturag/nextcloud-spreed-signaling" signaling "github.com/strukturag/nextcloud-spreed-signaling"
) )
const ( const (
@ -169,7 +169,9 @@ func (s *ProxySession) OnIceCompleted(client signaling.McuClient) {
func (s *ProxySession) PublisherClosed(publisher signaling.McuPublisher) { func (s *ProxySession) PublisherClosed(publisher signaling.McuPublisher) {
if id := s.DeletePublisher(publisher); id != "" { if id := s.DeletePublisher(publisher); id != "" {
s.proxy.DeleteClient(id, publisher) if s.proxy.DeleteClient(id, publisher) {
statsPublishersCurrent.WithLabelValues(publisher.StreamType()).Dec()
}
msg := &signaling.ProxyServerMessage{ msg := &signaling.ProxyServerMessage{
Type: "event", Type: "event",
@ -184,7 +186,9 @@ func (s *ProxySession) PublisherClosed(publisher signaling.McuPublisher) {
func (s *ProxySession) SubscriberClosed(subscriber signaling.McuSubscriber) { func (s *ProxySession) SubscriberClosed(subscriber signaling.McuSubscriber) {
if id := s.DeleteSubscriber(subscriber); id != "" { if id := s.DeleteSubscriber(subscriber); id != "" {
s.proxy.DeleteClient(id, subscriber) if s.proxy.DeleteClient(id, subscriber) {
statsSubscribersCurrent.WithLabelValues(subscriber.StreamType()).Dec()
}
msg := &signaling.ProxyServerMessage{ msg := &signaling.ProxyServerMessage{
Type: "event", Type: "event",

View File

@ -0,0 +1,102 @@
/**
* Standalone signaling server for the Nextcloud Spreed app.
* Copyright (C) 2021 struktur AG
*
* @author Joachim Bauch <bauch@struktur.de>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package main
import (
"github.com/prometheus/client_golang/prometheus"
)
// Session metrics of the proxy; these carry no labels.
var (
	// statsSessionsCurrent tracks the current number of sessions.
	statsSessionsCurrent = prometheus.NewGauge(
		prometheus.GaugeOpts{
			Namespace: "signaling",
			Subsystem: "proxy",
			Name:      "sessions",
			Help:      "The current number of sessions",
		},
	)

	// statsSessionsTotal counts all sessions that were created.
	statsSessionsTotal = prometheus.NewCounter(
		prometheus.CounterOpts{
			Namespace: "signaling",
			Subsystem: "proxy",
			Name:      "sessions_total",
			Help:      "The total number of created sessions",
		},
	)

	// statsSessionsResumedTotal counts all sessions that were resumed.
	statsSessionsResumedTotal = prometheus.NewCounter(
		prometheus.CounterOpts{
			Namespace: "signaling",
			Subsystem: "proxy",
			Name:      "sessions_resumed_total",
			Help:      "The total number of resumed sessions",
		},
	)
)

// Publisher and subscriber metrics of the proxy, labeled by "type".
var (
	// statsPublishersCurrent tracks the current number of publishers.
	statsPublishersCurrent = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Namespace: "signaling",
			Subsystem: "proxy",
			Name:      "publishers",
			Help:      "The current number of publishers",
		},
		[]string{"type"},
	)

	// statsPublishersTotal counts all publishers that were created.
	statsPublishersTotal = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: "signaling",
			Subsystem: "proxy",
			Name:      "publishers_total",
			Help:      "The total number of created publishers",
		},
		[]string{"type"},
	)

	// statsSubscribersCurrent tracks the current number of subscribers.
	statsSubscribersCurrent = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Namespace: "signaling",
			Subsystem: "proxy",
			Name:      "subscribers",
			Help:      "The current number of subscribers",
		},
		[]string{"type"},
	)

	// statsSubscribersTotal counts all subscribers that were created.
	statsSubscribersTotal = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: "signaling",
			Subsystem: "proxy",
			Name:      "subscribers_total",
			Help:      "The total number of created subscribers",
		},
		[]string{"type"},
	)
)

// Message and token error metrics of the proxy.
var (
	// statsCommandMessagesTotal counts command messages, labeled by "type".
	statsCommandMessagesTotal = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: "signaling",
			Subsystem: "proxy",
			Name:      "command_messages_total",
			Help:      "The total number of command messages",
		},
		[]string{"type"},
	)

	// statsPayloadMessagesTotal counts payload messages, labeled by "type".
	statsPayloadMessagesTotal = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: "signaling",
			Subsystem: "proxy",
			Name:      "payload_messages_total",
			Help:      "The total number of payload messages",
		},
		[]string{"type"},
	)

	// statsTokenErrorsTotal counts token errors, labeled by "reason".
	statsTokenErrorsTotal = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: "signaling",
			Subsystem: "proxy",
			Name:      "token_errors_total",
			Help:      "The total number of token errors",
		},
		[]string{"reason"},
	)
)
// init registers all proxy metrics declared in this file through
// prometheus.MustRegister, in declaration order.
func init() {
	prometheus.MustRegister(
		statsSessionsCurrent,
		statsSessionsTotal,
		statsSessionsResumedTotal,
		statsPublishersCurrent,
		statsPublishersTotal,
		statsSubscribersCurrent,
		statsSubscribersTotal,
		statsCommandMessagesTotal,
		statsPayloadMessagesTotal,
		statsTokenErrorsTotal,
	)
}

View File

@ -141,6 +141,8 @@ func main() {
runtime.GOMAXPROCS(cpus) runtime.GOMAXPROCS(cpus)
log.Printf("Using a maximum of %d CPUs", cpus) log.Printf("Using a maximum of %d CPUs", cpus)
signaling.RegisterStats()
natsUrl, _ := config.GetString("nats", "url") natsUrl, _ := config.GetString("nats", "url")
if natsUrl == "" { if natsUrl == "" {
natsUrl = nats.DefaultURL natsUrl = nats.DefaultURL
@ -176,8 +178,12 @@ func main() {
switch mcuType { switch mcuType {
case signaling.McuTypeJanus: case signaling.McuTypeJanus:
mcu, err = signaling.NewMcuJanus(mcuUrl, config) mcu, err = signaling.NewMcuJanus(mcuUrl, config)
signaling.UnregisterProxyMcuStats()
signaling.RegisterJanusMcuStats()
case signaling.McuTypeProxy: case signaling.McuTypeProxy:
mcu, err = signaling.NewMcuProxy(config) mcu, err = signaling.NewMcuProxy(config)
signaling.UnregisterJanusMcuStats()
signaling.RegisterProxyMcuStats()
default: default:
log.Fatal("Unsupported MCU type: ", mcuType) log.Fatal("Unsupported MCU type: ", mcuType)
} }

59
stats_prometheus.go Normal file
View File

@ -0,0 +1,59 @@
/**
* Standalone signaling server for the Nextcloud Spreed app.
* Copyright (C) 2021 struktur AG
*
* @author Joachim Bauch <bauch@struktur.de>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package signaling
import (
"github.com/prometheus/client_golang/prometheus"
)
// statsMessagesTotal counts signaling messages, labeled by "type".
var statsMessagesTotal = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Namespace: "signaling",
		Subsystem: "server",
		Name:      "messages_total",
		Help:      "The total number of signaling messages",
	},
	[]string{"type"},
)

// signalingStats lists the collectors registered by RegisterStats.
var signalingStats = []prometheus.Collector{
	statsMessagesTotal,
}
// registerAll registers each collector in cs with prometheus.DefaultRegisterer.
// A collector that is reported as already registered is accepted silently;
// any other registration error causes a panic.
func registerAll(cs ...prometheus.Collector) {
	for _, collector := range cs {
		err := prometheus.DefaultRegisterer.Register(collector)
		if err == nil {
			continue
		}
		// Registering the same collector twice is expected and harmless.
		if _, duplicate := err.(prometheus.AlreadyRegisteredError); duplicate {
			continue
		}
		panic(err)
	}
}
// unregisterAll passes each collector in cs to prometheus.Unregister. The
// result of each call is ignored.
func unregisterAll(cs ...prometheus.Collector) {
	for i := range cs {
		prometheus.Unregister(cs[i])
	}
}
func RegisterStats() {
registerAll(signalingStats...)
}