Send ping to proxy regularly to detect broken connection.

This commit is contained in:
Joachim Bauch 2020-12-11 15:58:13 +01:00
parent 823be4ed43
commit d2b7fafde5
Failed to extract signature

View file

@ -28,6 +28,7 @@ import (
"fmt"
"io/ioutil"
"log"
"net/http"
"net/url"
"sort"
"strconv"
@ -67,6 +68,13 @@ const (
defaultProxyTimeoutSeconds = 2
)
var (
websocketDialer = &websocket.Dialer{
Proxy: http.ProxyFromEnvironment,
HandshakeTimeout: 45 * time.Second,
}
)
type mcuProxyPubSubCommon struct {
streamType string
proxyId string
@ -373,7 +381,22 @@ func (c *mcuProxyConnection) readPump() {
conn := c.conn
c.mu.Unlock()
conn.SetPongHandler(func(msg string) error {
now := time.Now()
conn.SetReadDeadline(now.Add(pongWait))
if msg == "" {
return nil
}
if ts, err := strconv.ParseInt(msg, 10, 64); err == nil {
rtt := now.Sub(time.Unix(0, ts))
rtt_ms := rtt.Nanoseconds() / time.Millisecond.Nanoseconds()
log.Printf("Proxy at %s has RTT of %d ms (%s)", c.url, rtt_ms, rtt)
}
return nil
})
for {
conn.SetReadDeadline(time.Now().Add(pongWait))
_, message, err := conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err,
@ -395,12 +418,38 @@ func (c *mcuProxyConnection) readPump() {
}
}
func (c *mcuProxyConnection) sendPing() bool {
c.mu.Lock()
defer c.mu.Unlock()
if c.conn == nil {
return false
}
now := time.Now()
msg := strconv.FormatInt(now.UnixNano(), 10)
c.conn.SetWriteDeadline(now.Add(writeWait))
if err := c.conn.WriteMessage(websocket.PingMessage, []byte(msg)); err != nil {
log.Printf("Could not send ping to proxy at %s: %v", c.url, err)
c.scheduleReconnect()
return false
}
return true
}
func (c *mcuProxyConnection) writePump() {
ticker := time.NewTicker(pingPeriod)
defer func() {
ticker.Stop()
}()
c.reconnectTimer = time.NewTimer(0)
for {
select {
case <-c.reconnectTimer.C:
c.reconnect()
case <-ticker.C:
c.sendPing()
case <-c.closeChan:
return
}
@ -420,6 +469,7 @@ func (c *mcuProxyConnection) sendClose() error {
return ErrNotConnected
}
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
return c.conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
}
@ -518,7 +568,7 @@ func (c *mcuProxyConnection) reconnect() {
u.Scheme = "wss"
}
conn, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
conn, _, err := websocketDialer.Dial(u.String(), nil)
if err != nil {
log.Printf("Could not connect to %s: %s", u, err)
c.scheduleReconnect()
@ -541,6 +591,10 @@ func (c *mcuProxyConnection) reconnect() {
return
}
if !c.sendPing() {
return
}
go c.readPump()
}
@ -808,6 +862,7 @@ func (c *mcuProxyConnection) sendMessageLocked(msg *ProxyClientMessage) error {
if c.conn == nil {
return ErrNotConnected
}
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
return c.conn.WriteJSON(msg)
}