Reload list of proxy URLs on SIGHUP.

This commit is contained in:
Joachim Bauch 2020-08-31 13:07:03 +02:00
parent eeb3d356af
commit 116188c3e3
Failed to extract signature
6 changed files with 205 additions and 32 deletions

View File

@ -34,6 +34,7 @@ import (
"runtime"
runtimepprof "runtime/pprof"
"strings"
"syscall"
"time"
"github.com/dlintw/goconf"
@ -47,7 +48,7 @@ import (
var (
version = "unreleased"
config = flag.String("config", "server.conf", "config file to use")
configFlag = flag.String("config", "server.conf", "config file to use")
cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file")
@ -98,8 +99,9 @@ func main() {
os.Exit(0)
}
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt)
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt)
signal.Notify(sigChan, syscall.SIGHUP)
if *cpuprofile != "" {
f, err := os.Create(*cpuprofile)
@ -127,7 +129,7 @@ func main() {
log.Printf("Starting up version %s/%s as pid %d", version, runtime.Version(), os.Getpid())
config, err := goconf.ReadConfigFile(*config)
config, err := goconf.ReadConfigFile(*configFlag)
if err != nil {
log.Fatal("Could not read configuration: ", err)
}
@ -184,8 +186,22 @@ func main() {
log.Printf("Could not initialize %s MCU at %s (%s) will retry in %s", mcuType, mcuUrl, err, mcuRetry)
mcuRetryTimer.Reset(mcuRetry)
select {
case <-interrupt:
log.Fatalf("Cancelled")
case sig := <-sigChan:
switch sig {
case os.Interrupt:
log.Fatalf("Cancelled")
case syscall.SIGHUP:
log.Printf("Received SIGHUP, reloading %s", *configFlag)
if config, err = goconf.ReadConfigFile(*configFlag); err != nil {
log.Printf("Could not read configuration from %s: %s", *configFlag, err)
} else {
mcuUrl, _ = config.GetString("mcu", "url")
mcuType, _ = config.GetString("mcu", "type")
if mcuType == "" {
mcuType = signaling.McuTypeDefault
}
}
}
case <-mcuRetryTimer.C:
// Retry connection
mcuRetry = mcuRetry * 2
@ -290,6 +306,22 @@ func main() {
}
}
<-interrupt
log.Println("Interrupted")
loop:
for {
select {
case sig := <-sigChan:
switch sig {
case os.Interrupt:
log.Println("Interrupted")
break loop
case syscall.SIGHUP:
log.Printf("Received SIGHUP, reloading %s", *configFlag)
if config, err := goconf.ReadConfigFile(*configFlag); err != nil {
log.Printf("Could not read configuration from %s: %s", *configFlag, err)
} else {
hub.Reload(config)
}
}
}
}
}

View File

@ -371,6 +371,12 @@ func (h *Hub) Stop() {
}
}
func (h *Hub) Reload(config *goconf.ConfigFile) {
if h.mcu != nil {
h.mcu.Reload(config)
}
}
func reverseSessionId(s string) (string, error) {
// Note that we are assuming base64 encoded strings here.
decoded, err := base64.URLEncoding.DecodeString(s)

View File

@ -24,6 +24,8 @@ package signaling
import (
"fmt"
"github.com/dlintw/goconf"
"golang.org/x/net/context"
)
@ -55,6 +57,7 @@ type McuInitiator interface {
type Mcu interface {
Start() error
Stop()
Reload(config *goconf.ConfigFile)
SetOnConnected(func())
SetOnDisconnected(func())

View File

@ -359,6 +359,9 @@ func (m *mcuJanus) Stop() {
m.reconnectTimer.Stop()
}
func (m *mcuJanus) Reload(config *goconf.ConfigFile) {
}
func (m *mcuJanus) SetOnConnected(f func()) {
if f == nil {
f = emptyOnConnected

View File

@ -242,8 +242,9 @@ func (s *mcuProxySubscriber) ProcessEvent(msg *EventProxyServerMessage) {
}
type mcuProxyConnection struct {
proxy *mcuProxy
url *url.URL
proxy *mcuProxy
rawUrl string
url *url.URL
mu sync.Mutex
closeChan chan bool
@ -255,6 +256,7 @@ type mcuProxyConnection struct {
reconnectInterval int64
reconnectTimer *time.Timer
shutdownScheduled uint32
closeScheduled uint32
msgId int64
helloMsgId string
@ -280,6 +282,7 @@ func newMcuProxyConnection(proxy *mcuProxy, baseUrl string) (*mcuProxyConnection
conn := &mcuProxyConnection{
proxy: proxy,
rawUrl: baseUrl,
url: parsed,
closeChan: make(chan bool, 1),
closedChan: make(chan bool, 1),
@ -331,7 +334,7 @@ func (c *mcuProxyConnection) Country() string {
}
func (c *mcuProxyConnection) IsShutdownScheduled() bool {
return atomic.LoadUint32(&c.shutdownScheduled) != 0
return atomic.LoadUint32(&c.shutdownScheduled) != 0 || atomic.LoadUint32(&c.closeScheduled) != 0
}
func (c *mcuProxyConnection) readPump() {
@ -433,6 +436,38 @@ func (c *mcuProxyConnection) close() {
}
}
func (c *mcuProxyConnection) stopCloseIfEmpty() {
atomic.StoreUint32(&c.closeScheduled, 0)
}
func (c *mcuProxyConnection) closeIfEmpty() bool {
atomic.StoreUint32(&c.closeScheduled, 1)
var total int64
c.publishersLock.RLock()
total += int64(len(c.publishers))
c.publishersLock.RUnlock()
c.subscribersLock.RLock()
total += int64(len(c.subscribers))
c.subscribersLock.RUnlock()
if total > 0 {
// Connection will be closed once all clients have disconnected.
log.Printf("Connection to %s is still used by %d clients, defer closing", c.url, total)
return false
}
go func() {
ctx, cancel := context.WithTimeout(context.Background(), closeTimeout)
defer cancel()
log.Printf("All clients disconnected, closing connection to %s", c.url)
c.stop(ctx)
c.proxy.removeConnection(c)
}()
return true
}
func (c *mcuProxyConnection) scheduleReconnect() {
if err := c.sendClose(); err != nil && err != ErrNotConnected {
log.Printf("Could not send close message to %s: %s", c.url, err)
@ -496,6 +531,10 @@ func (c *mcuProxyConnection) removePublisher(publisher *mcuProxyPublisher) {
delete(c.publishers, publisher.proxyId)
delete(c.publisherIds, publisher.id+"|"+publisher.StreamType())
if len(c.publishers) == 0 && atomic.LoadUint32(&c.closeScheduled) != 0 {
go c.closeIfEmpty()
}
}
func (c *mcuProxyConnection) clearPublishers() {
@ -509,6 +548,10 @@ func (c *mcuProxyConnection) clearPublishers() {
}(c.publishers)
c.publishers = make(map[string]*mcuProxyPublisher)
c.publisherIds = make(map[string]string)
if atomic.LoadUint32(&c.closeScheduled) != 0 {
go c.closeIfEmpty()
}
}
func (c *mcuProxyConnection) removeSubscriber(subscriber *mcuProxySubscriber) {
@ -516,6 +559,10 @@ func (c *mcuProxyConnection) removeSubscriber(subscriber *mcuProxySubscriber) {
defer c.subscribersLock.Unlock()
delete(c.subscribers, subscriber.proxyId)
if len(c.subscribers) == 0 && atomic.LoadUint32(&c.closeScheduled) != 0 {
go c.closeIfEmpty()
}
}
func (c *mcuProxyConnection) clearSubscribers() {
@ -528,6 +575,10 @@ func (c *mcuProxyConnection) clearSubscribers() {
}
}(c.subscribers)
c.subscribers = make(map[string]*mcuProxySubscriber)
if atomic.LoadUint32(&c.closeScheduled) != 0 {
go c.closeIfEmpty()
}
}
func (c *mcuProxyConnection) clearCallbacks() {
@ -821,9 +872,11 @@ type mcuProxy struct {
tokenId string
tokenKey *rsa.PrivateKey
connections atomic.Value
connRequests int64
nextSort int64
connections []*mcuProxyConnection
connectionsMap map[string]*mcuProxyConnection
connectionsMu sync.RWMutex
connRequests int64
nextSort int64
mu sync.RWMutex
publishers map[string]*mcuProxyConnection
@ -833,8 +886,6 @@ type mcuProxy struct {
}
func NewMcuProxy(baseUrl string, config *goconf.ConfigFile) (Mcu, error) {
var connections []*mcuProxyConnection
tokenId, _ := config.GetString("mcu", "token_id")
if tokenId == "" {
return nil, fmt.Errorf("No token id configured")
@ -856,6 +907,8 @@ func NewMcuProxy(baseUrl string, config *goconf.ConfigFile) (Mcu, error) {
tokenId: tokenId,
tokenKey: tokenKey,
connectionsMap: make(map[string]*mcuProxyConnection),
publishers: make(map[string]*mcuProxyConnection),
publisherWaiters: make(map[uint64]chan bool),
@ -867,26 +920,21 @@ func NewMcuProxy(baseUrl string, config *goconf.ConfigFile) (Mcu, error) {
return nil, err
}
connections = append(connections, conn)
mcu.connections = append(mcu.connections, conn)
mcu.connectionsMap[u] = conn
}
if len(connections) == 0 {
if len(mcu.connections) == 0 {
return nil, fmt.Errorf("No MCU proxy connections configured")
}
mcu.setConnections(connections)
return mcu, nil
}
func (m *mcuProxy) setConnections(connections []*mcuProxyConnection) {
m.connections.Store(connections)
}
func (m *mcuProxy) getConnections() []*mcuProxyConnection {
return m.connections.Load().([]*mcuProxyConnection)
}
func (m *mcuProxy) Start() error {
for _, c := range m.getConnections() {
m.connectionsMu.RLock()
defer m.connectionsMu.RUnlock()
for _, c := range m.connections {
if err := c.start(); err != nil {
return err
}
@ -895,13 +943,81 @@ func (m *mcuProxy) Start() error {
}
func (m *mcuProxy) Stop() {
for _, c := range m.getConnections() {
m.connectionsMu.RLock()
defer m.connectionsMu.RUnlock()
for _, c := range m.connections {
ctx, cancel := context.WithTimeout(context.Background(), closeTimeout)
defer cancel()
c.stop(ctx)
}
}
func (m *mcuProxy) Reload(config *goconf.ConfigFile) {
m.connectionsMu.Lock()
defer m.connectionsMu.Unlock()
remove := make(map[string]*mcuProxyConnection)
for u, conn := range m.connectionsMap {
remove[u] = conn
}
created := make(map[string]*mcuProxyConnection)
changed := false
mcuUrl, _ := config.GetString("mcu", "url")
for _, u := range strings.Split(mcuUrl, " ") {
if existing, found := remove[u]; found {
// Proxy connection still exists in new configuration
delete(remove, u)
existing.stopCloseIfEmpty()
continue
}
conn, err := newMcuProxyConnection(m, u)
if err != nil {
log.Printf("Could not create proxy connection to %s: %s", u, err)
continue
}
created[u] = conn
}
for _, conn := range remove {
go conn.closeIfEmpty()
}
for u, conn := range created {
if err := conn.start(); err != nil {
log.Printf("Could not start new connection to %s: %s", u, err)
continue
}
log.Printf("Adding new connection to %s", u)
m.connections = append(m.connections, conn)
m.connectionsMap[u] = conn
changed = true
}
if changed {
atomic.StoreInt64(&m.nextSort, 0)
}
}
func (m *mcuProxy) removeConnection(c *mcuProxyConnection) {
m.connectionsMu.Lock()
defer m.connectionsMu.Unlock()
if _, found := m.connectionsMap[c.rawUrl]; found {
delete(m.connectionsMap, c.rawUrl)
m.connections = nil
for _, conn := range m.connectionsMap {
m.connections = append(m.connections, conn)
}
atomic.StoreInt64(&m.nextSort, 0)
}
}
func (m *mcuProxy) SetOnConnected(f func()) {
// Not supported.
}
@ -921,7 +1037,11 @@ func (m *mcuProxy) GetStats() interface{} {
result := &mcuProxyStats{
Details: details,
}
for _, conn := range m.getConnections() {
m.connectionsMu.RLock()
defer m.connectionsMu.RUnlock()
for _, conn := range m.connections {
stats := conn.GetStats()
result.Publishers += stats.Publishers
result.Clients += stats.Clients
@ -998,7 +1118,9 @@ func sortConnectionsForCountry(connections []*mcuProxyConnection, country string
}
func (m *mcuProxy) getSortedConnections(initiator McuInitiator) []*mcuProxyConnection {
connections := m.getConnections()
m.connectionsMu.RLock()
connections := m.connections
m.connectionsMu.RUnlock()
if len(connections) < 2 {
return connections
}
@ -1014,7 +1136,9 @@ func (m *mcuProxy) getSortedConnections(initiator McuInitiator) []*mcuProxyConne
sorted.Sort()
m.setConnections(sorted)
m.connectionsMu.Lock()
m.connections = sorted
m.connectionsMu.Unlock()
connections = sorted
}

View File

@ -24,6 +24,8 @@ package signaling
import (
"fmt"
"github.com/dlintw/goconf"
"golang.org/x/net/context"
)
@ -41,6 +43,9 @@ func (m *TestMCU) Start() error {
func (m *TestMCU) Stop() {
}
func (m *TestMCU) Reload(config *goconf.ConfigFile) {
}
func (m *TestMCU) SetOnConnected(f func()) {
}