Merge pull request #16 from strukturag/stats-api

Add basic stats API.
This commit is contained in:
Joachim Bauch 2020-06-30 17:01:48 +02:00 committed by GitHub
commit 3d73ab48db
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 113 additions and 0 deletions

View file

@ -110,3 +110,8 @@ url =
# information. # information.
# Leave empty to disable GeoIP lookups. # Leave empty to disable GeoIP lookups.
#license = #license =
[stats]
# Comma-separated list of IP addresses that are allowed to access the stats
# endpoint. Leave empty (or commented) to only allow access from "127.0.0.1".
#allowed_ips =

View file

@ -30,6 +30,7 @@ import (
"io" "io"
"io/ioutil" "io/ioutil"
"log" "log"
"net"
"net/http" "net/http"
"reflect" "reflect"
"strings" "strings"
@ -49,6 +50,7 @@ const (
) )
type BackendServer struct { type BackendServer struct {
hub *Hub
nats NatsClient nats NatsClient
roomSessions RoomSessions roomSessions RoomSessions
@ -61,6 +63,8 @@ type BackendServer struct {
turnsecret []byte turnsecret []byte
turnvalid time.Duration turnvalid time.Duration
turnservers []string turnservers []string
statsAllowedIps map[string]bool
} }
func NewBackendServer(config *goconf.ConfigFile, hub *Hub, version string) (*BackendServer, error) { func NewBackendServer(config *goconf.ConfigFile, hub *Hub, version string) (*BackendServer, error) {
@ -95,7 +99,26 @@ func NewBackendServer(config *goconf.ConfigFile, hub *Hub, version string) (*Bac
} }
} }
statsAllowed, _ := config.GetString("stats", "allowed_ips")
var statsAllowedIps map[string]bool
if statsAllowed == "" {
log.Printf("No IPs configured for the stats endpoint, only allowing access from 127.0.0.1")
statsAllowedIps = map[string]bool{
"127.0.0.1": true,
}
} else {
log.Printf("Only allowing access to the stats endpoing from %s", statsAllowed)
statsAllowedIps = make(map[string]bool)
for _, ip := range strings.Split(statsAllowed, ",") {
ip = strings.TrimSpace(ip)
if ip != "" {
statsAllowedIps[ip] = true
}
}
}
return &BackendServer{ return &BackendServer{
hub: hub,
nats: hub.nats, nats: hub.nats,
roomSessions: hub.roomSessions, roomSessions: hub.roomSessions,
version: version, version: version,
@ -107,6 +130,8 @@ func NewBackendServer(config *goconf.ConfigFile, hub *Hub, version string) (*Bac
turnsecret: []byte(turnsecret), turnsecret: []byte(turnsecret),
turnvalid: turnvalid, turnvalid: turnvalid,
turnservers: turnserverslist, turnservers: turnserverslist,
statsAllowedIps: statsAllowedIps,
}, nil }, nil
} }
@ -124,6 +149,7 @@ func (b *BackendServer) Start(r *mux.Router) error {
s := r.PathPrefix("/api/v1").Subrouter() s := r.PathPrefix("/api/v1").Subrouter()
s.HandleFunc("/welcome", b.setComonHeaders(b.welcomeFunc)).Methods("GET") s.HandleFunc("/welcome", b.setComonHeaders(b.welcomeFunc)).Methods("GET")
s.HandleFunc("/room/{roomid}", b.setComonHeaders(b.validateBackendRequest(b.roomHandler))).Methods("POST") s.HandleFunc("/room/{roomid}", b.setComonHeaders(b.validateBackendRequest(b.roomHandler))).Methods("POST")
s.HandleFunc("/stats", b.setComonHeaders(b.validateStatsRequest(b.statsHandler))).Methods("GET")
// Provide a REST service to get TURN credentials. // Provide a REST service to get TURN credentials.
// See https://tools.ietf.org/html/draft-uberti-behave-turn-rest-00 // See https://tools.ietf.org/html/draft-uberti-behave-turn-rest-00
@ -524,3 +550,35 @@ func (b *BackendServer) roomHandler(w http.ResponseWriter, r *http.Request, body
// TODO(jojo): Return better response struct. // TODO(jojo): Return better response struct.
w.Write([]byte("{}")) w.Write([]byte("{}"))
} }
func (b *BackendServer) validateStatsRequest(f func(http.ResponseWriter, *http.Request)) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
addr := getRealUserIP(r)
if strings.Contains(addr, ":") {
if host, _, err := net.SplitHostPort(addr); err == nil {
addr = host
}
}
if !b.statsAllowedIps[addr] {
http.Error(w, "Authentication check failed", http.StatusForbidden)
return
}
f(w, r)
}
}
func (b *BackendServer) statsHandler(w http.ResponseWriter, r *http.Request) {
stats := b.hub.GetStats()
statsData, err := json.MarshalIndent(stats, "", " ")
if err != nil {
log.Printf("Could not serialize stats %+v: %s", stats, err)
http.Error(w, "Internal server error", http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json; charset=utf-8")
w.Header().Set("X-Content-Type-Options", "nosniff")
w.WriteHeader(http.StatusOK)
w.Write(statsData)
}

View file

@ -1413,6 +1413,22 @@ func (h *Hub) processRoomParticipants(message *BackendServerRoomRequest) {
room.PublishUsersChanged(message.Participants.Changed, message.Participants.Users) room.PublishUsersChanged(message.Participants.Changed, message.Participants.Users)
} }
func (h *Hub) GetStats() map[string]interface{} {
result := make(map[string]interface{})
h.ru.RLock()
result["rooms"] = len(h.rooms)
h.ru.RUnlock()
h.mu.Lock()
result["sessions"] = len(h.sessions)
h.mu.Unlock()
if h.mcu != nil {
if stats := h.mcu.GetStats(); stats != nil {
result["mcu"] = stats
}
}
return result
}
func getRealUserIP(r *http.Request) string { func getRealUserIP(r *http.Request) string {
// Note this function assumes it is running behind a trusted proxy, so // Note this function assumes it is running behind a trusted proxy, so
// the headers can be trusted. // the headers can be trusted.

View file

@ -45,6 +45,8 @@ type Mcu interface {
Start() error Start() error
Stop() Stop()
GetStats() interface{}
NewPublisher(ctx context.Context, listener McuListener, id string, streamType string) (McuPublisher, error) NewPublisher(ctx context.Context, listener McuListener, id string, streamType string) (McuPublisher, error)
NewSubscriber(ctx context.Context, listener McuListener, publisher string, streamType string) (McuSubscriber, error) NewSubscriber(ctx context.Context, listener McuListener, publisher string, streamType string) (McuSubscriber, error)
} }

View file

@ -159,6 +159,8 @@ type mcuJanus struct {
reconnectTimer *time.Timer reconnectTimer *time.Timer
reconnectInterval time.Duration reconnectInterval time.Duration
connectedSince time.Time
} }
func NewMcuJanus(url string, config *goconf.ConfigFile, nats NatsClient) (Mcu, error) { func NewMcuJanus(url string, config *goconf.ConfigFile, nats NatsClient) (Mcu, error) {
@ -303,6 +305,7 @@ func (m *mcuJanus) Start() error {
return err return err
} }
log.Println("Created Janus session", m.session.Id) log.Println("Created Janus session", m.session.Id)
m.connectedSince = time.Now()
if m.handle, err = m.session.Attach(ctx, pluginVideoRoom); err != nil { if m.handle, err = m.session.Attach(ctx, pluginVideoRoom); err != nil {
m.disconnect() m.disconnect()
@ -346,6 +349,31 @@ func (m *mcuJanus) Stop() {
m.reconnectTimer.Stop() m.reconnectTimer.Stop()
} }
type mcuJanusConnectionStats struct {
Url string `json:"url"`
Connected bool `json:"connected"`
Publishers int64 `json:"publishers"`
Clients int64 `json:"clients"`
Uptime *time.Time `json:"uptime,omitempty"`
}
func (m *mcuJanus) GetStats() interface{} {
result := mcuJanusConnectionStats{
Url: m.url,
}
if m.session != nil {
result.Connected = true
result.Uptime = &m.connectedSince
}
m.mu.Lock()
result.Publishers = int64(len(m.publisherRoomIds))
m.mu.Unlock()
m.muClients.Lock()
result.Clients = int64(len(m.clients))
m.muClients.Unlock()
return result
}
func (m *mcuJanus) sendKeepalive() { func (m *mcuJanus) sendKeepalive() {
ctx := context.TODO() ctx := context.TODO()
if _, err := m.session.KeepAlive(ctx); err != nil { if _, err := m.session.KeepAlive(ctx); err != nil {

View file

@ -41,6 +41,10 @@ func (m *TestMCU) Start() error {
func (m *TestMCU) Stop() { func (m *TestMCU) Stop() {
} }
func (m *TestMCU) GetStats() interface{} {
return nil
}
func (m *TestMCU) NewPublisher(ctx context.Context, listener McuListener, id string, streamType string) (McuPublisher, error) { func (m *TestMCU) NewPublisher(ctx context.Context, listener McuListener, id string, streamType string) (McuPublisher, error) {
return nil, fmt.Errorf("Not implemented") return nil, fmt.Errorf("Not implemented")
} }