Discard unneeded messages based on their "sid" parameter

If a message includes a "sid" parameter and it does not match the one
from the publisher or the subscriber the message is ignored. However, if
the message does not include a "sid" parameter it is processed just like
before for backwards compatibility.

Signed-off-by: Daniel Calviño Sánchez <danxuliu@gmail.com>
This commit is contained in:
Daniel Calviño Sánchez 2022-04-13 03:05:50 +02:00
parent 27989752bf
commit e86baba54d
4 changed files with 24 additions and 3 deletions

View File

@ -216,6 +216,7 @@ type PayloadProxyClientMessage struct {
Type string `json:"type"` Type string `json:"type"`
ClientId string `json:"clientId"` ClientId string `json:"clientId"`
Sid string `json:"sid,omitempty"`
Payload map[string]interface{} `json:"payload,omitempty"` Payload map[string]interface{} `json:"payload,omitempty"`
} }

View File

@ -951,6 +951,8 @@ func (p *mcuJanusPublisher) SendMessage(ctx context.Context, message *MessageCli
msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout) msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout)
defer cancel() defer cancel()
// TODO Tear down previous publisher and get a new one if sid does
// not match?
p.sendOffer(msgctx, jsep_msg, callback) p.sendOffer(msgctx, jsep_msg, callback)
} }
case "candidate": case "candidate":
@ -958,7 +960,11 @@ func (p *mcuJanusPublisher) SendMessage(ctx context.Context, message *MessageCli
msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout) msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout)
defer cancel() defer cancel()
p.sendCandidate(msgctx, jsep_msg["candidate"], callback) if data.Sid == "" || data.Sid == p.Sid() {
p.sendCandidate(msgctx, jsep_msg["candidate"], callback)
} else {
go callback(fmt.Errorf("Candidate message sid (%s) does not match publisher sid (%s)", data.Sid, p.Sid()), nil)
}
} }
case "endOfCandidates": case "endOfCandidates":
// Ignore // Ignore
@ -1250,6 +1256,8 @@ func (p *mcuJanusSubscriber) SendMessage(ctx context.Context, message *MessageCl
msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout) msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout)
defer cancel() defer cancel()
// TODO Only join the room if there is no sid or it does not match
// the subscriber sid; otherwise configure/update the subscriber.
p.joinRoom(msgctx, callback) p.joinRoom(msgctx, callback)
} }
case "answer": case "answer":
@ -1257,14 +1265,22 @@ func (p *mcuJanusSubscriber) SendMessage(ctx context.Context, message *MessageCl
msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout) msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout)
defer cancel() defer cancel()
p.sendAnswer(msgctx, jsep_msg, callback) if data.Sid == "" || data.Sid == p.Sid() {
p.sendAnswer(msgctx, jsep_msg, callback)
} else {
go callback(fmt.Errorf("Answer message sid (%s) does not match subscriber sid (%s)", data.Sid, p.Sid()), nil)
}
} }
case "candidate": case "candidate":
p.deferred <- func() { p.deferred <- func() {
msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout) msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout)
defer cancel() defer cancel()
p.sendCandidate(msgctx, jsep_msg["candidate"], callback) if data.Sid == "" || data.Sid == p.Sid() {
p.sendCandidate(msgctx, jsep_msg["candidate"], callback)
} else {
go callback(fmt.Errorf("Candidate message sid (%s) does not match subscriber sid (%s)", data.Sid, p.Sid()), nil)
}
} }
case "endOfCandidates": case "endOfCandidates":
// Ignore // Ignore

View File

@ -185,6 +185,7 @@ func (p *mcuProxyPublisher) SendMessage(ctx context.Context, message *MessageCli
Payload: &PayloadProxyClientMessage{ Payload: &PayloadProxyClientMessage{
Type: data.Type, Type: data.Type,
ClientId: p.proxyId, ClientId: p.proxyId,
Sid: data.Sid,
Payload: data.Payload, Payload: data.Payload,
}, },
} }
@ -261,6 +262,7 @@ func (s *mcuProxySubscriber) SendMessage(ctx context.Context, message *MessageCl
Payload: &PayloadProxyClientMessage{ Payload: &PayloadProxyClientMessage{
Type: data.Type, Type: data.Type,
ClientId: s.proxyId, ClientId: s.proxyId,
Sid: data.Sid,
Payload: data.Payload, Payload: data.Payload,
}, },
} }

View File

@ -789,6 +789,7 @@ func (s *ProxyServer) processPayload(ctx context.Context, client *ProxyClient, s
case "candidate": case "candidate":
mcuData = &signaling.MessageClientMessageData{ mcuData = &signaling.MessageClientMessageData{
Type: payload.Type, Type: payload.Type,
Sid: payload.Sid,
Payload: payload.Payload, Payload: payload.Payload,
} }
case "endOfCandidates": case "endOfCandidates":
@ -807,6 +808,7 @@ func (s *ProxyServer) processPayload(ctx context.Context, client *ProxyClient, s
case "sendoffer": case "sendoffer":
mcuData = &signaling.MessageClientMessageData{ mcuData = &signaling.MessageClientMessageData{
Type: payload.Type, Type: payload.Type,
Sid: payload.Sid,
} }
default: default:
session.sendMessage(message.NewErrorServerMessage(UnsupportedPayload)) session.sendMessage(message.NewErrorServerMessage(UnsupportedPayload))