Move processing of deferred method to dedicated DeferredExecutor.

This fixes an issue where re-joining a room after error stopped the
internal processing of deferred functions.
This commit is contained in:
Joachim Bauch 2020-04-22 18:13:30 +02:00 committed by Joachim Bauch
parent 563658bf59
commit 4ef06867dd
Failed to extract signature

View file

@ -371,7 +371,7 @@ type mcuJanusClient struct {
handle *JanusHandle
handleId uint64
closeChan chan bool
deferred chan func()
deferred *DeferredExecutor
handleEvent func(event *janus.EventMsg)
handleHangup func(event *janus.HangupMsg)
@ -389,6 +389,7 @@ func (c *mcuJanusClient) StreamType() string {
}
func (c *mcuJanusClient) Close(ctx context.Context) {
c.deferred.Close()
}
func (c *mcuJanusClient) SendMessage(ctx context.Context, message *MessageClientMessage, data *MessageClientMessageData, callback func(error, map[string]interface{})) {
@ -429,8 +430,6 @@ loop:
default:
log.Println("Received unsupported event type", msg, reflect.TypeOf(msg))
}
case f := <-c.deferred:
f()
case <-closeChan:
break loop
}
@ -593,7 +592,7 @@ func (m *mcuJanus) NewPublisher(ctx context.Context, listener McuListener, id st
handle: handle,
handleId: handle.Id,
closeChan: make(chan bool, 1),
deferred: make(chan func(), 64),
deferred: NewDeferredExecutor(64),
},
id: id,
}
@ -712,19 +711,19 @@ func (p *mcuJanusPublisher) SendMessage(ctx context.Context, message *MessageCli
jsep_msg := data.Payload
switch data.Type {
case "offer":
p.deferred <- func() {
p.deferred.Execute(func() {
msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout)
defer cancel()
p.sendOffer(msgctx, jsep_msg, callback)
}
})
case "candidate":
p.deferred <- func() {
p.deferred.Execute(func() {
msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout)
defer cancel()
p.sendCandidate(msgctx, jsep_msg["candidate"], callback)
}
})
case "endOfCandidates":
// Ignore
default:
@ -885,7 +884,7 @@ func (m *mcuJanus) NewSubscriber(ctx context.Context, listener McuListener, publ
handle: handle,
handleId: handle.Id,
closeChan: make(chan bool, 1),
deferred: make(chan func(), 64),
deferred: NewDeferredExecutor(64),
},
publisher: publisher,
}
@ -1069,26 +1068,26 @@ func (p *mcuJanusSubscriber) SendMessage(ctx context.Context, message *MessageCl
case "requestoffer":
fallthrough
case "sendoffer":
p.deferred <- func() {
p.deferred.Execute(func() {
msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout)
defer cancel()
p.joinRoom(msgctx, callback)
}
})
case "answer":
p.deferred <- func() {
p.deferred.Execute(func() {
msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout)
defer cancel()
p.sendAnswer(msgctx, jsep_msg, callback)
}
})
case "candidate":
p.deferred <- func() {
p.deferred.Execute(func() {
msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout)
defer cancel()
p.sendCandidate(msgctx, jsep_msg["candidate"], callback)
}
})
case "endOfCandidates":
// Ignore
default: