Merge pull request #229 from danxuliu/add-specific-id-for-connections-and-replace-update-parameter-with-it

Add specific id for connections and replace "update" parameter with it
This commit is contained in:
Joachim Bauch 2022-04-27 16:17:17 +02:00 committed by GitHub
commit 93746a4d9d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 101 additions and 21 deletions

View file

@ -172,6 +172,7 @@ type ByeProxyServerMessage struct {
type CommandProxyClientMessage struct {
Type string `json:"type"`
Sid string `json:"sid,omitempty"`
StreamType string `json:"streamType,omitempty"`
PublisherId string `json:"publisherId,omitempty"`
ClientId string `json:"clientId,omitempty"`
@ -205,7 +206,8 @@ func (m *CommandProxyClientMessage) CheckValid() error {
}
type CommandProxyServerMessage struct {
Id string `json:"id,omitempty"`
Id string `json:"id,omitempty"`
Sid string `json:"sid,omitempty"`
}
// Type "payload"
@ -214,6 +216,7 @@ type PayloadProxyClientMessage struct {
Type string `json:"type"`
ClientId string `json:"clientId"`
Sid string `json:"sid,omitempty"`
Payload map[string]interface{} `json:"payload,omitempty"`
}
@ -254,6 +257,7 @@ type EventProxyServerMessage struct {
ClientId string `json:"clientId,omitempty"`
Load int64 `json:"load,omitempty"`
Sid string `json:"sid,omitempty"`
}
// Information on a proxy in the etcd cluster.

View file

@ -654,7 +654,7 @@ type AnswerOfferMessage struct {
Type string `json:"type"`
RoomType string `json:"roomType"`
Payload map[string]interface{} `json:"payload"`
Update bool `json:"update,omitempty"`
Sid string `json:"sid,omitempty"`
}
// Type "transient"

View file

@ -571,7 +571,7 @@ func (s *ClientSession) sendOffer(client McuClient, sender string, streamType st
Type: "offer",
RoomType: streamType,
Payload: offer,
Update: true,
Sid: client.Sid(),
}
offer_data, err := json.Marshal(offer_message)
if err != nil {
@ -601,6 +601,7 @@ func (s *ClientSession) sendCandidate(client McuClient, sender string, streamTyp
Payload: map[string]interface{}{
"candidate": candidate,
},
Sid: client.Sid(),
}
candidate_data, err := json.Marshal(candidate_message)
if err != nil {
@ -698,6 +699,9 @@ func (s *ClientSession) OnIceCompleted(client McuClient) {
// s.OnIceCandidate(client, nil)
}
func (s *ClientSession) SubscriberSidUpdated(subscriber McuSubscriber) {
}
func (s *ClientSession) PublisherClosed(publisher McuPublisher) {
s.mu.Lock()
defer s.mu.Unlock()
@ -876,7 +880,7 @@ func (s *ClientSession) GetOrCreatePublisher(ctx context.Context, mcu Mcu, strea
}
}
var err error
publisher, err = mcu.NewPublisher(ctx, s, s.PublicId(), streamType, bitrate, mediaTypes, client)
publisher, err = mcu.NewPublisher(ctx, s, s.PublicId(), data.Sid, streamType, bitrate, mediaTypes, client)
s.mu.Lock()
if err != nil {
return nil, err

6
hub.go
View file

@ -1841,11 +1841,11 @@ func (h *Hub) processMcuMessage(senderSession *ClientSession, session *ClientSes
return
}
h.sendMcuMessageResponse(session, message, data, response)
h.sendMcuMessageResponse(session, mc, message, data, response)
})
}
func (h *Hub) sendMcuMessageResponse(session *ClientSession, message *MessageClientMessage, data *MessageClientMessageData, response map[string]interface{}) {
func (h *Hub) sendMcuMessageResponse(session *ClientSession, mcuClient McuClient, message *MessageClientMessage, data *MessageClientMessageData, response map[string]interface{}) {
var response_message *ServerMessage
switch response["type"] {
case "answer":
@ -1855,6 +1855,7 @@ func (h *Hub) sendMcuMessageResponse(session *ClientSession, message *MessageCli
Type: "answer",
RoomType: data.RoomType,
Payload: response,
Sid: mcuClient.Sid(),
}
answer_data, err := json.Marshal(answer_message)
if err != nil {
@ -1879,6 +1880,7 @@ func (h *Hub) sendMcuMessageResponse(session *ClientSession, message *MessageCli
Type: "offer",
RoomType: data.RoomType,
Payload: response,
Sid: mcuClient.Sid(),
}
offer_data, err := json.Marshal(offer_message)
if err != nil {

View file

@ -55,6 +55,8 @@ type McuListener interface {
OnIceCandidate(client McuClient, candidate interface{})
OnIceCompleted(client McuClient)
SubscriberSidUpdated(subscriber McuSubscriber)
PublisherClosed(publisher McuPublisher)
SubscriberClosed(subscriber McuSubscriber)
}
@ -73,12 +75,13 @@ type Mcu interface {
GetStats() interface{}
NewPublisher(ctx context.Context, listener McuListener, id string, streamType string, bitrate int, mediaTypes MediaType, initiator McuInitiator) (McuPublisher, error)
NewPublisher(ctx context.Context, listener McuListener, id string, sid string, streamType string, bitrate int, mediaTypes MediaType, initiator McuInitiator) (McuPublisher, error)
NewSubscriber(ctx context.Context, listener McuListener, publisher string, streamType string) (McuSubscriber, error)
}
type McuClient interface {
Id() string
Sid() string
StreamType() string
Close(ctx context.Context)

View file

@ -436,6 +436,7 @@ type mcuJanusClient struct {
id uint64
session uint64
roomId uint64
sid string
streamType string
handle *JanusHandle
@ -455,6 +456,10 @@ func (c *mcuJanusClient) Id() string {
return strconv.FormatUint(c.id, 10)
}
func (c *mcuJanusClient) Sid() string {
return c.sid
}
func (c *mcuJanusClient) StreamType() string {
return c.streamType
}
@ -781,7 +786,7 @@ func (m *mcuJanus) getOrCreatePublisherHandle(ctx context.Context, id string, st
return handle, response.Session, roomId, nil
}
func (m *mcuJanus) NewPublisher(ctx context.Context, listener McuListener, id string, streamType string, bitrate int, mediaTypes MediaType, initiator McuInitiator) (McuPublisher, error) {
func (m *mcuJanus) NewPublisher(ctx context.Context, listener McuListener, id string, sid string, streamType string, bitrate int, mediaTypes MediaType, initiator McuInitiator) (McuPublisher, error) {
if _, found := streamTypeUserIds[streamType]; !found {
return nil, fmt.Errorf("Unsupported stream type %s", streamType)
}
@ -799,6 +804,7 @@ func (m *mcuJanus) NewPublisher(ctx context.Context, listener McuListener, id st
id: atomic.AddUint64(&m.clientId, 1),
session: session,
roomId: roomId,
sid: sid,
streamType: streamType,
handle: handle,
@ -945,6 +951,8 @@ func (p *mcuJanusPublisher) SendMessage(ctx context.Context, message *MessageCli
msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout)
defer cancel()
// TODO Tear down previous publisher and get a new one if sid does
// not match?
p.sendOffer(msgctx, jsep_msg, callback)
}
case "candidate":
@ -952,7 +960,11 @@ func (p *mcuJanusPublisher) SendMessage(ctx context.Context, message *MessageCli
msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout)
defer cancel()
p.sendCandidate(msgctx, jsep_msg["candidate"], callback)
if data.Sid == "" || data.Sid == p.Sid() {
p.sendCandidate(msgctx, jsep_msg["candidate"], callback)
} else {
go callback(fmt.Errorf("Candidate message sid (%s) does not match publisher sid (%s)", data.Sid, p.Sid()), nil)
}
}
case "endOfCandidates":
// Ignore
@ -1032,6 +1044,7 @@ func (m *mcuJanus) NewSubscriber(ctx context.Context, listener McuListener, publ
id: atomic.AddUint64(&m.clientId, 1),
roomId: pub.roomId,
sid: strconv.FormatUint(handle.Id, 10),
streamType: streamType,
handle: handle,
@ -1123,6 +1136,8 @@ func (p *mcuJanusSubscriber) NotifyReconnected() {
p.handle = handle
p.handleId = handle.Id
p.roomId = pub.roomId
p.sid = strconv.FormatUint(handle.Id, 10)
p.listener.SubscriberSidUpdated(p)
log.Printf("Subscriber %d for publisher %s reconnected on handle %d", p.id, p.publisher, p.handleId)
}
@ -1191,6 +1206,8 @@ retry:
p.handle = handle
p.handleId = handle.Id
p.roomId = pub.roomId
p.sid = strconv.FormatUint(handle.Id, 10)
p.listener.SubscriberSidUpdated(p)
p.closeChan = make(chan bool, 1)
go p.run(p.handle, p.closeChan)
log.Printf("Already connected subscriber %d for %s, leaving and re-joining on handle %d", p.id, p.streamType, p.handleId)
@ -1239,6 +1256,8 @@ func (p *mcuJanusSubscriber) SendMessage(ctx context.Context, message *MessageCl
msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout)
defer cancel()
// TODO Only join the room if there is no sid or it does not match
// the subscriber sid; otherwise configure/update the subscriber.
p.joinRoom(msgctx, callback)
}
case "answer":
@ -1246,14 +1265,22 @@ func (p *mcuJanusSubscriber) SendMessage(ctx context.Context, message *MessageCl
msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout)
defer cancel()
p.sendAnswer(msgctx, jsep_msg, callback)
if data.Sid == "" || data.Sid == p.Sid() {
p.sendAnswer(msgctx, jsep_msg, callback)
} else {
go callback(fmt.Errorf("Answer message sid (%s) does not match subscriber sid (%s)", data.Sid, p.Sid()), nil)
}
}
case "candidate":
p.deferred <- func() {
msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout)
defer cancel()
p.sendCandidate(msgctx, jsep_msg["candidate"], callback)
if data.Sid == "" || data.Sid == p.Sid() {
p.sendCandidate(msgctx, jsep_msg["candidate"], callback)
} else {
go callback(fmt.Errorf("Candidate message sid (%s) does not match subscriber sid (%s)", data.Sid, p.Sid()), nil)
}
}
case "endOfCandidates":
// Ignore

View file

@ -75,6 +75,7 @@ const (
)
type mcuProxyPubSubCommon struct {
sid string
streamType string
proxyId string
conn *mcuProxyConnection
@ -85,6 +86,10 @@ func (c *mcuProxyPubSubCommon) Id() string {
return c.proxyId
}
func (c *mcuProxyPubSubCommon) Sid() string {
return c.sid
}
func (c *mcuProxyPubSubCommon) StreamType() string {
return c.streamType
}
@ -127,9 +132,10 @@ type mcuProxyPublisher struct {
mediaTypes MediaType
}
func newMcuProxyPublisher(id string, streamType string, mediaTypes MediaType, proxyId string, conn *mcuProxyConnection, listener McuListener) *mcuProxyPublisher {
func newMcuProxyPublisher(id string, sid string, streamType string, mediaTypes MediaType, proxyId string, conn *mcuProxyConnection, listener McuListener) *mcuProxyPublisher {
return &mcuProxyPublisher{
mcuProxyPubSubCommon: mcuProxyPubSubCommon{
sid: sid,
streamType: streamType,
proxyId: proxyId,
conn: conn,
@ -179,6 +185,7 @@ func (p *mcuProxyPublisher) SendMessage(ctx context.Context, message *MessageCli
Payload: &PayloadProxyClientMessage{
Type: data.Type,
ClientId: p.proxyId,
Sid: data.Sid,
Payload: data.Payload,
},
}
@ -207,9 +214,10 @@ type mcuProxySubscriber struct {
publisherId string
}
func newMcuProxySubscriber(publisherId string, streamType string, proxyId string, conn *mcuProxyConnection, listener McuListener) *mcuProxySubscriber {
func newMcuProxySubscriber(publisherId string, sid string, streamType string, proxyId string, conn *mcuProxyConnection, listener McuListener) *mcuProxySubscriber {
return &mcuProxySubscriber{
mcuProxyPubSubCommon: mcuProxyPubSubCommon{
sid: sid,
streamType: streamType,
proxyId: proxyId,
conn: conn,
@ -254,6 +262,7 @@ func (s *mcuProxySubscriber) SendMessage(ctx context.Context, message *MessageCl
Payload: &PayloadProxyClientMessage{
Type: data.Type,
ClientId: s.proxyId,
Sid: data.Sid,
Payload: data.Payload,
},
}
@ -269,6 +278,9 @@ func (s *mcuProxySubscriber) ProcessEvent(msg *EventProxyServerMessage) {
switch msg.Type {
case "ice-completed":
s.listener.OnIceCompleted(s)
case "subscriber-sid-updated":
s.sid = msg.Sid
s.listener.SubscriberSidUpdated(s)
case "subscriber-closed":
s.NotifyClosed()
default:
@ -966,11 +978,12 @@ func (c *mcuProxyConnection) performSyncRequest(ctx context.Context, msg *ProxyC
}
}
func (c *mcuProxyConnection) newPublisher(ctx context.Context, listener McuListener, id string, streamType string, bitrate int, mediaTypes MediaType) (McuPublisher, error) {
func (c *mcuProxyConnection) newPublisher(ctx context.Context, listener McuListener, id string, sid string, streamType string, bitrate int, mediaTypes MediaType) (McuPublisher, error) {
msg := &ProxyClientMessage{
Type: "command",
Command: &CommandProxyClientMessage{
Type: "create-publisher",
Sid: sid,
StreamType: streamType,
Bitrate: bitrate,
MediaTypes: mediaTypes,
@ -985,7 +998,7 @@ func (c *mcuProxyConnection) newPublisher(ctx context.Context, listener McuListe
proxyId := response.Command.Id
log.Printf("Created %s publisher %s on %s for %s", streamType, proxyId, c, id)
publisher := newMcuProxyPublisher(id, streamType, mediaTypes, proxyId, c, listener)
publisher := newMcuProxyPublisher(id, sid, streamType, mediaTypes, proxyId, c, listener)
c.publishersLock.Lock()
c.publishers[proxyId] = publisher
c.publisherIds[id+"|"+streamType] = proxyId
@ -1020,7 +1033,7 @@ func (c *mcuProxyConnection) newSubscriber(ctx context.Context, listener McuList
proxyId := response.Command.Id
log.Printf("Created %s subscriber %s on %s for %s", streamType, proxyId, c, publisher)
subscriber := newMcuProxySubscriber(publisher, streamType, proxyId, c, listener)
subscriber := newMcuProxySubscriber(publisher, response.Command.Sid, streamType, proxyId, c, listener)
c.subscribersLock.Lock()
c.subscribers[proxyId] = subscriber
c.subscribersLock.Unlock()
@ -1916,7 +1929,7 @@ func (m *mcuProxy) removeWaiter(id uint64) {
delete(m.publisherWaiters, id)
}
func (m *mcuProxy) NewPublisher(ctx context.Context, listener McuListener, id string, streamType string, bitrate int, mediaTypes MediaType, initiator McuInitiator) (McuPublisher, error) {
func (m *mcuProxy) NewPublisher(ctx context.Context, listener McuListener, id string, sid string, streamType string, bitrate int, mediaTypes MediaType, initiator McuInitiator) (McuPublisher, error) {
connections := m.getSortedConnections(initiator)
for _, conn := range connections {
if conn.IsShutdownScheduled() {
@ -1937,7 +1950,7 @@ func (m *mcuProxy) NewPublisher(ctx context.Context, listener McuListener, id st
} else {
bitrate = min(bitrate, maxBitrate)
}
publisher, err := conn.newPublisher(subctx, listener, id, streamType, bitrate, mediaTypes)
publisher, err := conn.newPublisher(subctx, listener, id, sid, streamType, bitrate, mediaTypes)
if err != nil {
log.Printf("Could not create %s publisher for %s on %s: %s", streamType, id, conn, err)
continue

View file

@ -67,7 +67,7 @@ func (m *TestMCU) GetStats() interface{} {
return nil
}
func (m *TestMCU) NewPublisher(ctx context.Context, listener McuListener, id string, streamType string, bitrate int, mediaTypes MediaType, initiator McuInitiator) (McuPublisher, error) {
func (m *TestMCU) NewPublisher(ctx context.Context, listener McuListener, id string, sid string, streamType string, bitrate int, mediaTypes MediaType, initiator McuInitiator) (McuPublisher, error) {
var maxBitrate int
if streamType == streamTypeScreen {
maxBitrate = TestMaxBitrateScreen
@ -82,6 +82,7 @@ func (m *TestMCU) NewPublisher(ctx context.Context, listener McuListener, id str
pub := &TestMCUPublisher{
TestMCUClient: TestMCUClient{
id: id,
sid: sid,
streamType: streamType,
},
@ -122,6 +123,7 @@ type TestMCUClient struct {
closed int32
id string
sid string
streamType string
}
@ -129,6 +131,10 @@ func (c *TestMCUClient) Id() string {
return c.id
}
func (c *TestMCUClient) Sid() string {
return c.sid
}
func (c *TestMCUClient) StreamType() string {
return c.streamType
}

View file

@ -638,7 +638,7 @@ func (s *ProxyServer) processCommand(ctx context.Context, client *ProxyClient, s
}
id := uuid.New().String()
publisher, err := s.mcu.NewPublisher(ctx, session, id, cmd.StreamType, cmd.Bitrate, cmd.MediaTypes, &emptyInitiator{})
publisher, err := s.mcu.NewPublisher(ctx, session, id, cmd.Sid, cmd.StreamType, cmd.Bitrate, cmd.MediaTypes, &emptyInitiator{})
if err == context.DeadlineExceeded {
log.Printf("Timeout while creating %s publisher %s for %s", cmd.StreamType, id, session.PublicId())
session.sendMessage(message.NewErrorServerMessage(TimeoutCreatingPublisher))
@ -685,7 +685,8 @@ func (s *ProxyServer) processCommand(ctx context.Context, client *ProxyClient, s
Id: message.Id,
Type: "command",
Command: &signaling.CommandProxyServerMessage{
Id: id,
Id: id,
Sid: subscriber.Sid(),
},
}
session.sendMessage(response)
@ -788,6 +789,7 @@ func (s *ProxyServer) processPayload(ctx context.Context, client *ProxyClient, s
case "candidate":
mcuData = &signaling.MessageClientMessageData{
Type: payload.Type,
Sid: payload.Sid,
Payload: payload.Payload,
}
case "endOfCandidates":
@ -806,6 +808,7 @@ func (s *ProxyServer) processPayload(ctx context.Context, client *ProxyClient, s
case "sendoffer":
mcuData = &signaling.MessageClientMessageData{
Type: payload.Type,
Sid: payload.Sid,
}
default:
session.sendMessage(message.NewErrorServerMessage(UnsupportedPayload))

View file

@ -192,6 +192,24 @@ func (s *ProxySession) OnIceCompleted(client signaling.McuClient) {
s.sendMessage(msg)
}
func (s *ProxySession) SubscriberSidUpdated(subscriber signaling.McuSubscriber) {
id := s.proxy.GetClientId(subscriber)
if id == "" {
log.Printf("Received subscriber sid updated event from unknown %s subscriber %s (%+v)", subscriber.StreamType(), subscriber.Id(), subscriber)
return
}
msg := &signaling.ProxyServerMessage{
Type: "event",
Event: &signaling.EventProxyServerMessage{
Type: "subscriber-sid-updated",
ClientId: id,
Sid: subscriber.Sid(),
},
}
s.sendMessage(msg)
}
func (s *ProxySession) PublisherClosed(publisher signaling.McuPublisher) {
if id := s.DeletePublisher(publisher); id != "" {
if s.proxy.DeleteClient(id, publisher) {