From dc1bf2bc9b0d5548ecfaa76620f9d157d0cb3e79 Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Thu, 13 Aug 2020 14:40:06 +0200 Subject: [PATCH] Revert "Move processing of deferred method to dedicated DeferredExecutor." --- src/signaling/mcu_janus.go | 33 +++++++++++++++------------------ 1 file changed, 15 insertions(+), 18 deletions(-) diff --git a/src/signaling/mcu_janus.go b/src/signaling/mcu_janus.go index 2f33f67..4e4f38a 100644 --- a/src/signaling/mcu_janus.go +++ b/src/signaling/mcu_janus.go @@ -435,7 +435,7 @@ type mcuJanusClient struct { handle *JanusHandle handleId uint64 closeChan chan bool - deferred *DeferredExecutor + deferred chan func() handleEvent func(event *janus.EventMsg) handleHangup func(event *janus.HangupMsg) @@ -453,7 +453,6 @@ func (c *mcuJanusClient) StreamType() string { } func (c *mcuJanusClient) Close(ctx context.Context) { - c.deferred.Close() } func (c *mcuJanusClient) SendMessage(ctx context.Context, message *MessageClientMessage, data *MessageClientMessageData, callback func(error, map[string]interface{})) { @@ -494,6 +493,8 @@ loop: default: log.Println("Received unsupported event type", msg, reflect.TypeOf(msg)) } + case f := <-c.deferred: + f() case <-closeChan: break loop } @@ -656,7 +657,7 @@ func (m *mcuJanus) NewPublisher(ctx context.Context, listener McuListener, id st handle: handle, handleId: handle.Id, closeChan: make(chan bool, 1), - deferred: NewDeferredExecutor(64), + deferred: make(chan func(), 64), }, id: id, } @@ -775,19 +776,19 @@ func (p *mcuJanusPublisher) SendMessage(ctx context.Context, message *MessageCli jsep_msg := data.Payload switch data.Type { case "offer": - p.deferred.Execute(func() { + p.deferred <- func() { msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout) defer cancel() p.sendOffer(msgctx, jsep_msg, callback) - }) + } case "candidate": - p.deferred.Execute(func() { + p.deferred <- func() { msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout) defer cancel() p.sendCandidate(msgctx, jsep_msg["candidate"], callback) - }) + } case "endOfCandidates": // Ignore default: @@ -948,7 +949,7 @@ func (m *mcuJanus) NewSubscriber(ctx context.Context, listener McuListener, publ handle: handle, handleId: handle.Id, closeChan: make(chan bool, 1), - deferred: NewDeferredExecutor(64), + deferred: make(chan func(), 64), }, publisher: publisher, } @@ -1077,10 +1078,6 @@ retry: var roomId uint64 handle, roomId, err = p.mcu.getOrCreateSubscriberHandle(ctx, p.publisher, p.streamType) if err != nil { - // Reconnection didn't work, need to unregister/remove subscriber - // so a new object will be created if the request is retried. - p.mcu.unregisterClient(p) - p.listener.SubscriberClosed(p) callback(fmt.Errorf("Already connected as subscriber for %s, error during re-joining: %s", p.streamType, err), nil) return } @@ -1136,26 +1133,26 @@ func (p *mcuJanusSubscriber) SendMessage(ctx context.Context, message *MessageCl case "requestoffer": fallthrough case "sendoffer": - p.deferred.Execute(func() { + p.deferred <- func() { msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout) defer cancel() p.joinRoom(msgctx, callback) - }) + } case "answer": - p.deferred.Execute(func() { + p.deferred <- func() { msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout) defer cancel() p.sendAnswer(msgctx, jsep_msg, callback) - }) + } case "candidate": - p.deferred.Execute(func() { + p.deferred <- func() { msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout) defer cancel() p.sendCandidate(msgctx, jsep_msg["candidate"], callback) - }) + } case "endOfCandidates": // Ignore default: