From e86baba54d60f3b7a172f8bbf7e2e110c519e4a0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Calvi=C3=B1o=20S=C3=A1nchez?= Date: Wed, 13 Apr 2022 03:05:50 +0200 Subject: [PATCH] Discard unneeded messages based on their "sid" parameter MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit If a message includes a "sid" parameter and it does not match the one from the publisher or the subscriber the message is ignored. However, if the message does not include a "sid" parameter it is processed just like before for backwards compatibility. Signed-off-by: Daniel Calviño Sánchez --- api_proxy.go | 1 + mcu_janus.go | 22 +++++++++++++++++++--- mcu_proxy.go | 2 ++ proxy/proxy_server.go | 2 ++ 4 files changed, 24 insertions(+), 3 deletions(-) diff --git a/api_proxy.go b/api_proxy.go index 364a3ae..7192410 100644 --- a/api_proxy.go +++ b/api_proxy.go @@ -216,6 +216,7 @@ type PayloadProxyClientMessage struct { Type string `json:"type"` ClientId string `json:"clientId"` + Sid string `json:"sid,omitempty"` Payload map[string]interface{} `json:"payload,omitempty"` } diff --git a/mcu_janus.go b/mcu_janus.go index 87c4b9e..fae34e9 100644 --- a/mcu_janus.go +++ b/mcu_janus.go @@ -951,6 +951,8 @@ func (p *mcuJanusPublisher) SendMessage(ctx context.Context, message *MessageCli msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout) defer cancel() + // TODO Tear down previous publisher and get a new one if sid does + // not match? p.sendOffer(msgctx, jsep_msg, callback) } case "candidate": @@ -958,7 +960,11 @@ func (p *mcuJanusPublisher) SendMessage(ctx context.Context, message *MessageCli msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout) defer cancel() - p.sendCandidate(msgctx, jsep_msg["candidate"], callback) + if data.Sid == "" || data.Sid == p.Sid() { + p.sendCandidate(msgctx, jsep_msg["candidate"], callback) + } else { + go callback(fmt.Errorf("Candidate message sid (%s) does not match publisher sid (%s)", data.Sid, p.Sid()), nil) + } } case "endOfCandidates": // Ignore @@ -1250,6 +1256,8 @@ func (p *mcuJanusSubscriber) SendMessage(ctx context.Context, message *MessageCl msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout) defer cancel() + // TODO Only join the room if there is no sid or it does not match + // the subscriber sid; otherwise configure/update the subscriber. p.joinRoom(msgctx, callback) } case "answer": @@ -1257,14 +1265,22 @@ func (p *mcuJanusSubscriber) SendMessage(ctx context.Context, message *MessageCl msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout) defer cancel() - p.sendAnswer(msgctx, jsep_msg, callback) + if data.Sid == "" || data.Sid == p.Sid() { + p.sendAnswer(msgctx, jsep_msg, callback) + } else { + go callback(fmt.Errorf("Answer message sid (%s) does not match subscriber sid (%s)", data.Sid, p.Sid()), nil) + } } case "candidate": p.deferred <- func() { msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout) defer cancel() - p.sendCandidate(msgctx, jsep_msg["candidate"], callback) + if data.Sid == "" || data.Sid == p.Sid() { + p.sendCandidate(msgctx, jsep_msg["candidate"], callback) + } else { + go callback(fmt.Errorf("Candidate message sid (%s) does not match subscriber sid (%s)", data.Sid, p.Sid()), nil) + } } case "endOfCandidates": // Ignore diff --git a/mcu_proxy.go b/mcu_proxy.go index 9a8b0a0..3c9baad 100644 --- a/mcu_proxy.go +++ b/mcu_proxy.go @@ -185,6 +185,7 @@ func (p *mcuProxyPublisher) SendMessage(ctx context.Context, message *MessageCli Payload: &PayloadProxyClientMessage{ Type: data.Type, ClientId: p.proxyId, + Sid: data.Sid, Payload: data.Payload, }, } @@ -261,6 +262,7 @@ func (s *mcuProxySubscriber) SendMessage(ctx context.Context, message *MessageCl Payload: &PayloadProxyClientMessage{ Type: data.Type, ClientId: s.proxyId, + Sid: data.Sid, Payload: data.Payload, }, } diff --git a/proxy/proxy_server.go b/proxy/proxy_server.go index 9f914ce..fc919cc 100644 --- a/proxy/proxy_server.go +++ b/proxy/proxy_server.go @@ -789,6 +789,7 @@ func (s *ProxyServer) processPayload(ctx context.Context, client *ProxyClient, s case "candidate": mcuData = &signaling.MessageClientMessageData{ Type: payload.Type, + Sid: payload.Sid, Payload: payload.Payload, } case "endOfCandidates": @@ -807,6 +808,7 @@ func (s *ProxyServer) processPayload(ctx context.Context, client *ProxyClient, s case "sendoffer": mcuData = &signaling.MessageClientMessageData{ Type: payload.Type, + Sid: payload.Sid, } default: session.sendMessage(message.NewErrorServerMessage(UnsupportedPayload))