diff --git a/api_proxy.go b/api_proxy.go index 62f5197..ccf953a 100644 --- a/api_proxy.go +++ b/api_proxy.go @@ -215,6 +215,8 @@ func (m *CommandProxyClientMessage) CheckValid() error { type CommandProxyServerMessage struct { Id string `json:"id,omitempty"` Sid string `json:"sid,omitempty"` + + Bitrate int `json:"bitrate,omitempty"` } // Type "payload" diff --git a/mcu_common.go b/mcu_common.go index 6fe48c0..3bea933 100644 --- a/mcu_common.go +++ b/mcu_common.go @@ -104,6 +104,7 @@ type McuClient interface { Id() string Sid() string StreamType() StreamType + MaxBitrate() int Close(ctx context.Context) diff --git a/mcu_janus.go b/mcu_janus.go index afe247b..58d92e1 100644 --- a/mcu_janus.go +++ b/mcu_janus.go @@ -438,6 +438,7 @@ type mcuJanusClient struct { roomId uint64 sid string streamType StreamType + maxBitrate int handle *JanusHandle handleId uint64 @@ -464,6 +465,10 @@ func (c *mcuJanusClient) StreamType() StreamType { return c.streamType } +func (c *mcuJanusClient) MaxBitrate() int { + return c.maxBitrate +} + func (c *mcuJanusClient) Close(ctx context.Context) { } @@ -715,14 +720,14 @@ func min(a, b int) int { return b } -func (m *mcuJanus) getOrCreatePublisherHandle(ctx context.Context, id string, streamType StreamType, bitrate int) (*JanusHandle, uint64, uint64, error) { +func (m *mcuJanus) getOrCreatePublisherHandle(ctx context.Context, id string, streamType StreamType, bitrate int) (*JanusHandle, uint64, uint64, int, error) { session := m.session if session == nil { - return nil, 0, 0, ErrNotConnected + return nil, 0, 0, 0, ErrNotConnected } handle, err := session.Attach(ctx, pluginVideoRoom) if err != nil { - return nil, 0, 0, err + return nil, 0, 0, 0, err } log.Printf("Attached %s as publisher %d to plugin %s in session %d", streamType, handle.Id, pluginVideoRoom, session.Id) @@ -752,7 +757,7 @@ func (m *mcuJanus) getOrCreatePublisherHandle(ctx context.Context, id string, st if _, err2 := handle.Detach(ctx); err2 != nil { log.Printf("Error detaching handle %d: %s", handle.Id, err2) } - return nil, 0, 0, err + return nil, 0, 0, 0, err } roomId := getPluginIntValue(create_response.PluginData, pluginVideoRoom, "room") @@ -760,7 +765,7 @@ func (m *mcuJanus) getOrCreatePublisherHandle(ctx context.Context, id string, st if _, err := handle.Detach(ctx); err != nil { log.Printf("Error detaching handle %d: %s", handle.Id, err) } - return nil, 0, 0, fmt.Errorf("No room id received: %+v", create_response) + return nil, 0, 0, 0, fmt.Errorf("No room id received: %+v", create_response) } log.Println("Created room", roomId, create_response.PluginData) @@ -777,10 +782,10 @@ func (m *mcuJanus) getOrCreatePublisherHandle(ctx context.Context, id string, st if _, err2 := handle.Detach(ctx); err2 != nil { log.Printf("Error detaching handle %d: %s", handle.Id, err2) } - return nil, 0, 0, err + return nil, 0, 0, 0, err } - return handle, response.Session, roomId, nil + return handle, response.Session, roomId, bitrate, nil } func (m *mcuJanus) NewPublisher(ctx context.Context, listener McuListener, id string, sid string, streamType StreamType, bitrate int, mediaTypes MediaType, initiator McuInitiator) (McuPublisher, error) { @@ -788,7 +793,7 @@ func (m *mcuJanus) NewPublisher(ctx context.Context, listener McuListener, id st return nil, fmt.Errorf("Unsupported stream type %s", streamType) } - handle, session, roomId, err := m.getOrCreatePublisherHandle(ctx, id, streamType, bitrate) + handle, session, roomId, maxBitrate, err := m.getOrCreatePublisherHandle(ctx, id, streamType, bitrate) if err != nil { return nil, err } @@ -803,6 +808,7 @@ func (m *mcuJanus) NewPublisher(ctx context.Context, listener McuListener, id st roomId: roomId, sid: sid, streamType: streamType, + maxBitrate: maxBitrate, handle: handle, handleId: handle.Id, @@ -892,7 +898,7 @@ func (p *mcuJanusPublisher) SetMedia(mt MediaType) { func (p *mcuJanusPublisher) NotifyReconnected() { ctx := context.TODO() - handle, session, roomId, err := p.mcu.getOrCreatePublisherHandle(ctx, p.id, p.streamType, p.bitrate) + handle, session, roomId, _, err := p.mcu.getOrCreatePublisherHandle(ctx, p.id, p.streamType, p.bitrate) if err != nil { log.Printf("Could not reconnect publisher %s: %s", p.id, err) // TODO(jojo): Retry @@ -1043,6 +1049,7 @@ func (m *mcuJanus) NewSubscriber(ctx context.Context, listener McuListener, publ roomId: pub.roomId, sid: strconv.FormatUint(handle.Id, 10), streamType: streamType, + maxBitrate: pub.MaxBitrate(), handle: handle, handleId: handle.Id, diff --git a/mcu_proxy.go b/mcu_proxy.go index eeff1de..131fa98 100644 --- a/mcu_proxy.go +++ b/mcu_proxy.go @@ -77,6 +77,7 @@ type McuProxy interface { type mcuProxyPubSubCommon struct { sid string streamType StreamType + maxBitrate int proxyId string conn *mcuProxyConnection listener McuListener @@ -94,6 +95,10 @@ func (c *mcuProxyPubSubCommon) StreamType() StreamType { return c.streamType } +func (c *mcuProxyPubSubCommon) MaxBitrate() int { + return c.maxBitrate +} + func (c *mcuProxyPubSubCommon) doSendMessage(ctx context.Context, msg *ProxyClientMessage, callback func(error, map[string]interface{})) { c.conn.performAsyncRequest(ctx, msg, func(err error, response *ProxyServerMessage) { if err != nil { @@ -132,11 +137,12 @@ type mcuProxyPublisher struct { mediaTypes MediaType } -func newMcuProxyPublisher(id string, sid string, streamType StreamType, mediaTypes MediaType, proxyId string, conn *mcuProxyConnection, listener McuListener) *mcuProxyPublisher { +func newMcuProxyPublisher(id string, sid string, streamType StreamType, maxBitrate int, mediaTypes MediaType, proxyId string, conn *mcuProxyConnection, listener McuListener) *mcuProxyPublisher { return &mcuProxyPublisher{ mcuProxyPubSubCommon: mcuProxyPubSubCommon{ sid: sid, streamType: streamType, + maxBitrate: maxBitrate, proxyId: proxyId, conn: conn, listener: listener, @@ -217,11 +223,12 @@ type mcuProxySubscriber struct { publisherId string } -func newMcuProxySubscriber(publisherId string, sid string, streamType StreamType, proxyId string, conn *mcuProxyConnection, listener McuListener) *mcuProxySubscriber { +func newMcuProxySubscriber(publisherId string, sid string, streamType StreamType, maxBitrate int, proxyId string, conn *mcuProxyConnection, listener McuListener) *mcuProxySubscriber { return &mcuProxySubscriber{ mcuProxyPubSubCommon: mcuProxyPubSubCommon{ sid: sid, streamType: streamType, + maxBitrate: maxBitrate, proxyId: proxyId, conn: conn, listener: listener, @@ -1054,7 +1061,7 @@ func (c *mcuProxyConnection) newPublisher(ctx context.Context, listener McuListe proxyId := response.Command.Id log.Printf("Created %s publisher %s on %s for %s", streamType, proxyId, c, id) - publisher := newMcuProxyPublisher(id, sid, streamType, mediaTypes, proxyId, c, listener) + publisher := newMcuProxyPublisher(id, sid, streamType, response.Command.Bitrate, mediaTypes, proxyId, c, listener) c.publishersLock.Lock() c.publishers[proxyId] = publisher c.publisherIds[getStreamId(id, streamType)] = proxyId @@ -1084,7 +1091,7 @@ func (c *mcuProxyConnection) newSubscriber(ctx context.Context, listener McuList proxyId := response.Command.Id log.Printf("Created %s subscriber %s on %s for %s", streamType, proxyId, c, publisherSessionId) - subscriber := newMcuProxySubscriber(publisherSessionId, response.Command.Sid, streamType, proxyId, c, listener) + subscriber := newMcuProxySubscriber(publisherSessionId, response.Command.Sid, streamType, response.Command.Bitrate, proxyId, c, listener) c.subscribersLock.Lock() c.subscribers[proxyId] = subscriber c.subscribersLock.Unlock() diff --git a/mcu_test.go b/mcu_test.go index 42de651..52100e5 100644 --- a/mcu_test.go +++ b/mcu_test.go @@ -158,6 +158,10 @@ func (c *TestMCUClient) StreamType() StreamType { return c.streamType } +func (c *TestMCUClient) MaxBitrate() int { + return 0 +} + func (c *TestMCUClient) Close(ctx context.Context) { if c.closed.CompareAndSwap(false, true) { log.Printf("Close MCU client %s", c.id) diff --git a/proxy/proxy_server.go b/proxy/proxy_server.go index f950b1a..decd89e 100644 --- a/proxy/proxy_server.go +++ b/proxy/proxy_server.go @@ -305,9 +305,7 @@ loop: } func (s *ProxyServer) updateLoad() { - // TODO: Take maximum bandwidth of clients into account when calculating - // load (screensharing requires more than regular audio/video). - load := s.GetClientCount() + load := s.GetClientsLoad() if load == s.load.Load() { return } @@ -390,7 +388,7 @@ func (s *ProxyServer) ScheduleShutdown() { session.sendMessage(msg) }) - if s.GetClientCount() == 0 { + if !s.HasClients() { go close(s.shutdownChannel) } } @@ -653,7 +651,8 @@ func (s *ProxyServer) processCommand(ctx context.Context, client *ProxyClient, s Id: message.Id, Type: "command", Command: &signaling.CommandProxyServerMessage{ - Id: id, + Id: id, + Bitrate: int(publisher.MaxBitrate()), }, } session.sendMessage(response) @@ -978,10 +977,21 @@ func (s *ProxyServer) DeleteClient(id string, client signaling.McuClient) bool { return true } -func (s *ProxyServer) GetClientCount() int64 { +func (s *ProxyServer) HasClients() bool { s.clientsLock.RLock() defer s.clientsLock.RUnlock() - return int64(len(s.clients)) + return len(s.clients) > 0 +} + +func (s *ProxyServer) GetClientsLoad() int64 { + s.clientsLock.RLock() + defer s.clientsLock.RUnlock() + + var load int64 + for _, c := range s.clients { + load += int64(c.MaxBitrate()) + } + return load / 1024 } func (s *ProxyServer) GetClient(id string) signaling.McuClient {