Move serverinfo result generation to source, add nats, gprc and etcd.

This commit is contained in:
Joachim Bauch 2025-03-06 13:08:03 +01:00
commit 3e9bace0ad
No known key found for this signature in database
GPG key ID: 77C1D22D53E15F02
12 changed files with 298 additions and 93 deletions

View file

@ -508,9 +508,8 @@ type BackendServerInfoSfuProxy struct {
type SfuMode string
const (
SfuModeUnknown SfuMode = "unknown"
SfuModeJanus SfuMode = "janus"
SfuModeProxy SfuMode = "proxy"
SfuModeJanus SfuMode = "janus"
SfuModeProxy SfuMode = "proxy"
)
type BackendServerInfoSfu struct {
@ -529,10 +528,39 @@ type BackendServerInfoDialout struct {
Features []string `json:"features,omitempty"`
}
type BackendServerInfoNats struct {
Urls []string `json:"urls"`
Connected bool `json:"connected"`
ServerUrl string `json:"serverurl,omitempty"`
ServerID string `json:"serverid,omitempty"`
ServerVersion string `json:"version,omitempty"`
ClusterName string `json:"clustername,omitempty"`
}
type BackendServerInfoGrpc struct {
Target string `json:"target"`
IP string `json:"ip,omitempty"`
Connected bool `json:"connected"`
Version string `json:"version,omitempty"`
}
type BackendServerInfoEtcd struct {
Endpoints []string `json:"endpoints"`
Active string `json:"active,omitempty"`
Connected *bool `json:"connected,omitempty"`
}
type BackendServerInfo struct {
Version string `json:"version"`
Features []string `json:"features"`
Sfu BackendServerInfoSfu `json:"sfu"`
Sfu *BackendServerInfoSfu `json:"sfu,omitempty"`
Dialout []BackendServerInfoDialout `json:"dialout,omitempty"`
Nats *BackendServerInfoNats `json:"nats,omitempty"`
Grpc []BackendServerInfoGrpc `json:"grpc,omitempty"`
Etcd *BackendServerInfoEtcd `json:"etcd,omitempty"`
}

View file

@ -248,6 +248,31 @@ func NewAsyncEventsNats(client NatsClient) (AsyncEvents, error) {
return events, nil
}
func (e *asyncEventsNats) GetServerInfoNats() *BackendServerInfoNats {
var nats *BackendServerInfoNats
switch n := e.client.(type) {
case *natsClient:
nats = &BackendServerInfoNats{
Urls: n.conn.Servers(),
}
if c := n.conn; c.IsConnected() {
nats.Connected = true
nats.ServerUrl = c.ConnectedUrl()
nats.ServerID = c.ConnectedServerId()
nats.ServerVersion = c.ConnectedServerVersion()
nats.ClusterName = c.ConnectedClusterName()
}
case *LoopbackNatsClient:
nats = &BackendServerInfoNats{
Urls: []string{NatsLoopbackUrl},
Connected: true,
ServerUrl: NatsLoopbackUrl,
}
}
return nats
}
func (e *asyncEventsNats) Close() {
e.mu.Lock()
defer e.mu.Unlock()

View file

@ -952,91 +952,23 @@ func (b *BackendServer) statsHandler(w http.ResponseWriter, r *http.Request) {
}
func (b *BackendServer) serverinfoHandler(w http.ResponseWriter, r *http.Request) {
var sfu BackendServerInfoSfu
switch m := b.hub.mcu.(type) {
case *mcuJanus:
sfu.Mode = SfuModeJanus
janus := &BackendServerInfoSfuJanus{
Url: m.url,
}
if m.IsConnected() {
janus.Connected = true
if info := m.Info(); info != nil {
janus.Name = info.Name
janus.Version = info.VersionString
janus.Author = info.Author
janus.DataChannels = makePtr(info.DataChannels)
janus.FullTrickle = makePtr(info.FullTrickle)
janus.LocalIP = info.LocalIP
janus.IPv6 = makePtr(info.IPv6)
if plugin, found := info.Plugins[pluginVideoRoom]; found {
janus.VideoRoom = &BackendServerInfoVideoRoom{
Name: plugin.Name,
Version: plugin.VersionString,
Author: plugin.Author,
}
}
}
}
sfu.Janus = janus
case *mcuProxy:
sfu.Mode = SfuModeProxy
for _, c := range m.connections {
proxy := BackendServerInfoSfuProxy{
Url: c.rawUrl,
Temporary: c.IsTemporary(),
}
if len(c.ip) > 0 {
proxy.IP = c.ip.String()
}
if c.IsConnected() {
proxy.Connected = true
proxy.Shutdown = makePtr(c.IsShutdownScheduled())
proxy.Uptime = &c.connectedSince
proxy.Version = c.Version()
proxy.Features = c.Features()
proxy.Country = c.Country()
proxy.Load = makePtr(c.Load())
proxy.Bandwidth = c.Bandwidth()
}
sfu.Proxies = append(sfu.Proxies, proxy)
}
default:
sfu.Mode = SfuModeUnknown
}
info := BackendServerInfo{
Version: b.version,
Features: b.hub.info.Features,
Sfu: sfu,
Dialout: b.hub.GetServerInfoDialout(),
}
b.hub.mu.RLock()
defer b.hub.mu.RUnlock()
for session := range b.hub.dialoutSessions {
dialout := BackendServerInfoDialout{
SessionId: session.PublicId(),
}
if client := session.GetClient(); client != nil && client.IsConnected() {
dialout.Connected = true
dialout.Address = client.RemoteAddr()
if ua := client.UserAgent(); ua != "" {
dialout.UserAgent = ua
// Extract version from user-agent, expects "software/version".
if pos := strings.IndexByte(ua, '/'); pos != -1 {
version := ua[pos+1:]
if pos = strings.IndexByte(version, ' '); pos != -1 {
version = version[:pos]
}
dialout.Version = version
}
}
dialout.Features = session.GetFeatures()
}
info.Dialout = append(info.Dialout, dialout)
if mcu := b.hub.mcu; mcu != nil {
info.Sfu = mcu.GetServerInfoSfu()
}
if e, ok := b.events.(*asyncEventsNats); ok {
info.Nats = e.GetServerInfoNats()
}
if rpcClients := b.hub.rpcClients; rpcClients != nil {
info.Grpc = rpcClients.GetServerInfoGrpc()
}
if etcdClient := b.hub.etcdClient; etcdClient != nil {
info.Etcd = etcdClient.GetServerInfoEtcd()
}
infoData, err := json.MarshalIndent(info, "", " ")

View file

@ -1538,6 +1538,56 @@ for streams on the proxy. Only present if a bandwidth limit is configured on
the proxy.
### NATS connection
Information about the NATS connection are also returned by the serverinfo
endpoint:
{
"urls": [
"nats://localhost:4222"
],
"connected": true,
"serverurl": "nats://localhost:4222",
"serverid": "556c9de63ac214e53a9b976b2e5305d8",
"version": "0.6.8"
}
### GRPC connections
In clustered mode, the signaling server has GRPC connections to other instances
which are included in the serverinfo response:
[
{
"target": "192.168.1.1:8080",
"connected": true,
"version": "1.2.3"
},
{
"target": "192.168.1.2:8080",
"connected": true,
"version": "1.2.3"
}
]
### ETCD cluster
Some configuration can be loaded from an etcd cluster, the serverinfo response
also contains information on that connection:
{
"endpoints": [
"192.168.4.1:2379",
"192.168.4.2:2379"
],
"active": "etcd-endpoints://0xc0001fba40/192.168.4.2:2379",
"connected": true
}
### Dialout session
If a SIP bridge with support for dial-out is connected, the serverinfo response

View file

@ -37,6 +37,7 @@ import (
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"google.golang.org/grpc/connectivity"
)
type EtcdClientListener interface {
@ -68,6 +69,25 @@ func NewEtcdClient(config *goconf.ConfigFile, compatSection string) (*EtcdClient
return result, nil
}
func (c *EtcdClient) GetServerInfoEtcd() *BackendServerInfoEtcd {
client := c.getEtcdClient()
if client == nil {
return nil
}
result := &BackendServerInfoEtcd{
Endpoints: client.Endpoints(),
}
conn := client.ActiveConnection()
if conn != nil {
result.Active = conn.Target()
result.Connected = makePtr(conn.GetState() == connectivity.Ready)
}
return result
}
func (c *EtcdClient) getConfigStringWithFallback(config *goconf.ConfigFile, option string) string {
value, _ := config.GetString("etcd", option)
if value == "" && c.compatSection != "" {

View file

@ -39,6 +39,7 @@ import (
clientv3 "go.etcd.io/etcd/client/v3"
"google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/resolver"
@ -79,12 +80,14 @@ func newGrpcClientImpl(conn grpc.ClientConnInterface) *grpcClientImpl {
}
type GrpcClient struct {
ip net.IP
target string
conn *grpc.ClientConn
impl *grpcClientImpl
ip net.IP
rawTarget string
target string
conn *grpc.ClientConn
impl *grpcClientImpl
isSelf atomic.Bool
isSelf atomic.Bool
version atomic.Value
}
type customIpResolver struct {
@ -151,15 +154,17 @@ func NewGrpcClient(target string, ip net.IP, opts ...grpc.DialOption) (*GrpcClie
}
result := &GrpcClient{
ip: ip,
target: target,
conn: conn,
impl: newGrpcClientImpl(conn),
ip: ip,
rawTarget: target,
target: target,
conn: conn,
impl: newGrpcClientImpl(conn),
}
if ip != nil {
result.target += " (" + ip.String() + ")"
}
result.version.Store("")
return result, nil
}
@ -167,6 +172,10 @@ func (c *GrpcClient) Target() string {
return c.target
}
func (c *GrpcClient) Version() string {
return c.version.Load().(string)
}
func (c *GrpcClient) Close() error {
return c.conn.Close()
}
@ -440,6 +449,32 @@ func NewGrpcClients(config *goconf.ConfigFile, etcdClient *EtcdClient, dnsMonito
return result, nil
}
func (c *GrpcClients) GetServerInfoGrpc() (result []BackendServerInfoGrpc) {
c.mu.RLock()
defer c.mu.RUnlock()
for _, client := range c.clients {
if client.IsSelf() {
continue
}
grpc := BackendServerInfoGrpc{
Target: client.rawTarget,
}
if len(client.ip) > 0 {
grpc.IP = client.ip.String()
}
if client.conn.GetState() == connectivity.Ready {
grpc.Connected = true
grpc.Version = client.Version()
}
result = append(result, grpc)
}
return
}
func (c *GrpcClients) load(config *goconf.ConfigFile, fromReload bool) error {
creds, err := NewReloadableCredentials(config, false)
if err != nil {
@ -537,6 +572,7 @@ loop:
continue
}
client.version.Store(version)
if id == GrpcServerId {
log.Printf("GRPC target %s is this server, removing", client.Target())
c.closeClient(client)

35
hub.go
View file

@ -179,6 +179,7 @@ type Hub struct {
geoipOverrides atomic.Pointer[map[*net.IPNet]string]
geoipUpdating atomic.Bool
etcdClient *EtcdClient
rpcServer *GrpcServer
rpcClients *GrpcClients
@ -365,6 +366,7 @@ func NewHub(config *goconf.ConfigFile, events AsyncEvents, rpcServer *GrpcServer
geoip: geoip,
etcdClient: etcdClient,
rpcServer: rpcServer,
rpcClients: rpcClients,
@ -2816,6 +2818,39 @@ func (h *Hub) GetStats() map[string]interface{} {
return result
}
func (h *Hub) GetServerInfoDialout() (result []BackendServerInfoDialout) {
h.mu.RLock()
defer h.mu.RUnlock()
for session := range h.dialoutSessions {
dialout := BackendServerInfoDialout{
SessionId: session.PublicId(),
}
if client := session.GetClient(); client != nil && client.IsConnected() {
dialout.Connected = true
dialout.Address = client.RemoteAddr()
if ua := client.UserAgent(); ua != "" {
dialout.UserAgent = ua
// Extract version from user-agent, expects "software/version".
if pos := strings.IndexByte(ua, '/'); pos != -1 {
version := ua[pos+1:]
if pos = strings.IndexByte(version, ' '); pos != -1 {
version = version[:pos]
}
dialout.Version = version
}
}
dialout.Features = session.GetFeatures()
}
result = append(result, dialout)
}
slices.SortFunc(result, func(a, b BackendServerInfoDialout) int {
return strings.Compare(a.SessionId, b.SessionId)
})
return
}
func GetRealUserIP(r *http.Request, trusted *AllowedIps) string {
addr := r.RemoteAddr
if host, _, err := net.SplitHostPort(addr); err == nil {

View file

@ -128,6 +128,7 @@ type Mcu interface {
SetOnDisconnected(func())
GetStats() interface{}
GetServerInfoSfu() *BackendServerInfoSfu
NewPublisher(ctx context.Context, listener McuListener, id string, sid string, streamType StreamType, settings NewPublisherSettings, initiator McuInitiator) (McuPublisher, error)
NewSubscriber(ctx context.Context, listener McuListener, publisher string, streamType StreamType, initiator McuInitiator) (McuSubscriber, error)

View file

@ -421,6 +421,38 @@ func (m *mcuJanus) Info() *InfoMsg {
return m.info.Load()
}
func (m *mcuJanus) GetServerInfoSfu() *BackendServerInfoSfu {
janus := &BackendServerInfoSfuJanus{
Url: m.url,
}
if m.IsConnected() {
janus.Connected = true
if info := m.Info(); info != nil {
janus.Name = info.Name
janus.Version = info.VersionString
janus.Author = info.Author
janus.DataChannels = makePtr(info.DataChannels)
janus.FullTrickle = makePtr(info.FullTrickle)
janus.LocalIP = info.LocalIP
janus.IPv6 = makePtr(info.IPv6)
if plugin, found := info.Plugins[pluginVideoRoom]; found {
janus.VideoRoom = &BackendServerInfoVideoRoom{
Name: plugin.Name,
Version: plugin.VersionString,
Author: plugin.Author,
}
}
}
}
sfu := &BackendServerInfoSfu{
Mode: SfuModeJanus,
Janus: janus,
}
return sfu
}
func (m *mcuJanus) Reload(config *goconf.ConfigFile) {
m.settings.Reload(config)
}

View file

@ -1670,6 +1670,44 @@ func (m *mcuProxy) GetStats() interface{} {
return result
}
func (m *mcuProxy) GetServerInfoSfu() *BackendServerInfoSfu {
m.connectionsMu.RLock()
defer m.connectionsMu.RUnlock()
sfu := &BackendServerInfoSfu{
Mode: SfuModeProxy,
}
for _, c := range m.connections {
proxy := BackendServerInfoSfuProxy{
Url: c.rawUrl,
Temporary: c.IsTemporary(),
}
if len(c.ip) > 0 {
proxy.IP = c.ip.String()
}
if c.IsConnected() {
proxy.Connected = true
proxy.Shutdown = makePtr(c.IsShutdownScheduled())
proxy.Uptime = &c.connectedSince
proxy.Version = c.Version()
proxy.Features = c.Features()
proxy.Country = c.Country()
proxy.Load = makePtr(c.Load())
proxy.Bandwidth = c.Bandwidth()
}
sfu.Proxies = append(sfu.Proxies, proxy)
}
slices.SortFunc(sfu.Proxies, func(a, b BackendServerInfoSfuProxy) int {
c := strings.Compare(a.Url, b.Url)
if c == 0 {
c = strings.Compare(a.IP, b.IP)
}
return c
})
return sfu
}
func (m *mcuProxy) getContinentsMap() map[string][]string {
continentsMap := m.continentsMap.Load()
if continentsMap == nil {

View file

@ -70,6 +70,10 @@ func (m *TestMCU) GetStats() interface{} {
return nil
}
func (m *TestMCU) GetServerInfoSfu() *BackendServerInfoSfu {
return nil
}
func (m *TestMCU) NewPublisher(ctx context.Context, listener McuListener, id string, sid string, streamType StreamType, settings NewPublisherSettings, initiator McuInitiator) (McuPublisher, error) {
var maxBitrate int
if streamType == StreamTypeScreen {

View file

@ -379,6 +379,10 @@ func (m *TestMCU) GetStats() interface{} {
return nil
}
func (m *TestMCU) GetServerInfoSfu() *signaling.BackendServerInfoSfu {
return nil
}
func (m *TestMCU) NewPublisher(ctx context.Context, listener signaling.McuListener, id string, sid string, streamType signaling.StreamType, settings signaling.NewPublisherSettings, initiator signaling.McuInitiator) (signaling.McuPublisher, error) {
return nil, errors.New("not implemented")
}