diff --git a/src/signaling/mcu_proxy.go b/src/signaling/mcu_proxy.go index 1ec191c..3fecb15 100644 --- a/src/signaling/mcu_proxy.go +++ b/src/signaling/mcu_proxy.go @@ -28,6 +28,7 @@ import ( "fmt" "io/ioutil" "log" + "net/http" "net/url" "sort" "strconv" @@ -67,6 +68,13 @@ const ( defaultProxyTimeoutSeconds = 2 ) +var ( + websocketDialer = &websocket.Dialer{ + Proxy: http.ProxyFromEnvironment, + HandshakeTimeout: 45 * time.Second, + } +) + type mcuProxyPubSubCommon struct { streamType string proxyId string @@ -373,7 +381,22 @@ func (c *mcuProxyConnection) readPump() { conn := c.conn c.mu.Unlock() + conn.SetPongHandler(func(msg string) error { + now := time.Now() + conn.SetReadDeadline(now.Add(pongWait)) + if msg == "" { + return nil + } + if ts, err := strconv.ParseInt(msg, 10, 64); err == nil { + rtt := now.Sub(time.Unix(0, ts)) + rtt_ms := rtt.Nanoseconds() / time.Millisecond.Nanoseconds() + log.Printf("Proxy at %s has RTT of %d ms (%s)", c.url, rtt_ms, rtt) + } + return nil + }) + for { + conn.SetReadDeadline(time.Now().Add(pongWait)) _, message, err := conn.ReadMessage() if err != nil { if websocket.IsUnexpectedCloseError(err, @@ -395,12 +418,38 @@ func (c *mcuProxyConnection) readPump() { } } +func (c *mcuProxyConnection) sendPing() bool { + c.mu.Lock() + defer c.mu.Unlock() + if c.conn == nil { + return false + } + + now := time.Now() + msg := strconv.FormatInt(now.UnixNano(), 10) + c.conn.SetWriteDeadline(now.Add(writeWait)) + if err := c.conn.WriteMessage(websocket.PingMessage, []byte(msg)); err != nil { + log.Printf("Could not send ping to proxy at %s: %v", c.url, err) + c.scheduleReconnect() + return false + } + + return true +} + func (c *mcuProxyConnection) writePump() { + ticker := time.NewTicker(pingPeriod) + defer func() { + ticker.Stop() + }() + c.reconnectTimer = time.NewTimer(0) for { select { case <-c.reconnectTimer.C: c.reconnect() + case <-ticker.C: + c.sendPing() case <-c.closeChan: return } @@ -420,6 +469,7 @@ func (c *mcuProxyConnection) sendClose() error { return ErrNotConnected } + c.conn.SetWriteDeadline(time.Now().Add(writeWait)) return c.conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) } @@ -518,7 +568,7 @@ func (c *mcuProxyConnection) reconnect() { u.Scheme = "wss" } - conn, _, err := websocket.DefaultDialer.Dial(u.String(), nil) + conn, _, err := websocketDialer.Dial(u.String(), nil) if err != nil { log.Printf("Could not connect to %s: %s", u, err) c.scheduleReconnect() @@ -541,6 +591,10 @@ func (c *mcuProxyConnection) reconnect() { return } + if !c.sendPing() { + return + } + go c.readPump() } @@ -808,6 +862,7 @@ func (c *mcuProxyConnection) sendMessageLocked(msg *ProxyClientMessage) error { if c.conn == nil { return ErrNotConnected } + c.conn.SetWriteDeadline(time.Now().Add(writeWait)) return c.conn.WriteJSON(msg) }