Revert "Move processing of deferred method to dedicated DeferredExecutor."

This commit is contained in:
Joachim Bauch 2020-08-13 14:40:06 +02:00 committed by GitHub
parent 238b355e79
commit dc1bf2bc9b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -435,7 +435,7 @@ type mcuJanusClient struct {
handle *JanusHandle handle *JanusHandle
handleId uint64 handleId uint64
closeChan chan bool closeChan chan bool
deferred *DeferredExecutor deferred chan func()
handleEvent func(event *janus.EventMsg) handleEvent func(event *janus.EventMsg)
handleHangup func(event *janus.HangupMsg) handleHangup func(event *janus.HangupMsg)
@ -453,7 +453,6 @@ func (c *mcuJanusClient) StreamType() string {
} }
func (c *mcuJanusClient) Close(ctx context.Context) { func (c *mcuJanusClient) Close(ctx context.Context) {
c.deferred.Close()
} }
func (c *mcuJanusClient) SendMessage(ctx context.Context, message *MessageClientMessage, data *MessageClientMessageData, callback func(error, map[string]interface{})) { func (c *mcuJanusClient) SendMessage(ctx context.Context, message *MessageClientMessage, data *MessageClientMessageData, callback func(error, map[string]interface{})) {
@ -494,6 +493,8 @@ loop:
default: default:
log.Println("Received unsupported event type", msg, reflect.TypeOf(msg)) log.Println("Received unsupported event type", msg, reflect.TypeOf(msg))
} }
case f := <-c.deferred:
f()
case <-closeChan: case <-closeChan:
break loop break loop
} }
@ -656,7 +657,7 @@ func (m *mcuJanus) NewPublisher(ctx context.Context, listener McuListener, id st
handle: handle, handle: handle,
handleId: handle.Id, handleId: handle.Id,
closeChan: make(chan bool, 1), closeChan: make(chan bool, 1),
deferred: NewDeferredExecutor(64), deferred: make(chan func(), 64),
}, },
id: id, id: id,
} }
@ -775,19 +776,19 @@ func (p *mcuJanusPublisher) SendMessage(ctx context.Context, message *MessageCli
jsep_msg := data.Payload jsep_msg := data.Payload
switch data.Type { switch data.Type {
case "offer": case "offer":
p.deferred.Execute(func() { p.deferred <- func() {
msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout) msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout)
defer cancel() defer cancel()
p.sendOffer(msgctx, jsep_msg, callback) p.sendOffer(msgctx, jsep_msg, callback)
}) }
case "candidate": case "candidate":
p.deferred.Execute(func() { p.deferred <- func() {
msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout) msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout)
defer cancel() defer cancel()
p.sendCandidate(msgctx, jsep_msg["candidate"], callback) p.sendCandidate(msgctx, jsep_msg["candidate"], callback)
}) }
case "endOfCandidates": case "endOfCandidates":
// Ignore // Ignore
default: default:
@ -948,7 +949,7 @@ func (m *mcuJanus) NewSubscriber(ctx context.Context, listener McuListener, publ
handle: handle, handle: handle,
handleId: handle.Id, handleId: handle.Id,
closeChan: make(chan bool, 1), closeChan: make(chan bool, 1),
deferred: NewDeferredExecutor(64), deferred: make(chan func(), 64),
}, },
publisher: publisher, publisher: publisher,
} }
@ -1077,10 +1078,6 @@ retry:
var roomId uint64 var roomId uint64
handle, roomId, err = p.mcu.getOrCreateSubscriberHandle(ctx, p.publisher, p.streamType) handle, roomId, err = p.mcu.getOrCreateSubscriberHandle(ctx, p.publisher, p.streamType)
if err != nil { if err != nil {
// Reconnection didn't work, need to unregister/remove subscriber
// so a new object will be created if the request is retried.
p.mcu.unregisterClient(p)
p.listener.SubscriberClosed(p)
callback(fmt.Errorf("Already connected as subscriber for %s, error during re-joining: %s", p.streamType, err), nil) callback(fmt.Errorf("Already connected as subscriber for %s, error during re-joining: %s", p.streamType, err), nil)
return return
} }
@ -1136,26 +1133,26 @@ func (p *mcuJanusSubscriber) SendMessage(ctx context.Context, message *MessageCl
case "requestoffer": case "requestoffer":
fallthrough fallthrough
case "sendoffer": case "sendoffer":
p.deferred.Execute(func() { p.deferred <- func() {
msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout) msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout)
defer cancel() defer cancel()
p.joinRoom(msgctx, callback) p.joinRoom(msgctx, callback)
}) }
case "answer": case "answer":
p.deferred.Execute(func() { p.deferred <- func() {
msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout) msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout)
defer cancel() defer cancel()
p.sendAnswer(msgctx, jsep_msg, callback) p.sendAnswer(msgctx, jsep_msg, callback)
}) }
case "candidate": case "candidate":
p.deferred.Execute(func() { p.deferred <- func() {
msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout) msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout)
defer cancel() defer cancel()
p.sendCandidate(msgctx, jsep_msg["candidate"], callback) p.sendCandidate(msgctx, jsep_msg["candidate"], callback)
}) }
case "endOfCandidates": case "endOfCandidates":
// Ignore // Ignore
default: default: