diff --git a/api_proxy.go b/api_proxy.go index 8ffd3c9..364a3ae 100644 --- a/api_proxy.go +++ b/api_proxy.go @@ -172,6 +172,7 @@ type ByeProxyServerMessage struct { type CommandProxyClientMessage struct { Type string `json:"type"` + Sid string `json:"sid,omitempty"` StreamType string `json:"streamType,omitempty"` PublisherId string `json:"publisherId,omitempty"` ClientId string `json:"clientId,omitempty"` @@ -205,7 +206,8 @@ func (m *CommandProxyClientMessage) CheckValid() error { } type CommandProxyServerMessage struct { - Id string `json:"id,omitempty"` + Id string `json:"id,omitempty"` + Sid string `json:"sid,omitempty"` } // Type "payload" @@ -254,6 +256,7 @@ type EventProxyServerMessage struct { ClientId string `json:"clientId,omitempty"` Load int64 `json:"load,omitempty"` + Sid string `json:"sid,omitempty"` } // Information on a proxy in the etcd cluster. diff --git a/api_signaling.go b/api_signaling.go index 8f7e959..a2c81b9 100644 --- a/api_signaling.go +++ b/api_signaling.go @@ -654,6 +654,7 @@ type AnswerOfferMessage struct { Type string `json:"type"` RoomType string `json:"roomType"` Payload map[string]interface{} `json:"payload"` + Sid string `json:"sid,omitempty"` Update bool `json:"update,omitempty"` } diff --git a/clientsession.go b/clientsession.go index a68d707..678d510 100644 --- a/clientsession.go +++ b/clientsession.go @@ -571,6 +571,7 @@ func (s *ClientSession) sendOffer(client McuClient, sender string, streamType st Type: "offer", RoomType: streamType, Payload: offer, + Sid: client.Sid(), Update: true, } offer_data, err := json.Marshal(offer_message) @@ -601,6 +602,7 @@ func (s *ClientSession) sendCandidate(client McuClient, sender string, streamTyp Payload: map[string]interface{}{ "candidate": candidate, }, + Sid: client.Sid(), } candidate_data, err := json.Marshal(candidate_message) if err != nil { @@ -698,6 +700,9 @@ func (s *ClientSession) OnIceCompleted(client McuClient) { // s.OnIceCandidate(client, nil) } +func (s *ClientSession) SubscriberSidUpdated(subscriber McuSubscriber) { +} + func (s *ClientSession) PublisherClosed(publisher McuPublisher) { s.mu.Lock() defer s.mu.Unlock() @@ -876,7 +881,7 @@ func (s *ClientSession) GetOrCreatePublisher(ctx context.Context, mcu Mcu, strea } } var err error - publisher, err = mcu.NewPublisher(ctx, s, s.PublicId(), streamType, bitrate, mediaTypes, client) + publisher, err = mcu.NewPublisher(ctx, s, s.PublicId(), data.Sid, streamType, bitrate, mediaTypes, client) s.mu.Lock() if err != nil { return nil, err diff --git a/hub.go b/hub.go index 577463b..3781a88 100644 --- a/hub.go +++ b/hub.go @@ -1841,11 +1841,11 @@ func (h *Hub) processMcuMessage(senderSession *ClientSession, session *ClientSes return } - h.sendMcuMessageResponse(session, message, data, response) + h.sendMcuMessageResponse(session, mc, message, data, response) }) } -func (h *Hub) sendMcuMessageResponse(session *ClientSession, message *MessageClientMessage, data *MessageClientMessageData, response map[string]interface{}) { +func (h *Hub) sendMcuMessageResponse(session *ClientSession, mcuClient McuClient, message *MessageClientMessage, data *MessageClientMessageData, response map[string]interface{}) { var response_message *ServerMessage switch response["type"] { case "answer": @@ -1855,6 +1855,7 @@ func (h *Hub) sendMcuMessageResponse(session *ClientSession, message *MessageCli Type: "answer", RoomType: data.RoomType, Payload: response, + Sid: mcuClient.Sid(), } answer_data, err := json.Marshal(answer_message) if err != nil { @@ -1879,6 +1880,7 @@ func (h *Hub) sendMcuMessageResponse(session *ClientSession, message *MessageCli Type: "offer", RoomType: data.RoomType, Payload: response, + Sid: mcuClient.Sid(), } offer_data, err := json.Marshal(offer_message) if err != nil { diff --git a/mcu_common.go b/mcu_common.go index 8959b42..9824443 100644 --- a/mcu_common.go +++ b/mcu_common.go @@ -55,6 +55,8 @@ type McuListener interface { OnIceCandidate(client McuClient, candidate interface{}) OnIceCompleted(client McuClient) + SubscriberSidUpdated(subscriber McuSubscriber) + PublisherClosed(publisher McuPublisher) SubscriberClosed(subscriber McuSubscriber) } @@ -73,12 +75,13 @@ type Mcu interface { GetStats() interface{} - NewPublisher(ctx context.Context, listener McuListener, id string, streamType string, bitrate int, mediaTypes MediaType, initiator McuInitiator) (McuPublisher, error) + NewPublisher(ctx context.Context, listener McuListener, id string, sid string, streamType string, bitrate int, mediaTypes MediaType, initiator McuInitiator) (McuPublisher, error) NewSubscriber(ctx context.Context, listener McuListener, publisher string, streamType string) (McuSubscriber, error) } type McuClient interface { Id() string + Sid() string StreamType() string Close(ctx context.Context) diff --git a/mcu_janus.go b/mcu_janus.go index 3b7d345..87c4b9e 100644 --- a/mcu_janus.go +++ b/mcu_janus.go @@ -436,6 +436,7 @@ type mcuJanusClient struct { id uint64 session uint64 roomId uint64 + sid string streamType string handle *JanusHandle @@ -455,6 +456,10 @@ func (c *mcuJanusClient) Id() string { return strconv.FormatUint(c.id, 10) } +func (c *mcuJanusClient) Sid() string { + return c.sid +} + func (c *mcuJanusClient) StreamType() string { return c.streamType } @@ -781,7 +786,7 @@ func (m *mcuJanus) getOrCreatePublisherHandle(ctx context.Context, id string, st return handle, response.Session, roomId, nil } -func (m *mcuJanus) NewPublisher(ctx context.Context, listener McuListener, id string, streamType string, bitrate int, mediaTypes MediaType, initiator McuInitiator) (McuPublisher, error) { +func (m *mcuJanus) NewPublisher(ctx context.Context, listener McuListener, id string, sid string, streamType string, bitrate int, mediaTypes MediaType, initiator McuInitiator) (McuPublisher, error) { if _, found := streamTypeUserIds[streamType]; !found { return nil, fmt.Errorf("Unsupported stream type %s", streamType) } @@ -799,6 +804,7 @@ func (m *mcuJanus) NewPublisher(ctx context.Context, listener McuListener, id st id: atomic.AddUint64(&m.clientId, 1), session: session, roomId: roomId, + sid: sid, streamType: streamType, handle: handle, @@ -1032,6 +1038,7 @@ func (m *mcuJanus) NewSubscriber(ctx context.Context, listener McuListener, publ id: atomic.AddUint64(&m.clientId, 1), roomId: pub.roomId, + sid: strconv.FormatUint(handle.Id, 10), streamType: streamType, handle: handle, @@ -1123,6 +1130,8 @@ func (p *mcuJanusSubscriber) NotifyReconnected() { p.handle = handle p.handleId = handle.Id p.roomId = pub.roomId + p.sid = strconv.FormatUint(handle.Id, 10) + p.listener.SubscriberSidUpdated(p) log.Printf("Subscriber %d for publisher %s reconnected on handle %d", p.id, p.publisher, p.handleId) } @@ -1191,6 +1200,8 @@ retry: p.handle = handle p.handleId = handle.Id p.roomId = pub.roomId + p.sid = strconv.FormatUint(handle.Id, 10) + p.listener.SubscriberSidUpdated(p) p.closeChan = make(chan bool, 1) go p.run(p.handle, p.closeChan) log.Printf("Already connected subscriber %d for %s, leaving and re-joining on handle %d", p.id, p.streamType, p.handleId) diff --git a/mcu_proxy.go b/mcu_proxy.go index 38995ba..9a8b0a0 100644 --- a/mcu_proxy.go +++ b/mcu_proxy.go @@ -75,6 +75,7 @@ const ( ) type mcuProxyPubSubCommon struct { + sid string streamType string proxyId string conn *mcuProxyConnection @@ -85,6 +86,10 @@ func (c *mcuProxyPubSubCommon) Id() string { return c.proxyId } +func (c *mcuProxyPubSubCommon) Sid() string { + return c.sid +} + func (c *mcuProxyPubSubCommon) StreamType() string { return c.streamType } @@ -127,9 +132,10 @@ type mcuProxyPublisher struct { mediaTypes MediaType } -func newMcuProxyPublisher(id string, streamType string, mediaTypes MediaType, proxyId string, conn *mcuProxyConnection, listener McuListener) *mcuProxyPublisher { +func newMcuProxyPublisher(id string, sid string, streamType string, mediaTypes MediaType, proxyId string, conn *mcuProxyConnection, listener McuListener) *mcuProxyPublisher { return &mcuProxyPublisher{ mcuProxyPubSubCommon: mcuProxyPubSubCommon{ + sid: sid, streamType: streamType, proxyId: proxyId, conn: conn, @@ -207,9 +213,10 @@ type mcuProxySubscriber struct { publisherId string } -func newMcuProxySubscriber(publisherId string, streamType string, proxyId string, conn *mcuProxyConnection, listener McuListener) *mcuProxySubscriber { +func newMcuProxySubscriber(publisherId string, sid string, streamType string, proxyId string, conn *mcuProxyConnection, listener McuListener) *mcuProxySubscriber { return &mcuProxySubscriber{ mcuProxyPubSubCommon: mcuProxyPubSubCommon{ + sid: sid, streamType: streamType, proxyId: proxyId, conn: conn, @@ -269,6 +276,9 @@ func (s *mcuProxySubscriber) ProcessEvent(msg *EventProxyServerMessage) { switch msg.Type { case "ice-completed": s.listener.OnIceCompleted(s) + case "subscriber-sid-updated": + s.sid = msg.Sid + s.listener.SubscriberSidUpdated(s) case "subscriber-closed": s.NotifyClosed() default: @@ -966,11 +976,12 @@ func (c *mcuProxyConnection) performSyncRequest(ctx context.Context, msg *ProxyC } } -func (c *mcuProxyConnection) newPublisher(ctx context.Context, listener McuListener, id string, streamType string, bitrate int, mediaTypes MediaType) (McuPublisher, error) { +func (c *mcuProxyConnection) newPublisher(ctx context.Context, listener McuListener, id string, sid string, streamType string, bitrate int, mediaTypes MediaType) (McuPublisher, error) { msg := &ProxyClientMessage{ Type: "command", Command: &CommandProxyClientMessage{ Type: "create-publisher", + Sid: sid, StreamType: streamType, Bitrate: bitrate, MediaTypes: mediaTypes, @@ -985,7 +996,7 @@ func (c *mcuProxyConnection) newPublisher(ctx context.Context, listener McuListe proxyId := response.Command.Id log.Printf("Created %s publisher %s on %s for %s", streamType, proxyId, c, id) - publisher := newMcuProxyPublisher(id, streamType, mediaTypes, proxyId, c, listener) + publisher := newMcuProxyPublisher(id, sid, streamType, mediaTypes, proxyId, c, listener) c.publishersLock.Lock() c.publishers[proxyId] = publisher c.publisherIds[id+"|"+streamType] = proxyId @@ -1020,7 +1031,7 @@ func (c *mcuProxyConnection) newSubscriber(ctx context.Context, listener McuList proxyId := response.Command.Id log.Printf("Created %s subscriber %s on %s for %s", streamType, proxyId, c, publisher) - subscriber := newMcuProxySubscriber(publisher, streamType, proxyId, c, listener) + subscriber := newMcuProxySubscriber(publisher, response.Command.Sid, streamType, proxyId, c, listener) c.subscribersLock.Lock() c.subscribers[proxyId] = subscriber c.subscribersLock.Unlock() @@ -1916,7 +1927,7 @@ func (m *mcuProxy) removeWaiter(id uint64) { delete(m.publisherWaiters, id) } -func (m *mcuProxy) NewPublisher(ctx context.Context, listener McuListener, id string, streamType string, bitrate int, mediaTypes MediaType, initiator McuInitiator) (McuPublisher, error) { +func (m *mcuProxy) NewPublisher(ctx context.Context, listener McuListener, id string, sid string, streamType string, bitrate int, mediaTypes MediaType, initiator McuInitiator) (McuPublisher, error) { connections := m.getSortedConnections(initiator) for _, conn := range connections { if conn.IsShutdownScheduled() { @@ -1937,7 +1948,7 @@ func (m *mcuProxy) NewPublisher(ctx context.Context, listener McuListener, id st } else { bitrate = min(bitrate, maxBitrate) } - publisher, err := conn.newPublisher(subctx, listener, id, streamType, bitrate, mediaTypes) + publisher, err := conn.newPublisher(subctx, listener, id, sid, streamType, bitrate, mediaTypes) if err != nil { log.Printf("Could not create %s publisher for %s on %s: %s", streamType, id, conn, err) continue diff --git a/mcu_test.go b/mcu_test.go index 061ec6e..4172ed8 100644 --- a/mcu_test.go +++ b/mcu_test.go @@ -67,7 +67,7 @@ func (m *TestMCU) GetStats() interface{} { return nil } -func (m *TestMCU) NewPublisher(ctx context.Context, listener McuListener, id string, streamType string, bitrate int, mediaTypes MediaType, initiator McuInitiator) (McuPublisher, error) { +func (m *TestMCU) NewPublisher(ctx context.Context, listener McuListener, id string, sid string, streamType string, bitrate int, mediaTypes MediaType, initiator McuInitiator) (McuPublisher, error) { var maxBitrate int if streamType == streamTypeScreen { maxBitrate = TestMaxBitrateScreen @@ -82,6 +82,7 @@ func (m *TestMCU) NewPublisher(ctx context.Context, listener McuListener, id str pub := &TestMCUPublisher{ TestMCUClient: TestMCUClient{ id: id, + sid: sid, streamType: streamType, }, @@ -122,6 +123,7 @@ type TestMCUClient struct { closed int32 id string + sid string streamType string } @@ -129,6 +131,10 @@ func (c *TestMCUClient) Id() string { return c.id } +func (c *TestMCUClient) Sid() string { + return c.sid +} + func (c *TestMCUClient) StreamType() string { return c.streamType } diff --git a/proxy/proxy_server.go b/proxy/proxy_server.go index 6868779..9f914ce 100644 --- a/proxy/proxy_server.go +++ b/proxy/proxy_server.go @@ -638,7 +638,7 @@ func (s *ProxyServer) processCommand(ctx context.Context, client *ProxyClient, s } id := uuid.New().String() - publisher, err := s.mcu.NewPublisher(ctx, session, id, cmd.StreamType, cmd.Bitrate, cmd.MediaTypes, &emptyInitiator{}) + publisher, err := s.mcu.NewPublisher(ctx, session, id, cmd.Sid, cmd.StreamType, cmd.Bitrate, cmd.MediaTypes, &emptyInitiator{}) if err == context.DeadlineExceeded { log.Printf("Timeout while creating %s publisher %s for %s", cmd.StreamType, id, session.PublicId()) session.sendMessage(message.NewErrorServerMessage(TimeoutCreatingPublisher)) @@ -685,7 +685,8 @@ func (s *ProxyServer) processCommand(ctx context.Context, client *ProxyClient, s Id: message.Id, Type: "command", Command: &signaling.CommandProxyServerMessage{ - Id: id, + Id: id, + Sid: subscriber.Sid(), }, } session.sendMessage(response) diff --git a/proxy/proxy_session.go b/proxy/proxy_session.go index 1e22352..4aec7fa 100644 --- a/proxy/proxy_session.go +++ b/proxy/proxy_session.go @@ -192,6 +192,24 @@ func (s *ProxySession) OnIceCompleted(client signaling.McuClient) { s.sendMessage(msg) } +func (s *ProxySession) SubscriberSidUpdated(subscriber signaling.McuSubscriber) { + id := s.proxy.GetClientId(subscriber) + if id == "" { + log.Printf("Received subscriber sid updated event from unknown %s subscriber %s (%+v)", subscriber.StreamType(), subscriber.Id(), subscriber) + return + } + + msg := &signaling.ProxyServerMessage{ + Type: "event", + Event: &signaling.EventProxyServerMessage{ + Type: "subscriber-sid-updated", + ClientId: id, + Sid: subscriber.Sid(), + }, + } + s.sendMessage(msg) +} + func (s *ProxySession) PublisherClosed(publisher signaling.McuPublisher) { if id := s.DeletePublisher(publisher); id != "" { if s.proxy.DeleteClient(id, publisher) {