Add basic stats API.

Can be used to query number of sessions, rooms and (if Janus is configured),
overall MCU clients and publishers.
This commit is contained in:
Joachim Bauch 2020-05-28 16:02:04 +02:00
parent 563658bf59
commit 6455e70f15
Failed to extract signature
6 changed files with 113 additions and 0 deletions

View File

@ -110,3 +110,8 @@ url =
# information.
# Leave empty to disable GeoIP lookups.
#license =
[stats]
# Comma-separated list of IP addresses that are allowed to access the stats
# endpoint. Leave empty (or commented) to only allow access from "127.0.0.1".
#allowed_ips =

View File

@ -30,6 +30,7 @@ import (
"io"
"io/ioutil"
"log"
"net"
"net/http"
"reflect"
"strings"
@ -49,6 +50,7 @@ const (
)
type BackendServer struct {
hub *Hub
nats NatsClient
roomSessions RoomSessions
@ -61,6 +63,8 @@ type BackendServer struct {
turnsecret []byte
turnvalid time.Duration
turnservers []string
statsAllowedIps map[string]bool
}
func NewBackendServer(config *goconf.ConfigFile, hub *Hub, version string) (*BackendServer, error) {
@ -95,7 +99,26 @@ func NewBackendServer(config *goconf.ConfigFile, hub *Hub, version string) (*Bac
}
}
statsAllowed, _ := config.GetString("stats", "allowed_ips")
var statsAllowedIps map[string]bool
if statsAllowed == "" {
log.Printf("No IPs configured for the stats endpoint, only allowing access from 127.0.0.1")
statsAllowedIps = map[string]bool{
"127.0.0.1": true,
}
} else {
log.Printf("Only allowing access to the stats endpoing from %s", statsAllowed)
statsAllowedIps = make(map[string]bool)
for _, ip := range strings.Split(statsAllowed, ",") {
ip = strings.TrimSpace(ip)
if ip != "" {
statsAllowedIps[ip] = true
}
}
}
return &BackendServer{
hub: hub,
nats: hub.nats,
roomSessions: hub.roomSessions,
version: version,
@ -107,6 +130,8 @@ func NewBackendServer(config *goconf.ConfigFile, hub *Hub, version string) (*Bac
turnsecret: []byte(turnsecret),
turnvalid: turnvalid,
turnservers: turnserverslist,
statsAllowedIps: statsAllowedIps,
}, nil
}
@ -124,6 +149,7 @@ func (b *BackendServer) Start(r *mux.Router) error {
s := r.PathPrefix("/api/v1").Subrouter()
s.HandleFunc("/welcome", b.setComonHeaders(b.welcomeFunc)).Methods("GET")
s.HandleFunc("/room/{roomid}", b.setComonHeaders(b.validateBackendRequest(b.roomHandler))).Methods("POST")
s.HandleFunc("/stats", b.setComonHeaders(b.validateStatsRequest(b.statsHandler))).Methods("GET")
// Provide a REST service to get TURN credentials.
// See https://tools.ietf.org/html/draft-uberti-behave-turn-rest-00
@ -524,3 +550,35 @@ func (b *BackendServer) roomHandler(w http.ResponseWriter, r *http.Request, body
// TODO(jojo): Return better response struct.
w.Write([]byte("{}"))
}
func (b *BackendServer) validateStatsRequest(f func(http.ResponseWriter, *http.Request)) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
addr := getRealUserIP(r)
if strings.Contains(addr, ":") {
if host, _, err := net.SplitHostPort(addr); err == nil {
addr = host
}
}
if !b.statsAllowedIps[addr] {
http.Error(w, "Authentication check failed", http.StatusForbidden)
return
}
f(w, r)
}
}
func (b *BackendServer) statsHandler(w http.ResponseWriter, r *http.Request) {
stats := b.hub.GetStats()
statsData, err := json.MarshalIndent(stats, "", " ")
if err != nil {
log.Printf("Could not serialize stats %+v: %s", stats, err)
http.Error(w, "Internal server error", http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json; charset=utf-8")
w.Header().Set("X-Content-Type-Options", "nosniff")
w.WriteHeader(http.StatusOK)
w.Write(statsData)
}

View File

@ -1413,6 +1413,22 @@ func (h *Hub) processRoomParticipants(message *BackendServerRoomRequest) {
room.PublishUsersChanged(message.Participants.Changed, message.Participants.Users)
}
func (h *Hub) GetStats() map[string]interface{} {
result := make(map[string]interface{})
h.ru.RLock()
result["rooms"] = len(h.rooms)
h.ru.RUnlock()
h.mu.Lock()
result["sessions"] = len(h.sessions)
h.mu.Unlock()
if h.mcu != nil {
if stats := h.mcu.GetStats(); stats != nil {
result["mcu"] = stats
}
}
return result
}
func getRealUserIP(r *http.Request) string {
// Note this function assumes it is running behind a trusted proxy, so
// the headers can be trusted.

View File

@ -45,6 +45,8 @@ type Mcu interface {
Start() error
Stop()
GetStats() interface{}
NewPublisher(ctx context.Context, listener McuListener, id string, streamType string) (McuPublisher, error)
NewSubscriber(ctx context.Context, listener McuListener, publisher string, streamType string) (McuSubscriber, error)
}

View File

@ -159,6 +159,8 @@ type mcuJanus struct {
reconnectTimer *time.Timer
reconnectInterval time.Duration
connectedSince time.Time
}
func NewMcuJanus(url string, config *goconf.ConfigFile, nats NatsClient) (Mcu, error) {
@ -303,6 +305,7 @@ func (m *mcuJanus) Start() error {
return err
}
log.Println("Created Janus session", m.session.Id)
m.connectedSince = time.Now()
if m.handle, err = m.session.Attach(ctx, pluginVideoRoom); err != nil {
m.disconnect()
@ -346,6 +349,31 @@ func (m *mcuJanus) Stop() {
m.reconnectTimer.Stop()
}
type mcuJanusConnectionStats struct {
Url string `json:"url"`
Connected bool `json:"connected"`
Publishers int64 `json:"publishers"`
Clients int64 `json:"clients"`
Uptime *time.Time `json:"uptime,omitempty"`
}
func (m *mcuJanus) GetStats() interface{} {
result := mcuJanusConnectionStats{
Url: m.url,
}
if m.session != nil {
result.Connected = true
result.Uptime = &m.connectedSince
}
m.mu.Lock()
result.Publishers = int64(len(m.publisherRoomIds))
m.mu.Unlock()
m.muClients.Lock()
result.Clients = int64(len(m.clients))
m.muClients.Unlock()
return result
}
func (m *mcuJanus) sendKeepalive() {
ctx := context.TODO()
if _, err := m.session.KeepAlive(ctx); err != nil {

View File

@ -41,6 +41,10 @@ func (m *TestMCU) Start() error {
func (m *TestMCU) Stop() {
}
func (m *TestMCU) GetStats() interface{} {
return nil
}
func (m *TestMCU) NewPublisher(ctx context.Context, listener McuListener, id string, streamType string) (McuPublisher, error) {
return nil, fmt.Errorf("Not implemented")
}