From cdbc1771792e8555d2c269e26507c73c9954cbef Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Tue, 30 Aug 2022 11:46:45 +0200 Subject: [PATCH 1/3] Schedule reconnect asynchronously if ping could not be sent. --- mcu_proxy.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mcu_proxy.go b/mcu_proxy.go index 99f0010..388a47a 100644 --- a/mcu_proxy.go +++ b/mcu_proxy.go @@ -488,7 +488,7 @@ func (c *mcuProxyConnection) sendPing() bool { c.conn.SetWriteDeadline(now.Add(writeWait)) // nolint if err := c.conn.WriteMessage(websocket.PingMessage, []byte(msg)); err != nil { log.Printf("Could not send ping to proxy at %s: %v", c, err) - c.scheduleReconnect() + go c.scheduleReconnect() return false } From 960cb0ea3cbfd946c8a23e79b38f261405ae0689 Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Tue, 30 Aug 2022 11:47:38 +0200 Subject: [PATCH 2/3] Use "defer" to re-acquire released lock. --- clientsession.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clientsession.go b/clientsession.go index d6ac3b4..22814b4 100644 --- a/clientsession.go +++ b/clientsession.go @@ -839,6 +839,7 @@ func (s *ClientSession) GetOrCreatePublisher(ctx context.Context, mcu Mcu, strea if !found { client := s.getClientUnlocked() s.mu.Unlock() + defer s.mu.Lock() bitrate := data.Bitrate if backend := s.Backend(); backend != nil { @@ -856,7 +857,6 @@ func (s *ClientSession) GetOrCreatePublisher(ctx context.Context, mcu Mcu, strea } var err error publisher, err = mcu.NewPublisher(ctx, s, s.PublicId(), data.Sid, streamType, bitrate, mediaTypes, client) - s.mu.Lock() if err != nil { return nil, err } From 9a868c6d9189f3d41ef94598c2f22fd3267efb10 Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Tue, 30 Aug 2022 11:49:08 +0200 Subject: [PATCH 3/3] Handle error responses when creating proxy publisher / subscriber. --- mcu_proxy.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/mcu_proxy.go b/mcu_proxy.go index 388a47a..ac4da48 100644 --- a/mcu_proxy.go +++ b/mcu_proxy.go @@ -1042,6 +1042,8 @@ func (c *mcuProxyConnection) newPublisher(ctx context.Context, listener McuListe if err != nil { // TODO: Cancel request return nil, err + } else if response.Type == "error" { + return nil, fmt.Errorf("Error creating %s publisher for %s on %s: %+v", streamType, id, c, response.Error) } proxyId := response.Command.Id @@ -1070,6 +1072,8 @@ func (c *mcuProxyConnection) newSubscriber(ctx context.Context, listener McuList if err != nil { // TODO: Cancel request return nil, err + } else if response.Type == "error" { + return nil, fmt.Errorf("Error creating %s subscriber for %s on %s: %+v", streamType, publisherSessionId, c, response.Error) } proxyId := response.Command.Id