diff --git a/src/signaling/mcu_janus.go b/src/signaling/mcu_janus.go index 4e4f38a..2f33f67 100644 --- a/src/signaling/mcu_janus.go +++ b/src/signaling/mcu_janus.go @@ -435,7 +435,7 @@ type mcuJanusClient struct { handle *JanusHandle handleId uint64 closeChan chan bool - deferred chan func() + deferred *DeferredExecutor handleEvent func(event *janus.EventMsg) handleHangup func(event *janus.HangupMsg) @@ -453,6 +453,7 @@ func (c *mcuJanusClient) StreamType() string { } func (c *mcuJanusClient) Close(ctx context.Context) { + c.deferred.Close() } func (c *mcuJanusClient) SendMessage(ctx context.Context, message *MessageClientMessage, data *MessageClientMessageData, callback func(error, map[string]interface{})) { @@ -493,8 +494,6 @@ loop: default: log.Println("Received unsupported event type", msg, reflect.TypeOf(msg)) } - case f := <-c.deferred: - f() case <-closeChan: break loop } @@ -657,7 +656,7 @@ func (m *mcuJanus) NewPublisher(ctx context.Context, listener McuListener, id st handle: handle, handleId: handle.Id, closeChan: make(chan bool, 1), - deferred: make(chan func(), 64), + deferred: NewDeferredExecutor(64), }, id: id, } @@ -776,19 +775,19 @@ func (p *mcuJanusPublisher) SendMessage(ctx context.Context, message *MessageCli jsep_msg := data.Payload switch data.Type { case "offer": - p.deferred <- func() { + p.deferred.Execute(func() { msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout) defer cancel() p.sendOffer(msgctx, jsep_msg, callback) - } + }) case "candidate": - p.deferred <- func() { + p.deferred.Execute(func() { msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout) defer cancel() p.sendCandidate(msgctx, jsep_msg["candidate"], callback) - } + }) case "endOfCandidates": // Ignore default: @@ -949,7 +948,7 @@ func (m *mcuJanus) NewSubscriber(ctx context.Context, listener McuListener, publ handle: handle, handleId: handle.Id, closeChan: make(chan bool, 1), - deferred: make(chan func(), 64), + deferred: NewDeferredExecutor(64), }, publisher: publisher, } @@ -1078,6 +1077,10 @@ retry: var roomId uint64 handle, roomId, err = p.mcu.getOrCreateSubscriberHandle(ctx, p.publisher, p.streamType) if err != nil { + // Reconnection didn't work, need to unregister/remove subscriber + // so a new object will be created if the request is retried. + p.mcu.unregisterClient(p) + p.listener.SubscriberClosed(p) callback(fmt.Errorf("Already connected as subscriber for %s, error during re-joining: %s", p.streamType, err), nil) return } @@ -1133,26 +1136,26 @@ func (p *mcuJanusSubscriber) SendMessage(ctx context.Context, message *MessageCl case "requestoffer": fallthrough case "sendoffer": - p.deferred <- func() { + p.deferred.Execute(func() { msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout) defer cancel() p.joinRoom(msgctx, callback) - } + }) case "answer": - p.deferred <- func() { + p.deferred.Execute(func() { msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout) defer cancel() p.sendAnswer(msgctx, jsep_msg, callback) - } + }) case "candidate": - p.deferred <- func() { + p.deferred.Execute(func() { msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout) defer cancel() p.sendCandidate(msgctx, jsep_msg["candidate"], callback) - } + }) case "endOfCandidates": // Ignore default: