From d9d11b58e129d53cd62791d529895204489ac0c6 Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Fri, 7 Aug 2020 15:16:13 +0200 Subject: [PATCH] Shut down a proxy gracefully on SIGUSR1. No new publishers will be created by the proxy; existing publishers can still be subscribed to. After all clients have disconnected, the process will terminate. --- README.md | 4 ++- src/proxy/main.go | 32 ++++++++++------- src/proxy/proxy_server.go | 70 ++++++++++++++++++++++++++++++++++++-- src/signaling/mcu_proxy.go | 21 ++++++++++-- 4 files changed, 109 insertions(+), 18 deletions(-) diff --git a/README.md b/README.md index 095c2f6..785c34a 100644 --- a/README.md +++ b/README.md @@ -149,7 +149,9 @@ for each token id. See the comments in `proxy.conf.in` for other configuration options. When the proxy process receives a `SIGHUP` signal, the list of allowed token -ids / public keys is reloaded. +ids / public keys is reloaded. A `SIGUSR1` signal can be used to shut down a +proxy process gracefully after all clients have been disconnected. No new +publishers will be accepted in this case. 
## Setup of frontend webserver diff --git a/src/proxy/main.go b/src/proxy/main.go index de3ba1a..e3505e8 100644 --- a/src/proxy/main.go +++ b/src/proxy/main.go @@ -68,6 +68,7 @@ func main() { sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, os.Interrupt) signal.Notify(sigChan, syscall.SIGHUP) + signal.Notify(sigChan, syscall.SIGUSR1) log.Printf("Starting up version %s/%s as pid %d", version, runtime.Version(), os.Getpid()) @@ -135,19 +136,26 @@ func main() { loop: for { - switch sig := <-sigChan; sig { - case os.Interrupt: - log.Println("Interrupted") - break loop - case syscall.SIGHUP: - log.Printf("Received SIGHUP, reloading %s", *configFlag) - config, err := goconf.ReadConfigFile(*configFlag) - if err != nil { - log.Printf("Could not read configuration from %s: %s", *configFlag, err) - continue + select { + case sig := <-sigChan: + switch sig { + case os.Interrupt: + log.Println("Interrupted") + break loop + case syscall.SIGHUP: + log.Printf("Received SIGHUP, reloading %s", *configFlag) + if config, err := goconf.ReadConfigFile(*configFlag); err != nil { + log.Printf("Could not read configuration from %s: %s", *configFlag, err) + } else { + proxy.Reload(config) + } + case syscall.SIGUSR1: + log.Printf("Received SIGUSR1, scheduling server to shutdown") + proxy.ScheduleShutdown() } - - proxy.Reload(config) + case <-proxy.ShutdownChannel(): + log.Printf("All clients disconnected, shutting down") + break loop } } } diff --git a/src/proxy/proxy_server.go b/src/proxy/proxy_server.go index a82a42a..802c916 100644 --- a/src/proxy/proxy_server.go +++ b/src/proxy/proxy_server.go @@ -80,6 +80,7 @@ var ( UnsupportedCommand = signaling.NewError("bad_request", "Unsupported command received.") UnsupportedMessage = signaling.NewError("bad_request", "Unsupported message received.") UnsupportedPayload = signaling.NewError("unsupported_payload", "Unsupported payload type.") + ShutdownScheduled = signaling.NewError("shutdown_scheduled", "The server is scheduled to 
shutdown.") ) type ProxyServer struct { @@ -92,6 +93,9 @@ type ProxyServer struct { stopped uint32 load int64 + shutdownChannel chan bool + shutdownScheduled uint32 + upgrader websocket.Upgrader tokenKeys atomic.Value @@ -186,6 +190,8 @@ func NewProxyServer(r *mux.Router, version string, config *goconf.ConfigFile, na nats: nats, + shutdownChannel: make(chan bool, 1), + upgrader: websocket.Upgrader{ ReadBufferSize: websocketReadBufferSize, WriteBufferSize: websocketWriteBufferSize, @@ -322,6 +328,11 @@ func (s *ProxyServer) updateLoad() { } atomic.StoreInt64(&s.load, load) + if atomic.LoadUint32(&s.shutdownScheduled) != 0 { + // Server is scheduled to shutdown, no need to update clients with current load. + return + } + msg := &signaling.ProxyServerMessage{ Type: "event", Event: &signaling.EventProxyServerMessage{ @@ -372,6 +383,32 @@ func (s *ProxyServer) Stop() { s.mcu.Stop() } +func (s *ProxyServer) ShutdownChannel() chan bool { + return s.shutdownChannel +} + +func (s *ProxyServer) ScheduleShutdown() { + if !atomic.CompareAndSwapUint32(&s.shutdownScheduled, 0, 1) { + return + } + + msg := &signaling.ProxyServerMessage{ + Type: "event", + Event: &signaling.EventProxyServerMessage{ + Type: "shutdown-scheduled", + }, + } + s.IterateSessions(func(session *ProxySession) { + session.sendMessage(msg) + }) + + if s.GetClientCount() == 0 { + go func() { + s.shutdownChannel <- true + }() + } +} + func (s *ProxyServer) Reload(config *goconf.ConfigFile) { tokenKeys := make(map[string]*rsa.PublicKey) options, _ := config.GetOptions("tokens") @@ -511,6 +548,16 @@ func (s *ProxyServer) sendCurrentLoad(session *ProxySession) { session.sendMessage(msg) } +func (s *ProxyServer) sendShutdownScheduled(session *ProxySession) { + msg := &signaling.ProxyServerMessage{ + Type: "event", + Event: &signaling.EventProxyServerMessage{ + Type: "shutdown-scheduled", + }, + } + session.sendMessage(msg) +} + func (s *ProxyServer) processMessage(client *ProxyClient, data []byte) { if 
proxyDebugMessages { log.Printf("Message: %s", string(data)) @@ -556,7 +603,11 @@ func (s *ProxyServer) processMessage(client *ProxyClient, data []byte) { } log.Printf("Resumed session %s", session.PublicId()) - s.sendCurrentLoad(session) + if atomic.LoadUint32(&s.shutdownScheduled) != 0 { + s.sendShutdownScheduled(session) + } else { + s.sendCurrentLoad(session) + } } else { var err error if session, err = s.NewSession(message.Hello); err != nil { @@ -592,7 +643,11 @@ func (s *ProxyServer) processMessage(client *ProxyClient, data []byte) { }, } client.SendMessage(response) - s.sendCurrentLoad(session) + if atomic.LoadUint32(&s.shutdownScheduled) != 0 { + s.sendShutdownScheduled(session) + } else { + s.sendCurrentLoad(session) + } return } @@ -613,6 +668,11 @@ func (s *ProxyServer) processCommand(ctx context.Context, client *ProxyClient, s cmd := message.Command switch cmd.Type { case "create-publisher": + if atomic.LoadUint32(&s.shutdownScheduled) != 0 { + session.sendMessage(message.NewErrorServerMessage(ShutdownScheduled)) + return + } + id := uuid.New().String() publisher, err := s.mcu.NewPublisher(ctx, session, id, cmd.StreamType) if err == context.DeadlineExceeded { @@ -901,6 +961,12 @@ func (s *ProxyServer) DeleteClient(id string, client signaling.McuClient) { defer s.clientsLock.Unlock() delete(s.clients, id) delete(s.clientIds, client.Id()) + + if len(s.clients) == 0 && atomic.LoadUint32(&s.shutdownScheduled) != 0 { + go func() { + s.shutdownChannel <- true + }() + } } func (s *ProxyServer) GetClientCount() int64 { diff --git a/src/signaling/mcu_proxy.go b/src/signaling/mcu_proxy.go index f5c0bbb..032d1f1 100644 --- a/src/signaling/mcu_proxy.go +++ b/src/signaling/mcu_proxy.go @@ -254,6 +254,7 @@ type mcuProxyConnection struct { connectedSince time.Time reconnectInterval int64 reconnectTimer *time.Timer + shutdownScheduled uint32 msgId int64 helloMsgId string @@ -323,6 +324,10 @@ func (c *mcuProxyConnection) Load() int64 { return atomic.LoadInt64(&c.load) 
} +func (c *mcuProxyConnection) IsShutdownScheduled() bool { + return atomic.LoadUint32(&c.shutdownScheduled) != 0 +} + func (c *mcuProxyConnection) readPump() { defer func() { if atomic.LoadUint32(&c.closed) == 0 { @@ -467,6 +472,7 @@ func (c *mcuProxyConnection) reconnect() { c.mu.Unlock() atomic.StoreInt64(&c.reconnectInterval, int64(initialReconnectInterval)) + atomic.StoreUint32(&c.shutdownScheduled, 0) if err := c.sendHello(); err != nil { log.Printf("Could not send hello request to %s: %s", c.url, err) c.scheduleReconnect() @@ -608,22 +614,27 @@ func (c *mcuProxyConnection) processPayload(msg *ProxyServerMessage) { func (c *mcuProxyConnection) processEvent(msg *ProxyServerMessage) { event := msg.Event - if event.Type == "backend-disconnected" { + switch event.Type { + case "backend-disconnected": log.Printf("Upstream backend at %s got disconnected, reset MCU objects", c.url) c.clearPublishers() c.clearSubscribers() c.clearCallbacks() // TODO: Should we also reconnect? return - } else if event.Type == "backend-connected" { + case "backend-connected": log.Printf("Upstream backend at %s is connected", c.url) return - } else if event.Type == "update-load" { + case "update-load": if proxyDebugMessages { log.Printf("Load of %s now at %d", c.url, event.Load) } atomic.StoreInt64(&c.load, event.Load) return + case "shutdown-scheduled": + log.Printf("Proxy %s is scheduled to shutdown", c.url) + atomic.StoreUint32(&c.shutdownScheduled, 1) + return } if proxyDebugMessages { @@ -972,6 +983,10 @@ func (m *mcuProxy) removeWaiter(id uint64) { func (m *mcuProxy) NewPublisher(ctx context.Context, listener McuListener, id string, streamType string) (McuPublisher, error) { connections := m.getSortedConnections() for _, conn := range connections { + if conn.IsShutdownScheduled() { + continue + } + publisher, err := conn.newPublisher(ctx, listener, id, streamType) if err != nil { log.Printf("Could not create %s publisher for %s on %s: %s", streamType, id, conn.url, err)