From 3636e69dba05ce2084d160d0ce5ba3d7a82375e6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Calvi=C3=B1o=20S=C3=A1nchez?= Date: Wed, 13 Apr 2022 03:03:36 +0200 Subject: [PATCH] Add specific ID for connections MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A new ID is introduced for publishers and subscribers to be able to identify different connections in the same session. While it was possible to do that from the Janus handles this ID is meant to be shared with the clients and set by the peer sending the offer, so it is set by the client for publishers and by the signaling server for subscribers (in which case the Janus handle is used). This will make possible to know without any ambiguity if an offer/answer belongs to an already existing connection or not. This can be used, for example, to know if an offer represents an update or a new connection, or to discard unneeded messages that might be sent in some corner cases by the clients. The ID name is "sid" for consistency with the property name used by the internal signaling server of Talk. Signed-off-by: Daniel Calviño Sánchez --- api_proxy.go | 5 ++++- api_signaling.go | 1 + clientsession.go | 7 ++++++- hub.go | 6 ++++-- mcu_common.go | 5 ++++- mcu_janus.go | 13 ++++++++++++- mcu_proxy.go | 25 ++++++++++++++++++------- mcu_test.go | 8 +++++++- proxy/proxy_server.go | 5 +++-- proxy/proxy_session.go | 18 ++++++++++++++++++ 10 files changed, 77 insertions(+), 16 deletions(-) diff --git a/api_proxy.go b/api_proxy.go index 8ffd3c9..364a3ae 100644 --- a/api_proxy.go +++ b/api_proxy.go @@ -172,6 +172,7 @@ type ByeProxyServerMessage struct { type CommandProxyClientMessage struct { Type string `json:"type"` + Sid string `json:"sid,omitempty"` StreamType string `json:"streamType,omitempty"` PublisherId string `json:"publisherId,omitempty"` ClientId string `json:"clientId,omitempty"` @@ -205,7 +206,8 @@ func (m *CommandProxyClientMessage) CheckValid() error { } type CommandProxyServerMessage struct { - Id string `json:"id,omitempty"` + Id string `json:"id,omitempty"` + Sid string `json:"sid,omitempty"` } // Type "payload" @@ -254,6 +256,7 @@ type EventProxyServerMessage struct { ClientId string `json:"clientId,omitempty"` Load int64 `json:"load,omitempty"` + Sid string `json:"sid,omitempty"` } // Information on a proxy in the etcd cluster. diff --git a/api_signaling.go b/api_signaling.go index 8f7e959..a2c81b9 100644 --- a/api_signaling.go +++ b/api_signaling.go @@ -654,6 +654,7 @@ type AnswerOfferMessage struct { Type string `json:"type"` RoomType string `json:"roomType"` Payload map[string]interface{} `json:"payload"` + Sid string `json:"sid,omitempty"` Update bool `json:"update,omitempty"` } diff --git a/clientsession.go b/clientsession.go index a68d707..678d510 100644 --- a/clientsession.go +++ b/clientsession.go @@ -571,6 +571,7 @@ func (s *ClientSession) sendOffer(client McuClient, sender string, streamType st Type: "offer", RoomType: streamType, Payload: offer, + Sid: client.Sid(), Update: true, } offer_data, err := json.Marshal(offer_message) @@ -601,6 +602,7 @@ func (s *ClientSession) sendCandidate(client McuClient, sender string, streamTyp Payload: map[string]interface{}{ "candidate": candidate, }, + Sid: client.Sid(), } candidate_data, err := json.Marshal(candidate_message) if err != nil { @@ -698,6 +700,9 @@ func (s *ClientSession) OnIceCompleted(client McuClient) { // s.OnIceCandidate(client, nil) } +func (s *ClientSession) SubscriberSidUpdated(subscriber McuSubscriber) { +} + func (s *ClientSession) PublisherClosed(publisher McuPublisher) { s.mu.Lock() defer s.mu.Unlock() @@ -876,7 +881,7 @@ func (s *ClientSession) GetOrCreatePublisher(ctx context.Context, mcu Mcu, strea } } var err error - publisher, err = mcu.NewPublisher(ctx, s, s.PublicId(), streamType, bitrate, mediaTypes, client) + publisher, err = mcu.NewPublisher(ctx, s, s.PublicId(), data.Sid, streamType, bitrate, mediaTypes, client) s.mu.Lock() if err != nil { return nil, err diff --git a/hub.go b/hub.go index 577463b..3781a88 100644 --- a/hub.go +++ b/hub.go @@ -1841,11 +1841,11 @@ func (h *Hub) processMcuMessage(senderSession *ClientSession, session *ClientSes return } - h.sendMcuMessageResponse(session, message, data, response) + h.sendMcuMessageResponse(session, mc, message, data, response) }) } -func (h *Hub) sendMcuMessageResponse(session *ClientSession, message *MessageClientMessage, data *MessageClientMessageData, response map[string]interface{}) { +func (h *Hub) sendMcuMessageResponse(session *ClientSession, mcuClient McuClient, message *MessageClientMessage, data *MessageClientMessageData, response map[string]interface{}) { var response_message *ServerMessage switch response["type"] { case "answer": @@ -1855,6 +1855,7 @@ func (h *Hub) sendMcuMessageResponse(session *ClientSession, message *MessageCli Type: "answer", RoomType: data.RoomType, Payload: response, + Sid: mcuClient.Sid(), } answer_data, err := json.Marshal(answer_message) if err != nil { @@ -1879,6 +1880,7 @@ func (h *Hub) sendMcuMessageResponse(session *ClientSession, message *MessageCli Type: "offer", RoomType: data.RoomType, Payload: response, + Sid: mcuClient.Sid(), } offer_data, err := json.Marshal(offer_message) if err != nil { diff --git a/mcu_common.go b/mcu_common.go index 8959b42..9824443 100644 --- a/mcu_common.go +++ b/mcu_common.go @@ -55,6 +55,8 @@ type McuListener interface { OnIceCandidate(client McuClient, candidate interface{}) OnIceCompleted(client McuClient) + SubscriberSidUpdated(subscriber McuSubscriber) + PublisherClosed(publisher McuPublisher) SubscriberClosed(subscriber McuSubscriber) } @@ -73,12 +75,13 @@ type Mcu interface { GetStats() interface{} - NewPublisher(ctx context.Context, listener McuListener, id string, streamType string, bitrate int, mediaTypes MediaType, initiator McuInitiator) (McuPublisher, error) + NewPublisher(ctx context.Context, listener McuListener, id string, sid string, streamType string, bitrate int, mediaTypes MediaType, initiator McuInitiator) (McuPublisher, error) NewSubscriber(ctx context.Context, listener McuListener, publisher string, streamType string) (McuSubscriber, error) } type McuClient interface { Id() string + Sid() string StreamType() string Close(ctx context.Context) diff --git a/mcu_janus.go b/mcu_janus.go index 3b7d345..87c4b9e 100644 --- a/mcu_janus.go +++ b/mcu_janus.go @@ -436,6 +436,7 @@ type mcuJanusClient struct { id uint64 session uint64 roomId uint64 + sid string streamType string handle *JanusHandle @@ -455,6 +456,10 @@ func (c *mcuJanusClient) Id() string { return strconv.FormatUint(c.id, 10) } +func (c *mcuJanusClient) Sid() string { + return c.sid +} + func (c *mcuJanusClient) StreamType() string { return c.streamType } @@ -781,7 +786,7 @@ func (m *mcuJanus) getOrCreatePublisherHandle(ctx context.Context, id string, st return handle, response.Session, roomId, nil } -func (m *mcuJanus) NewPublisher(ctx context.Context, listener McuListener, id string, streamType string, bitrate int, mediaTypes MediaType, initiator McuInitiator) (McuPublisher, error) { +func (m *mcuJanus) NewPublisher(ctx context.Context, listener McuListener, id string, sid string, streamType string, bitrate int, mediaTypes MediaType, initiator McuInitiator) (McuPublisher, error) { if _, found := streamTypeUserIds[streamType]; !found { return nil, fmt.Errorf("Unsupported stream type %s", streamType) } @@ -799,6 +804,7 @@ func (m *mcuJanus) NewPublisher(ctx context.Context, listener McuListener, id st id: atomic.AddUint64(&m.clientId, 1), session: session, roomId: roomId, + sid: sid, streamType: streamType, handle: handle, @@ -1032,6 +1038,7 @@ func (m *mcuJanus) NewSubscriber(ctx context.Context, listener McuListener, publ id: atomic.AddUint64(&m.clientId, 1), roomId: pub.roomId, + sid: strconv.FormatUint(handle.Id, 10), streamType: streamType, handle: handle, @@ -1123,6 +1130,8 @@ func (p *mcuJanusSubscriber) NotifyReconnected() { p.handle = handle p.handleId = handle.Id p.roomId = pub.roomId + p.sid = strconv.FormatUint(handle.Id, 10) + p.listener.SubscriberSidUpdated(p) log.Printf("Subscriber %d for publisher %s reconnected on handle %d", p.id, p.publisher, p.handleId) } @@ -1191,6 +1200,8 @@ retry: p.handle = handle p.handleId = handle.Id p.roomId = pub.roomId + p.sid = strconv.FormatUint(handle.Id, 10) + p.listener.SubscriberSidUpdated(p) p.closeChan = make(chan bool, 1) go p.run(p.handle, p.closeChan) log.Printf("Already connected subscriber %d for %s, leaving and re-joining on handle %d", p.id, p.streamType, p.handleId) diff --git a/mcu_proxy.go b/mcu_proxy.go index 38995ba..9a8b0a0 100644 --- a/mcu_proxy.go +++ b/mcu_proxy.go @@ -75,6 +75,7 @@ const ( ) type mcuProxyPubSubCommon struct { + sid string streamType string proxyId string conn *mcuProxyConnection @@ -85,6 +86,10 @@ func (c *mcuProxyPubSubCommon) Id() string { return c.proxyId } +func (c *mcuProxyPubSubCommon) Sid() string { + return c.sid +} + func (c *mcuProxyPubSubCommon) StreamType() string { return c.streamType } @@ -127,9 +132,10 @@ type mcuProxyPublisher struct { mediaTypes MediaType } -func newMcuProxyPublisher(id string, streamType string, mediaTypes MediaType, proxyId string, conn *mcuProxyConnection, listener McuListener) *mcuProxyPublisher { +func newMcuProxyPublisher(id string, sid string, streamType string, mediaTypes MediaType, proxyId string, conn *mcuProxyConnection, listener McuListener) *mcuProxyPublisher { return &mcuProxyPublisher{ mcuProxyPubSubCommon: mcuProxyPubSubCommon{ + sid: sid, streamType: streamType, proxyId: proxyId, conn: conn, @@ -207,9 +213,10 @@ type mcuProxySubscriber struct { publisherId string } -func newMcuProxySubscriber(publisherId string, streamType string, proxyId string, conn *mcuProxyConnection, listener McuListener) *mcuProxySubscriber { +func newMcuProxySubscriber(publisherId string, sid string, streamType string, proxyId string, conn *mcuProxyConnection, listener McuListener) *mcuProxySubscriber { return &mcuProxySubscriber{ mcuProxyPubSubCommon: mcuProxyPubSubCommon{ + sid: sid, streamType: streamType, proxyId: proxyId, conn: conn, @@ -269,6 +276,9 @@ func (s *mcuProxySubscriber) ProcessEvent(msg *EventProxyServerMessage) { switch msg.Type { case "ice-completed": s.listener.OnIceCompleted(s) + case "subscriber-sid-updated": + s.sid = msg.Sid + s.listener.SubscriberSidUpdated(s) case "subscriber-closed": s.NotifyClosed() default: @@ -966,11 +976,12 @@ func (c *mcuProxyConnection) performSyncRequest(ctx context.Context, msg *ProxyC } } -func (c *mcuProxyConnection) newPublisher(ctx context.Context, listener McuListener, id string, streamType string, bitrate int, mediaTypes MediaType) (McuPublisher, error) { +func (c *mcuProxyConnection) newPublisher(ctx context.Context, listener McuListener, id string, sid string, streamType string, bitrate int, mediaTypes MediaType) (McuPublisher, error) { msg := &ProxyClientMessage{ Type: "command", Command: &CommandProxyClientMessage{ Type: "create-publisher", + Sid: sid, StreamType: streamType, Bitrate: bitrate, MediaTypes: mediaTypes, @@ -985,7 +996,7 @@ func (c *mcuProxyConnection) newPublisher(ctx context.Context, listener McuListe proxyId := response.Command.Id log.Printf("Created %s publisher %s on %s for %s", streamType, proxyId, c, id) - publisher := newMcuProxyPublisher(id, streamType, mediaTypes, proxyId, c, listener) + publisher := newMcuProxyPublisher(id, sid, streamType, mediaTypes, proxyId, c, listener) c.publishersLock.Lock() c.publishers[proxyId] = publisher c.publisherIds[id+"|"+streamType] = proxyId @@ -1020,7 +1031,7 @@ func (c *mcuProxyConnection) newSubscriber(ctx context.Context, listener McuList proxyId := response.Command.Id log.Printf("Created %s subscriber %s on %s for %s", streamType, proxyId, c, publisher) - subscriber := newMcuProxySubscriber(publisher, streamType, proxyId, c, listener) + subscriber := newMcuProxySubscriber(publisher, response.Command.Sid, streamType, proxyId, c, listener) c.subscribersLock.Lock() c.subscribers[proxyId] = subscriber c.subscribersLock.Unlock() @@ -1916,7 +1927,7 @@ func (m *mcuProxy) removeWaiter(id uint64) { delete(m.publisherWaiters, id) } -func (m *mcuProxy) NewPublisher(ctx context.Context, listener McuListener, id string, streamType string, bitrate int, mediaTypes MediaType, initiator McuInitiator) (McuPublisher, error) { +func (m *mcuProxy) NewPublisher(ctx context.Context, listener McuListener, id string, sid string, streamType string, bitrate int, mediaTypes MediaType, initiator McuInitiator) (McuPublisher, error) { connections := m.getSortedConnections(initiator) for _, conn := range connections { if conn.IsShutdownScheduled() { @@ -1937,7 +1948,7 @@ func (m *mcuProxy) NewPublisher(ctx context.Context, listener McuListener, id st } else { bitrate = min(bitrate, maxBitrate) } - publisher, err := conn.newPublisher(subctx, listener, id, streamType, bitrate, mediaTypes) + publisher, err := conn.newPublisher(subctx, listener, id, sid, streamType, bitrate, mediaTypes) if err != nil { log.Printf("Could not create %s publisher for %s on %s: %s", streamType, id, conn, err) continue diff --git a/mcu_test.go b/mcu_test.go index 061ec6e..4172ed8 100644 --- a/mcu_test.go +++ b/mcu_test.go @@ -67,7 +67,7 @@ func (m *TestMCU) GetStats() interface{} { return nil } -func (m *TestMCU) NewPublisher(ctx context.Context, listener McuListener, id string, streamType string, bitrate int, mediaTypes MediaType, initiator McuInitiator) (McuPublisher, error) { +func (m *TestMCU) NewPublisher(ctx context.Context, listener McuListener, id string, sid string, streamType string, bitrate int, mediaTypes MediaType, initiator McuInitiator) (McuPublisher, error) { var maxBitrate int if streamType == streamTypeScreen { maxBitrate = TestMaxBitrateScreen @@ -82,6 +82,7 @@ func (m *TestMCU) NewPublisher(ctx context.Context, listener McuListener, id str pub := &TestMCUPublisher{ TestMCUClient: TestMCUClient{ id: id, + sid: sid, streamType: streamType, }, @@ -122,6 +123,7 @@ type TestMCUClient struct { closed int32 id string + sid string streamType string } @@ -129,6 +131,10 @@ func (c *TestMCUClient) Id() string { return c.id } +func (c *TestMCUClient) Sid() string { + return c.sid +} + func (c *TestMCUClient) StreamType() string { return c.streamType } diff --git a/proxy/proxy_server.go b/proxy/proxy_server.go index 6868779..9f914ce 100644 --- a/proxy/proxy_server.go +++ b/proxy/proxy_server.go @@ -638,7 +638,7 @@ func (s *ProxyServer) processCommand(ctx context.Context, client *ProxyClient, s } id := uuid.New().String() - publisher, err := s.mcu.NewPublisher(ctx, session, id, cmd.StreamType, cmd.Bitrate, cmd.MediaTypes, &emptyInitiator{}) + publisher, err := s.mcu.NewPublisher(ctx, session, id, cmd.Sid, cmd.StreamType, cmd.Bitrate, cmd.MediaTypes, &emptyInitiator{}) if err == context.DeadlineExceeded { log.Printf("Timeout while creating %s publisher %s for %s", cmd.StreamType, id, session.PublicId()) session.sendMessage(message.NewErrorServerMessage(TimeoutCreatingPublisher)) @@ -685,7 +685,8 @@ func (s *ProxyServer) processCommand(ctx context.Context, client *ProxyClient, s Id: message.Id, Type: "command", Command: &signaling.CommandProxyServerMessage{ - Id: id, + Id: id, + Sid: subscriber.Sid(), }, } session.sendMessage(response) diff --git a/proxy/proxy_session.go b/proxy/proxy_session.go index 1e22352..4aec7fa 100644 --- a/proxy/proxy_session.go +++ b/proxy/proxy_session.go @@ -192,6 +192,24 @@ func (s *ProxySession) OnIceCompleted(client signaling.McuClient) { s.sendMessage(msg) } +func (s *ProxySession) SubscriberSidUpdated(subscriber signaling.McuSubscriber) { + id := s.proxy.GetClientId(subscriber) + if id == "" { + log.Printf("Received subscriber sid updated event from unknown %s subscriber %s (%+v)", subscriber.StreamType(), subscriber.Id(), subscriber) + return + } + + msg := &signaling.ProxyServerMessage{ + Type: "event", + Event: &signaling.EventProxyServerMessage{ + Type: "subscriber-sid-updated", + ClientId: id, + Sid: subscriber.Sid(), + }, + } + s.sendMessage(msg) +} + func (s *ProxySession) PublisherClosed(publisher signaling.McuPublisher) { if id := s.DeletePublisher(publisher); id != "" { if s.proxy.DeleteClient(id, publisher) {