From 4ef06867ddc905aeac1d1953d2671a3ffea00018 Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Wed, 22 Apr 2020 18:13:30 +0200 Subject: [PATCH] Move processing of deferred method to dedicated DeferredExecutor. This fixes an issue where re-joining a room after error stopped the internal processing of deferred functions. --- src/signaling/mcu_janus.go | 29 ++++++++++++++--------------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/src/signaling/mcu_janus.go b/src/signaling/mcu_janus.go index 78db934..08ea2af 100644 --- a/src/signaling/mcu_janus.go +++ b/src/signaling/mcu_janus.go @@ -371,7 +371,7 @@ type mcuJanusClient struct { handle *JanusHandle handleId uint64 closeChan chan bool - deferred chan func() + deferred *DeferredExecutor handleEvent func(event *janus.EventMsg) handleHangup func(event *janus.HangupMsg) @@ -389,6 +389,7 @@ func (c *mcuJanusClient) StreamType() string { } func (c *mcuJanusClient) Close(ctx context.Context) { + c.deferred.Close() } func (c *mcuJanusClient) SendMessage(ctx context.Context, message *MessageClientMessage, data *MessageClientMessageData, callback func(error, map[string]interface{})) { @@ -429,8 +430,6 @@ loop: default: log.Println("Received unsupported event type", msg, reflect.TypeOf(msg)) } - case f := <-c.deferred: - f() case <-closeChan: break loop } @@ -593,7 +592,7 @@ func (m *mcuJanus) NewPublisher(ctx context.Context, listener McuListener, id st handle: handle, handleId: handle.Id, closeChan: make(chan bool, 1), - deferred: make(chan func(), 64), + deferred: NewDeferredExecutor(64), }, id: id, } @@ -712,19 +711,19 @@ func (p *mcuJanusPublisher) SendMessage(ctx context.Context, message *MessageCli jsep_msg := data.Payload switch data.Type { case "offer": - p.deferred <- func() { + p.deferred.Execute(func() { msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout) defer cancel() p.sendOffer(msgctx, jsep_msg, callback) - } + }) case "candidate": - p.deferred <- func() { + p.deferred.Execute(func() { msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout) defer cancel() p.sendCandidate(msgctx, jsep_msg["candidate"], callback) - } + }) case "endOfCandidates": // Ignore default: @@ -885,7 +884,7 @@ func (m *mcuJanus) NewSubscriber(ctx context.Context, listener McuListener, publ handle: handle, handleId: handle.Id, closeChan: make(chan bool, 1), - deferred: make(chan func(), 64), + deferred: NewDeferredExecutor(64), }, publisher: publisher, } @@ -1069,26 +1068,26 @@ func (p *mcuJanusSubscriber) SendMessage(ctx context.Context, message *MessageCl case "requestoffer": fallthrough case "sendoffer": - p.deferred <- func() { + p.deferred.Execute(func() { msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout) defer cancel() p.joinRoom(msgctx, callback) - } + }) case "answer": - p.deferred <- func() { + p.deferred.Execute(func() { msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout) defer cancel() p.sendAnswer(msgctx, jsep_msg, callback) - } + }) case "candidate": - p.deferred <- func() { + p.deferred.Execute(func() { msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout) defer cancel() p.sendCandidate(msgctx, jsep_msg["candidate"], callback) - } + }) case "endOfCandidates": // Ignore default: