Split to-device events into separate channel in AS handler

This commit is contained in:
Tulir Asokan 2022-11-09 16:35:09 +02:00
commit 0296adbd0b
4 changed files with 35 additions and 12 deletions

View file

@ -96,11 +96,12 @@ type AppService struct {
txnIDC *TransactionIDCache
Events chan *event.Event `yaml:"-"`
DeviceLists chan *mautrix.DeviceLists `yaml:"-"`
OTKCounts chan *mautrix.OTKCount `yaml:"-"`
QueryHandler QueryHandler `yaml:"-"`
StateStore StateStore `yaml:"-"`
Events chan *event.Event `yaml:"-"`
ToDeviceEvents chan *event.Event `yaml:"-"`
DeviceLists chan *mautrix.DeviceLists `yaml:"-"`
OTKCounts chan *mautrix.OTKCount `yaml:"-"`
QueryHandler QueryHandler `yaml:"-"`
StateStore StateStore `yaml:"-"`
Router *mux.Router `yaml:"-"`
UserAgent string `yaml:"-"`
@ -275,6 +276,7 @@ func (as *AppService) BotClient() *mautrix.Client {
// Init initializes the logger and loads the registration of this appservice.
func (as *AppService) Init() (bool, error) {
as.Events = make(chan *event.Event, EventChannelSize)
as.ToDeviceEvents = make(chan *event.Event, EventChannelSize)
as.OTKCounts = make(chan *mautrix.OTKCount, OTKChannelSize)
as.DeviceLists = make(chan *mautrix.DeviceLists, EventChannelSize)
as.QueryHandler = &QueryHandlerStub{}

View file

@ -137,12 +137,22 @@ func (ep *EventProcessor) Dispatch(evt *event.Event) {
}
}
}
func (ep *EventProcessor) Start() {
func (ep *EventProcessor) startEvents() {
for {
select {
case evt := <-ep.as.Events:
ep.Dispatch(evt)
case <-ep.stop:
return
}
}
}
func (ep *EventProcessor) startEncryption() {
for {
select {
case evt := <-ep.as.ToDeviceEvents:
ep.Dispatch(evt)
case otk := <-ep.as.OTKCounts:
ep.DispatchOTK(otk)
case dl := <-ep.as.DeviceLists:
@ -153,6 +163,11 @@ func (ep *EventProcessor) Start() {
}
}
func (ep *EventProcessor) Stop() {
ep.stop <- struct{}{}
func (ep *EventProcessor) Start() {
go ep.startEvents()
go ep.startEncryption()
}
func (ep *EventProcessor) Stop() {
close(ep.stop)
}

View file

@ -206,13 +206,19 @@ func (as *AppService) handleEvents(evts []*event.Event, defaultTypeClass event.T
}
if evt.Type.IsState() {
// TODO remove this check after https://github.com/matrix-org/synapse/pull/11265
// TODO remove this check after making sure the log doesn't happen
historical, ok := evt.Content.Raw["org.matrix.msc2716.historical"].(bool)
if !ok || !historical {
if ok && historical {
as.Log.Warnfln("Received historical state event %s (%s/%s)", evt.ID, evt.Type.Type, evt.GetStateKey())
} else {
as.UpdateState(evt)
}
}
as.Events <- evt
if evt.Type.Class == event.ToDeviceEventType {
as.ToDeviceEvents <- evt
} else {
as.Events <- evt
}
}
}

View file

@ -481,7 +481,7 @@ func (br *Bridge) start() {
br.Log.Debugln("Starting application service HTTP server")
go br.AS.Start()
br.Log.Debugln("Starting event processor")
go br.EventProcessor.Start()
br.EventProcessor.Start()
go br.UpdateBotProfile()
if br.Crypto != nil {