From 6455e70f15eebd3ae11be0fe7622ce73d8e485b1 Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Thu, 28 May 2020 16:02:04 +0200 Subject: [PATCH] Add basic stats API. Can be used to query number of sessions, rooms and (if Janus is configured), overall MCU clients and publishers. --- server.conf.in | 5 +++ src/signaling/backend_server.go | 58 +++++++++++++++++++++++++++++++++ src/signaling/hub.go | 16 +++++++++ src/signaling/mcu_common.go | 2 ++ src/signaling/mcu_janus.go | 28 ++++++++++++++++ src/signaling/mcu_test.go | 4 +++ 6 files changed, 113 insertions(+) diff --git a/server.conf.in b/server.conf.in index fe91c1e..a52090a 100644 --- a/server.conf.in +++ b/server.conf.in @@ -110,3 +110,8 @@ url = # information. # Leave empty to disable GeoIP lookups. #license = + +[stats] +# Comma-separated list of IP addresses that are allowed to access the stats +# endpoint. Leave empty (or commented) to only allow access from "127.0.0.1". +#allowed_ips = diff --git a/src/signaling/backend_server.go b/src/signaling/backend_server.go index 3ad2d36..9179ecb 100644 --- a/src/signaling/backend_server.go +++ b/src/signaling/backend_server.go @@ -30,6 +30,7 @@ import ( "io" "io/ioutil" "log" + "net" "net/http" "reflect" "strings" @@ -49,6 +50,7 @@ const ( ) type BackendServer struct { + hub *Hub nats NatsClient roomSessions RoomSessions @@ -61,6 +63,8 @@ type BackendServer struct { turnsecret []byte turnvalid time.Duration turnservers []string + + statsAllowedIps map[string]bool } func NewBackendServer(config *goconf.ConfigFile, hub *Hub, version string) (*BackendServer, error) { @@ -95,7 +99,26 @@ func NewBackendServer(config *goconf.ConfigFile, hub *Hub, version string) (*Bac } } + statsAllowed, _ := config.GetString("stats", "allowed_ips") + var statsAllowedIps map[string]bool + if statsAllowed == "" { + log.Printf("No IPs configured for the stats endpoint, only allowing access from 127.0.0.1") + statsAllowedIps = map[string]bool{ + "127.0.0.1": true, + } + } else { + log.Printf("Only allowing access to the stats endpoing from %s", statsAllowed) + statsAllowedIps = make(map[string]bool) + for _, ip := range strings.Split(statsAllowed, ",") { + ip = strings.TrimSpace(ip) + if ip != "" { + statsAllowedIps[ip] = true + } + } + } + return &BackendServer{ + hub: hub, nats: hub.nats, roomSessions: hub.roomSessions, version: version, @@ -107,6 +130,8 @@ func NewBackendServer(config *goconf.ConfigFile, hub *Hub, version string) (*Bac turnsecret: []byte(turnsecret), turnvalid: turnvalid, turnservers: turnserverslist, + + statsAllowedIps: statsAllowedIps, }, nil } @@ -124,6 +149,7 @@ func (b *BackendServer) Start(r *mux.Router) error { s := r.PathPrefix("/api/v1").Subrouter() s.HandleFunc("/welcome", b.setComonHeaders(b.welcomeFunc)).Methods("GET") s.HandleFunc("/room/{roomid}", b.setComonHeaders(b.validateBackendRequest(b.roomHandler))).Methods("POST") + s.HandleFunc("/stats", b.setComonHeaders(b.validateStatsRequest(b.statsHandler))).Methods("GET") // Provide a REST service to get TURN credentials. // See https://tools.ietf.org/html/draft-uberti-behave-turn-rest-00 @@ -524,3 +550,35 @@ func (b *BackendServer) roomHandler(w http.ResponseWriter, r *http.Request, body // TODO(jojo): Return better response struct. w.Write([]byte("{}")) } + +func (b *BackendServer) validateStatsRequest(f func(http.ResponseWriter, *http.Request)) func(http.ResponseWriter, *http.Request) { + return func(w http.ResponseWriter, r *http.Request) { + addr := getRealUserIP(r) + if strings.Contains(addr, ":") { + if host, _, err := net.SplitHostPort(addr); err == nil { + addr = host + } + } + if !b.statsAllowedIps[addr] { + http.Error(w, "Authentication check failed", http.StatusForbidden) + return + } + + f(w, r) + } +} + +func (b *BackendServer) statsHandler(w http.ResponseWriter, r *http.Request) { + stats := b.hub.GetStats() + statsData, err := json.MarshalIndent(stats, "", " ") + if err != nil { + log.Printf("Could not serialize stats %+v: %s", stats, err) + http.Error(w, "Internal server error", http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/json; charset=utf-8") + w.Header().Set("X-Content-Type-Options", "nosniff") + w.WriteHeader(http.StatusOK) + w.Write(statsData) +} diff --git a/src/signaling/hub.go b/src/signaling/hub.go index ae2e643..1bf2ae4 100644 --- a/src/signaling/hub.go +++ b/src/signaling/hub.go @@ -1413,6 +1413,22 @@ func (h *Hub) processRoomParticipants(message *BackendServerRoomRequest) { room.PublishUsersChanged(message.Participants.Changed, message.Participants.Users) } +func (h *Hub) GetStats() map[string]interface{} { + result := make(map[string]interface{}) + h.ru.RLock() + result["rooms"] = len(h.rooms) + h.ru.RUnlock() + h.mu.Lock() + result["sessions"] = len(h.sessions) + h.mu.Unlock() + if h.mcu != nil { + if stats := h.mcu.GetStats(); stats != nil { + result["mcu"] = stats + } + } + return result +} + func getRealUserIP(r *http.Request) string { // Note this function assumes it is running behind a trusted proxy, so // the headers can be trusted. diff --git a/src/signaling/mcu_common.go b/src/signaling/mcu_common.go index 4aefa3e..7bfd250 100644 --- a/src/signaling/mcu_common.go +++ b/src/signaling/mcu_common.go @@ -45,6 +45,8 @@ type Mcu interface { Start() error Stop() + GetStats() interface{} + NewPublisher(ctx context.Context, listener McuListener, id string, streamType string) (McuPublisher, error) NewSubscriber(ctx context.Context, listener McuListener, publisher string, streamType string) (McuSubscriber, error) } diff --git a/src/signaling/mcu_janus.go b/src/signaling/mcu_janus.go index 78db934..f8f921d 100644 --- a/src/signaling/mcu_janus.go +++ b/src/signaling/mcu_janus.go @@ -159,6 +159,8 @@ type mcuJanus struct { reconnectTimer *time.Timer reconnectInterval time.Duration + + connectedSince time.Time } func NewMcuJanus(url string, config *goconf.ConfigFile, nats NatsClient) (Mcu, error) { @@ -303,6 +305,7 @@ func (m *mcuJanus) Start() error { return err } log.Println("Created Janus session", m.session.Id) + m.connectedSince = time.Now() if m.handle, err = m.session.Attach(ctx, pluginVideoRoom); err != nil { m.disconnect() @@ -346,6 +349,31 @@ func (m *mcuJanus) Stop() { m.reconnectTimer.Stop() } +type mcuJanusConnectionStats struct { + Url string `json:"url"` + Connected bool `json:"connected"` + Publishers int64 `json:"publishers"` + Clients int64 `json:"clients"` + Uptime *time.Time `json:"uptime,omitempty"` +} + +func (m *mcuJanus) GetStats() interface{} { + result := mcuJanusConnectionStats{ + Url: m.url, + } + if m.session != nil { + result.Connected = true + result.Uptime = &m.connectedSince + } + m.mu.Lock() + result.Publishers = int64(len(m.publisherRoomIds)) + m.mu.Unlock() + m.muClients.Lock() + result.Clients = int64(len(m.clients)) + m.muClients.Unlock() + return result +} + func (m *mcuJanus) sendKeepalive() { ctx := context.TODO() if _, err := m.session.KeepAlive(ctx); err != nil { diff --git a/src/signaling/mcu_test.go b/src/signaling/mcu_test.go index dc7c153..8bdaad3 100644 --- a/src/signaling/mcu_test.go +++ b/src/signaling/mcu_test.go @@ -41,6 +41,10 @@ func (m *TestMCU) Start() error { func (m *TestMCU) Stop() { } +func (m *TestMCU) GetStats() interface{} { + return nil +} + func (m *TestMCU) NewPublisher(ctx context.Context, listener McuListener, id string, streamType string) (McuPublisher, error) { return nil, fmt.Errorf("Not implemented") }