From 3e9bace0ad14378e932d381fa45943bd5f86ce4e Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Thu, 6 Mar 2025 13:08:03 +0100 Subject: [PATCH] Move serverinfo result generation to source, add nats, gprc and etcd. --- api_backend.go | 36 +++++++++-- async_events_nats.go | 25 ++++++++ backend_server.go | 92 ++++------------------------- docs/standalone-signaling-api-v1.md | 50 ++++++++++++++++ etcd_client.go | 20 +++++++ grpc_client.go | 54 ++++++++++++++--- hub.go | 35 +++++++++++ mcu_common.go | 1 + mcu_janus.go | 32 ++++++++++ mcu_proxy.go | 38 ++++++++++++ mcu_test.go | 4 ++ proxy/proxy_server_test.go | 4 ++ 12 files changed, 298 insertions(+), 93 deletions(-) diff --git a/api_backend.go b/api_backend.go index 7ac40ef..5ba210a 100644 --- a/api_backend.go +++ b/api_backend.go @@ -508,9 +508,8 @@ type BackendServerInfoSfuProxy struct { type SfuMode string const ( - SfuModeUnknown SfuMode = "unknown" - SfuModeJanus SfuMode = "janus" - SfuModeProxy SfuMode = "proxy" + SfuModeJanus SfuMode = "janus" + SfuModeProxy SfuMode = "proxy" ) type BackendServerInfoSfu struct { @@ -529,10 +528,39 @@ type BackendServerInfoDialout struct { Features []string `json:"features,omitempty"` } +type BackendServerInfoNats struct { + Urls []string `json:"urls"` + Connected bool `json:"connected"` + + ServerUrl string `json:"serverurl,omitempty"` + ServerID string `json:"serverid,omitempty"` + ServerVersion string `json:"version,omitempty"` + ClusterName string `json:"clustername,omitempty"` +} + +type BackendServerInfoGrpc struct { + Target string `json:"target"` + IP string `json:"ip,omitempty"` + Connected bool `json:"connected"` + + Version string `json:"version,omitempty"` +} + +type BackendServerInfoEtcd struct { + Endpoints []string `json:"endpoints"` + + Active string `json:"active,omitempty"` + Connected *bool `json:"connected,omitempty"` +} + type BackendServerInfo struct { Version string `json:"version"` Features []string `json:"features"` - Sfu BackendServerInfoSfu `json:"sfu"` + Sfu *BackendServerInfoSfu `json:"sfu,omitempty"` Dialout []BackendServerInfoDialout `json:"dialout,omitempty"` + + Nats *BackendServerInfoNats `json:"nats,omitempty"` + Grpc []BackendServerInfoGrpc `json:"grpc,omitempty"` + Etcd *BackendServerInfoEtcd `json:"etcd,omitempty"` } diff --git a/async_events_nats.go b/async_events_nats.go index 0db3502..46408d0 100644 --- a/async_events_nats.go +++ b/async_events_nats.go @@ -248,6 +248,31 @@ func NewAsyncEventsNats(client NatsClient) (AsyncEvents, error) { return events, nil } +func (e *asyncEventsNats) GetServerInfoNats() *BackendServerInfoNats { + var nats *BackendServerInfoNats + switch n := e.client.(type) { + case *natsClient: + nats = &BackendServerInfoNats{ + Urls: n.conn.Servers(), + } + if c := n.conn; c.IsConnected() { + nats.Connected = true + nats.ServerUrl = c.ConnectedUrl() + nats.ServerID = c.ConnectedServerId() + nats.ServerVersion = c.ConnectedServerVersion() + nats.ClusterName = c.ConnectedClusterName() + } + case *LoopbackNatsClient: + nats = &BackendServerInfoNats{ + Urls: []string{NatsLoopbackUrl}, + Connected: true, + ServerUrl: NatsLoopbackUrl, + } + } + + return nats +} + func (e *asyncEventsNats) Close() { e.mu.Lock() defer e.mu.Unlock() diff --git a/backend_server.go b/backend_server.go index cd4ab40..477f04b 100644 --- a/backend_server.go +++ b/backend_server.go @@ -952,91 +952,23 @@ func (b *BackendServer) statsHandler(w http.ResponseWriter, r *http.Request) { } func (b *BackendServer) serverinfoHandler(w http.ResponseWriter, r *http.Request) { - var sfu BackendServerInfoSfu - switch m := b.hub.mcu.(type) { - case *mcuJanus: - sfu.Mode = SfuModeJanus - janus := &BackendServerInfoSfuJanus{ - Url: m.url, - } - if m.IsConnected() { - janus.Connected = true - if info := m.Info(); info != nil { - janus.Name = info.Name - janus.Version = info.VersionString - janus.Author = info.Author - janus.DataChannels = makePtr(info.DataChannels) - janus.FullTrickle = makePtr(info.FullTrickle) - janus.LocalIP = info.LocalIP - janus.IPv6 = makePtr(info.IPv6) - - if plugin, found := info.Plugins[pluginVideoRoom]; found { - janus.VideoRoom = &BackendServerInfoVideoRoom{ - Name: plugin.Name, - Version: plugin.VersionString, - Author: plugin.Author, - } - } - } - } - sfu.Janus = janus - case *mcuProxy: - sfu.Mode = SfuModeProxy - for _, c := range m.connections { - proxy := BackendServerInfoSfuProxy{ - Url: c.rawUrl, - - Temporary: c.IsTemporary(), - } - if len(c.ip) > 0 { - proxy.IP = c.ip.String() - } - if c.IsConnected() { - proxy.Connected = true - proxy.Shutdown = makePtr(c.IsShutdownScheduled()) - proxy.Uptime = &c.connectedSince - proxy.Version = c.Version() - proxy.Features = c.Features() - proxy.Country = c.Country() - proxy.Load = makePtr(c.Load()) - proxy.Bandwidth = c.Bandwidth() - } - sfu.Proxies = append(sfu.Proxies, proxy) - } - default: - sfu.Mode = SfuModeUnknown - } - info := BackendServerInfo{ Version: b.version, Features: b.hub.info.Features, - Sfu: sfu, + Dialout: b.hub.GetServerInfoDialout(), } - - b.hub.mu.RLock() - defer b.hub.mu.RUnlock() - for session := range b.hub.dialoutSessions { - dialout := BackendServerInfoDialout{ - SessionId: session.PublicId(), - } - if client := session.GetClient(); client != nil && client.IsConnected() { - dialout.Connected = true - dialout.Address = client.RemoteAddr() - if ua := client.UserAgent(); ua != "" { - dialout.UserAgent = ua - // Extract version from user-agent, expects "software/version". - if pos := strings.IndexByte(ua, '/'); pos != -1 { - version := ua[pos+1:] - if pos = strings.IndexByte(version, ' '); pos != -1 { - version = version[:pos] - } - dialout.Version = version - } - } - dialout.Features = session.GetFeatures() - } - info.Dialout = append(info.Dialout, dialout) + if mcu := b.hub.mcu; mcu != nil { + info.Sfu = mcu.GetServerInfoSfu() + } + if e, ok := b.events.(*asyncEventsNats); ok { + info.Nats = e.GetServerInfoNats() + } + if rpcClients := b.hub.rpcClients; rpcClients != nil { + info.Grpc = rpcClients.GetServerInfoGrpc() + } + if etcdClient := b.hub.etcdClient; etcdClient != nil { + info.Etcd = etcdClient.GetServerInfoEtcd() } infoData, err := json.MarshalIndent(info, "", " ") diff --git a/docs/standalone-signaling-api-v1.md b/docs/standalone-signaling-api-v1.md index 6bf571f..1487d35 100644 --- a/docs/standalone-signaling-api-v1.md +++ b/docs/standalone-signaling-api-v1.md @@ -1538,6 +1538,56 @@ for streams on the proxy. Only present if a bandwidth limit is configured on the proxy. +### NATS connection + +Information about the NATS connection are also returned by the serverinfo +endpoint: + + { + "urls": [ + "nats://localhost:4222" + ], + "connected": true, + "serverurl": "nats://localhost:4222", + "serverid": "556c9de63ac214e53a9b976b2e5305d8", + "version": "0.6.8" + } + + +### GRPC connections + +In clustered mode, the signaling server has GRPC connections to other instances +which are included in the serverinfo response: + + [ + { + "target": "192.168.1.1:8080", + "connected": true, + "version": "1.2.3" + }, + { + "target": "192.168.1.2:8080", + "connected": true, + "version": "1.2.3" + } + ] + + +### ETCD cluster + +Some configuration can be loaded from an etcd cluster, the serverinfo response +also contains information on that connection: + + { + "endpoints": [ + "192.168.4.1:2379", + "192.168.4.2:2379" + ], + "active": "etcd-endpoints://0xc0001fba40/192.168.4.2:2379", + "connected": true + } + + ### Dialout session If a SIP bridge with support for dial-out is connected, the serverinfo response diff --git a/etcd_client.go b/etcd_client.go index 23bce3a..e232fce 100644 --- a/etcd_client.go +++ b/etcd_client.go @@ -37,6 +37,7 @@ import ( clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" "go.uber.org/zap/zapcore" + "google.golang.org/grpc/connectivity" ) type EtcdClientListener interface { @@ -68,6 +69,25 @@ func NewEtcdClient(config *goconf.ConfigFile, compatSection string) (*EtcdClient return result, nil } +func (c *EtcdClient) GetServerInfoEtcd() *BackendServerInfoEtcd { + client := c.getEtcdClient() + if client == nil { + return nil + } + + result := &BackendServerInfoEtcd{ + Endpoints: client.Endpoints(), + } + + conn := client.ActiveConnection() + if conn != nil { + result.Active = conn.Target() + result.Connected = makePtr(conn.GetState() == connectivity.Ready) + } + + return result +} + func (c *EtcdClient) getConfigStringWithFallback(config *goconf.ConfigFile, option string) string { value, _ := config.GetString("etcd", option) if value == "" && c.compatSection != "" { diff --git a/grpc_client.go b/grpc_client.go index 4b10631..7aad2a2 100644 --- a/grpc_client.go +++ b/grpc_client.go @@ -39,6 +39,7 @@ import ( clientv3 "go.etcd.io/etcd/client/v3" "google.golang.org/grpc" codes "google.golang.org/grpc/codes" + "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials" "google.golang.org/grpc/metadata" "google.golang.org/grpc/resolver" @@ -79,12 +80,14 @@ func newGrpcClientImpl(conn grpc.ClientConnInterface) *grpcClientImpl { } type GrpcClient struct { - ip net.IP - target string - conn *grpc.ClientConn - impl *grpcClientImpl + ip net.IP + rawTarget string + target string + conn *grpc.ClientConn + impl *grpcClientImpl - isSelf atomic.Bool + isSelf atomic.Bool + version atomic.Value } type customIpResolver struct { @@ -151,15 +154,17 @@ func NewGrpcClient(target string, ip net.IP, opts ...grpc.DialOption) (*GrpcClie } result := &GrpcClient{ - ip: ip, - target: target, - conn: conn, - impl: newGrpcClientImpl(conn), + ip: ip, + rawTarget: target, + target: target, + conn: conn, + impl: newGrpcClientImpl(conn), } if ip != nil { result.target += " (" + ip.String() + ")" } + result.version.Store("") return result, nil } @@ -167,6 +172,10 @@ func (c *GrpcClient) Target() string { return c.target } +func (c *GrpcClient) Version() string { + return c.version.Load().(string) +} + func (c *GrpcClient) Close() error { return c.conn.Close() } @@ -440,6 +449,32 @@ func NewGrpcClients(config *goconf.ConfigFile, etcdClient *EtcdClient, dnsMonito return result, nil } +func (c *GrpcClients) GetServerInfoGrpc() (result []BackendServerInfoGrpc) { + c.mu.RLock() + defer c.mu.RUnlock() + + for _, client := range c.clients { + if client.IsSelf() { + continue + } + + grpc := BackendServerInfoGrpc{ + Target: client.rawTarget, + } + if len(client.ip) > 0 { + grpc.IP = client.ip.String() + } + if client.conn.GetState() == connectivity.Ready { + grpc.Connected = true + grpc.Version = client.Version() + } + + result = append(result, grpc) + } + + return +} + func (c *GrpcClients) load(config *goconf.ConfigFile, fromReload bool) error { creds, err := NewReloadableCredentials(config, false) if err != nil { @@ -537,6 +572,7 @@ loop: continue } + client.version.Store(version) if id == GrpcServerId { log.Printf("GRPC target %s is this server, removing", client.Target()) c.closeClient(client) diff --git a/hub.go b/hub.go index 4b00a38..27d2bfd 100644 --- a/hub.go +++ b/hub.go @@ -179,6 +179,7 @@ type Hub struct { geoipOverrides atomic.Pointer[map[*net.IPNet]string] geoipUpdating atomic.Bool + etcdClient *EtcdClient rpcServer *GrpcServer rpcClients *GrpcClients @@ -365,6 +366,7 @@ func NewHub(config *goconf.ConfigFile, events AsyncEvents, rpcServer *GrpcServer geoip: geoip, + etcdClient: etcdClient, rpcServer: rpcServer, rpcClients: rpcClients, @@ -2816,6 +2818,39 @@ func (h *Hub) GetStats() map[string]interface{} { return result } +func (h *Hub) GetServerInfoDialout() (result []BackendServerInfoDialout) { + h.mu.RLock() + defer h.mu.RUnlock() + + for session := range h.dialoutSessions { + dialout := BackendServerInfoDialout{ + SessionId: session.PublicId(), + } + if client := session.GetClient(); client != nil && client.IsConnected() { + dialout.Connected = true + dialout.Address = client.RemoteAddr() + if ua := client.UserAgent(); ua != "" { + dialout.UserAgent = ua + // Extract version from user-agent, expects "software/version". + if pos := strings.IndexByte(ua, '/'); pos != -1 { + version := ua[pos+1:] + if pos = strings.IndexByte(version, ' '); pos != -1 { + version = version[:pos] + } + dialout.Version = version + } + } + dialout.Features = session.GetFeatures() + } + result = append(result, dialout) + } + + slices.SortFunc(result, func(a, b BackendServerInfoDialout) int { + return strings.Compare(a.SessionId, b.SessionId) + }) + return +} + func GetRealUserIP(r *http.Request, trusted *AllowedIps) string { addr := r.RemoteAddr if host, _, err := net.SplitHostPort(addr); err == nil { diff --git a/mcu_common.go b/mcu_common.go index af22d4a..7d78690 100644 --- a/mcu_common.go +++ b/mcu_common.go @@ -128,6 +128,7 @@ type Mcu interface { SetOnDisconnected(func()) GetStats() interface{} + GetServerInfoSfu() *BackendServerInfoSfu NewPublisher(ctx context.Context, listener McuListener, id string, sid string, streamType StreamType, settings NewPublisherSettings, initiator McuInitiator) (McuPublisher, error) NewSubscriber(ctx context.Context, listener McuListener, publisher string, streamType StreamType, initiator McuInitiator) (McuSubscriber, error) diff --git a/mcu_janus.go b/mcu_janus.go index f2d97bc..ea08371 100644 --- a/mcu_janus.go +++ b/mcu_janus.go @@ -421,6 +421,38 @@ func (m *mcuJanus) Info() *InfoMsg { return m.info.Load() } +func (m *mcuJanus) GetServerInfoSfu() *BackendServerInfoSfu { + janus := &BackendServerInfoSfuJanus{ + Url: m.url, + } + if m.IsConnected() { + janus.Connected = true + if info := m.Info(); info != nil { + janus.Name = info.Name + janus.Version = info.VersionString + janus.Author = info.Author + janus.DataChannels = makePtr(info.DataChannels) + janus.FullTrickle = makePtr(info.FullTrickle) + janus.LocalIP = info.LocalIP + janus.IPv6 = makePtr(info.IPv6) + + if plugin, found := info.Plugins[pluginVideoRoom]; found { + janus.VideoRoom = &BackendServerInfoVideoRoom{ + Name: plugin.Name, + Version: plugin.VersionString, + Author: plugin.Author, + } + } + } + } + + sfu := &BackendServerInfoSfu{ + Mode: SfuModeJanus, + Janus: janus, + } + return sfu +} + func (m *mcuJanus) Reload(config *goconf.ConfigFile) { m.settings.Reload(config) } diff --git a/mcu_proxy.go b/mcu_proxy.go index ea9e1ef..72fab9a 100644 --- a/mcu_proxy.go +++ b/mcu_proxy.go @@ -1670,6 +1670,44 @@ func (m *mcuProxy) GetStats() interface{} { return result } +func (m *mcuProxy) GetServerInfoSfu() *BackendServerInfoSfu { + m.connectionsMu.RLock() + defer m.connectionsMu.RUnlock() + + sfu := &BackendServerInfoSfu{ + Mode: SfuModeProxy, + } + for _, c := range m.connections { + proxy := BackendServerInfoSfuProxy{ + Url: c.rawUrl, + + Temporary: c.IsTemporary(), + } + if len(c.ip) > 0 { + proxy.IP = c.ip.String() + } + if c.IsConnected() { + proxy.Connected = true + proxy.Shutdown = makePtr(c.IsShutdownScheduled()) + proxy.Uptime = &c.connectedSince + proxy.Version = c.Version() + proxy.Features = c.Features() + proxy.Country = c.Country() + proxy.Load = makePtr(c.Load()) + proxy.Bandwidth = c.Bandwidth() + } + sfu.Proxies = append(sfu.Proxies, proxy) + } + slices.SortFunc(sfu.Proxies, func(a, b BackendServerInfoSfuProxy) int { + c := strings.Compare(a.Url, b.Url) + if c == 0 { + c = strings.Compare(a.IP, b.IP) + } + return c + }) + return sfu +} + func (m *mcuProxy) getContinentsMap() map[string][]string { continentsMap := m.continentsMap.Load() if continentsMap == nil { diff --git a/mcu_test.go b/mcu_test.go index 6db0db6..9097267 100644 --- a/mcu_test.go +++ b/mcu_test.go @@ -70,6 +70,10 @@ func (m *TestMCU) GetStats() interface{} { return nil } +func (m *TestMCU) GetServerInfoSfu() *BackendServerInfoSfu { + return nil +} + func (m *TestMCU) NewPublisher(ctx context.Context, listener McuListener, id string, sid string, streamType StreamType, settings NewPublisherSettings, initiator McuInitiator) (McuPublisher, error) { var maxBitrate int if streamType == StreamTypeScreen { diff --git a/proxy/proxy_server_test.go b/proxy/proxy_server_test.go index 3dfd136..25eff8d 100644 --- a/proxy/proxy_server_test.go +++ b/proxy/proxy_server_test.go @@ -379,6 +379,10 @@ func (m *TestMCU) GetStats() interface{} { return nil } +func (m *TestMCU) GetServerInfoSfu() *signaling.BackendServerInfoSfu { + return nil +} + func (m *TestMCU) NewPublisher(ctx context.Context, listener signaling.McuListener, id string, sid string, streamType signaling.StreamType, settings signaling.NewPublisherSettings, initiator signaling.McuInitiator) (signaling.McuPublisher, error) { return nil, errors.New("not implemented") }