diff --git a/api_proxy.go b/api_proxy.go index 364a3ae..7192410 100644 --- a/api_proxy.go +++ b/api_proxy.go @@ -216,6 +216,7 @@ type PayloadProxyClientMessage struct { Type string `json:"type"` ClientId string `json:"clientId"` + Sid string `json:"sid,omitempty"` Payload map[string]interface{} `json:"payload,omitempty"` } diff --git a/mcu_janus.go b/mcu_janus.go index 87c4b9e..fae34e9 100644 --- a/mcu_janus.go +++ b/mcu_janus.go @@ -951,6 +951,8 @@ func (p *mcuJanusPublisher) SendMessage(ctx context.Context, message *MessageCli msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout) defer cancel() + // TODO Tear down previous publisher and get a new one if sid does + // not match? p.sendOffer(msgctx, jsep_msg, callback) } case "candidate": @@ -958,7 +960,11 @@ func (p *mcuJanusPublisher) SendMessage(ctx context.Context, message *MessageCli msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout) defer cancel() - p.sendCandidate(msgctx, jsep_msg["candidate"], callback) + if data.Sid == "" || data.Sid == p.Sid() { + p.sendCandidate(msgctx, jsep_msg["candidate"], callback) + } else { + go callback(fmt.Errorf("Candidate message sid (%s) does not match publisher sid (%s)", data.Sid, p.Sid()), nil) + } } case "endOfCandidates": // Ignore @@ -1250,6 +1256,8 @@ func (p *mcuJanusSubscriber) SendMessage(ctx context.Context, message *MessageCl msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout) defer cancel() + // TODO Only join the room if there is no sid or it does not match + // the subscriber sid; otherwise configure/update the subscriber. p.joinRoom(msgctx, callback) } case "answer": @@ -1257,14 +1265,22 @@ func (p *mcuJanusSubscriber) SendMessage(ctx context.Context, message *MessageCl msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout) defer cancel() - p.sendAnswer(msgctx, jsep_msg, callback) + if data.Sid == "" || data.Sid == p.Sid() { + p.sendAnswer(msgctx, jsep_msg, callback) + } else { + go callback(fmt.Errorf("Answer message sid (%s) does not match subscriber sid (%s)", data.Sid, p.Sid()), nil) + } } case "candidate": p.deferred <- func() { msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout) defer cancel() - p.sendCandidate(msgctx, jsep_msg["candidate"], callback) + if data.Sid == "" || data.Sid == p.Sid() { + p.sendCandidate(msgctx, jsep_msg["candidate"], callback) + } else { + go callback(fmt.Errorf("Candidate message sid (%s) does not match subscriber sid (%s)", data.Sid, p.Sid()), nil) + } } case "endOfCandidates": // Ignore diff --git a/mcu_proxy.go b/mcu_proxy.go index 9a8b0a0..3c9baad 100644 --- a/mcu_proxy.go +++ b/mcu_proxy.go @@ -185,6 +185,7 @@ func (p *mcuProxyPublisher) SendMessage(ctx context.Context, message *MessageCli Payload: &PayloadProxyClientMessage{ Type: data.Type, ClientId: p.proxyId, + Sid: data.Sid, Payload: data.Payload, }, } @@ -261,6 +262,7 @@ func (s *mcuProxySubscriber) SendMessage(ctx context.Context, message *MessageCl Payload: &PayloadProxyClientMessage{ Type: data.Type, ClientId: s.proxyId, + Sid: data.Sid, Payload: data.Payload, }, } diff --git a/proxy/proxy_server.go b/proxy/proxy_server.go index 9f914ce..fc919cc 100644 --- a/proxy/proxy_server.go +++ b/proxy/proxy_server.go @@ -789,6 +789,7 @@ func (s *ProxyServer) processPayload(ctx context.Context, client *ProxyClient, s case "candidate": mcuData = &signaling.MessageClientMessageData{ Type: payload.Type, + Sid: payload.Sid, Payload: payload.Payload, } case "endOfCandidates": @@ -807,6 +808,7 @@ func (s *ProxyServer) processPayload(ctx context.Context, client *ProxyClient, s case "sendoffer": mcuData = &signaling.MessageClientMessageData{ Type: payload.Type, + Sid: payload.Sid, } default: session.sendMessage(message.NewErrorServerMessage(UnsupportedPayload))