From dbd5a393e693b8081e23a2f77c6879d609d0a6be Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?batuhan=20i=C3=A7=C3=B6z?= Date: Mon, 2 Mar 2026 15:35:16 +0100 Subject: [PATCH] Add Beeper AI stream ephemeral support Rename generic ephemeral APIs to Beeper-specific variants and add support for Beeper's AI stream ephemeral event. Changes include: renaming SendEphemeralEvent -> BeeperSendEphemeralEvent in client, appservice intent and AS intent; introducing BeeperEphemeralEventAIStream event type and BeeperFeatureAIStreamEvent feature flag; gating AI stream handling on homeserver feature support; updating connector/portal handling and network/matrix interfaces to use Beeper-prefixed action-response and AI stream types; and updating related tests. These changes separate Beeper unstable ephemeral semantics from generic ephemeral handling and ensure feature negotiation before sending AI stream events. --- appservice/intent.go | 4 ++-- bridgev2/errors.go | 2 +- bridgev2/matrix/connector.go | 2 +- bridgev2/matrix/intent.go | 4 ++-- bridgev2/matrix/matrix.go | 6 +++++- bridgev2/matrixinterface.go | 2 +- bridgev2/networkinterface.go | 19 +++++------------- bridgev2/portal.go | 39 ++++++++++-------------------------- client.go | 15 ++++++++++++-- client_ephemeral_test.go | 6 +++--- event/content.go | 7 ++++--- event/type.go | 10 ++++----- versions.go | 1 + 13 files changed, 54 insertions(+), 63 deletions(-) diff --git a/appservice/intent.go b/appservice/intent.go index eaf2ad41..a4ad7b8a 100644 --- a/appservice/intent.go +++ b/appservice/intent.go @@ -222,12 +222,12 @@ func (intent *IntentAPI) SendMessageEvent(ctx context.Context, roomID id.RoomID, return intent.Client.SendMessageEvent(ctx, roomID, eventType, contentJSON, extra...) } -func (intent *IntentAPI) SendEphemeralEvent(ctx context.Context, roomID id.RoomID, eventType event.Type, contentJSON any, extra ...mautrix.ReqSendEvent) (*mautrix.RespSendEvent, error) { +func (intent *IntentAPI) BeeperSendEphemeralEvent(ctx context.Context, roomID id.RoomID, eventType event.Type, contentJSON any, extra ...mautrix.ReqSendEvent) (*mautrix.RespSendEvent, error) { if err := intent.EnsureJoined(ctx, roomID); err != nil { return nil, err } contentJSON = intent.AddDoublePuppetValue(contentJSON) - return intent.Client.SendEphemeralEvent(ctx, roomID, eventType, contentJSON, extra...) + return intent.Client.BeeperSendEphemeralEvent(ctx, roomID, eventType, contentJSON, extra...) } // Deprecated: use SendMessageEvent with mautrix.ReqSendEvent.Timestamp instead diff --git a/bridgev2/errors.go b/bridgev2/errors.go index 63433e71..1fb81dc6 100644 --- a/bridgev2/errors.go +++ b/bridgev2/errors.go @@ -75,7 +75,7 @@ var ( ErrMediaConvertFailed error = WrapErrorInStatus(errors.New("failed to convert media")).WithMessage("failed to convert media").WithIsCertain(true).WithSendNotice(true) ErrMembershipNotSupported error = WrapErrorInStatus(errors.New("this bridge does not support changing group membership")).WithIsCertain(true).WithErrorAsMessage().WithSendNotice(false).WithErrorReason(event.MessageStatusUnsupported) ErrDeleteChatNotSupported error = WrapErrorInStatus(errors.New("this bridge does not support deleting chats")).WithIsCertain(true).WithErrorAsMessage().WithSendNotice(false).WithErrorReason(event.MessageStatusUnsupported) - ErrActionResponseNotSupported error = WrapErrorInStatus(errors.New("this bridge does not support action responses")).WithIsCertain(true).WithErrorAsMessage().WithSendNotice(false).WithErrorReason(event.MessageStatusUnsupported) + ErrBeeperActionResponseNotSupported error = WrapErrorInStatus(errors.New("this bridge does not support Beeper action responses")).WithIsCertain(true).WithErrorAsMessage().WithSendNotice(false).WithErrorReason(event.MessageStatusUnsupported) ErrPowerLevelsNotSupported error = WrapErrorInStatus(errors.New("this bridge does not support changing group power levels")).WithIsCertain(true).WithErrorAsMessage().WithSendNotice(false).WithErrorReason(event.MessageStatusUnsupported) ErrRemoteEchoTimeout = WrapErrorInStatus(errors.New("remote echo timed out")).WithIsCertain(false).WithSendNotice(true).WithErrorReason(event.MessageStatusTooOld) ErrRemoteAckTimeout = WrapErrorInStatus(errors.New("remote ack timed out")).WithIsCertain(false).WithSendNotice(true).WithErrorReason(event.MessageStatusTooOld) diff --git a/bridgev2/matrix/connector.go b/bridgev2/matrix/connector.go index 80dad7cc..f010abff 100644 --- a/bridgev2/matrix/connector.go +++ b/bridgev2/matrix/connector.go @@ -158,7 +158,7 @@ func (br *Connector) Init(bridge *bridgev2.Bridge) { br.EventProcessor.On(event.BeeperActionResponse, br.handleRoomEvent) br.EventProcessor.On(event.EphemeralEventReceipt, br.handleEphemeralEvent) br.EventProcessor.On(event.EphemeralEventTyping, br.handleEphemeralEvent) - br.EventProcessor.On(event.EphemeralEventAIStream, br.handleEphemeralEvent) + br.EventProcessor.On(event.BeeperEphemeralEventAIStream, br.handleEphemeralEvent) br.Bot = br.AS.BotIntent() br.Crypto = NewCryptoHelper(br) br.Bridge.Commands.(*commands.Processor).AddHandlers( diff --git a/bridgev2/matrix/intent.go b/bridgev2/matrix/intent.go index 23d4b78a..72adad96 100644 --- a/bridgev2/matrix/intent.go +++ b/bridgev2/matrix/intent.go @@ -85,7 +85,7 @@ func (as *ASIntent) SendMessage(ctx context.Context, roomID id.RoomID, eventType return as.Matrix.SendMessageEvent(ctx, roomID, eventType, content, mautrix.ReqSendEvent{Timestamp: extra.Timestamp.UnixMilli()}) } -func (as *ASIntent) SendEphemeralEvent(ctx context.Context, roomID id.RoomID, eventType event.Type, content *event.Content, txnID string) (*mautrix.RespSendEvent, error) { +func (as *ASIntent) BeeperSendEphemeralEvent(ctx context.Context, roomID id.RoomID, eventType event.Type, content *event.Content, txnID string) (*mautrix.RespSendEvent, error) { if encrypted, err := as.Matrix.StateStore.IsEncrypted(ctx, roomID); err != nil { return nil, fmt.Errorf("failed to check if room is encrypted: %w", err) } else if encrypted && as.Connector.Crypto != nil { @@ -94,7 +94,7 @@ func (as *ASIntent) SendEphemeralEvent(ctx context.Context, roomID id.RoomID, ev } eventType = event.EventEncrypted } - return as.Matrix.SendEphemeralEvent(ctx, roomID, eventType, content, mautrix.ReqSendEvent{TransactionID: txnID}) + return as.Matrix.BeeperSendEphemeralEvent(ctx, roomID, eventType, content, mautrix.ReqSendEvent{TransactionID: txnID}) } func (as *ASIntent) fillMemberEvent(ctx context.Context, roomID id.RoomID, userID id.UserID, content *event.Content) { diff --git a/bridgev2/matrix/matrix.go b/bridgev2/matrix/matrix.go index 804e2527..d5cd6f7c 100644 --- a/bridgev2/matrix/matrix.go +++ b/bridgev2/matrix/matrix.go @@ -16,6 +16,7 @@ import ( "github.com/rs/zerolog" "go.mau.fi/util/jsontime" + "maunium.net/go/mautrix" "maunium.net/go/mautrix/appservice" "maunium.net/go/mautrix/bridgev2" "maunium.net/go/mautrix/bridgev2/status" @@ -68,10 +69,13 @@ func (br *Connector) handleEphemeralEvent(ctx context.Context, evt *event.Event) case event.EphemeralEventTyping: typingContent := evt.Content.AsTyping() typingContent.UserIDs = slices.DeleteFunc(typingContent.UserIDs, br.shouldIgnoreEventFromUser) - case event.EphemeralEventAIStream: + case event.BeeperEphemeralEventAIStream: if br.shouldIgnoreEvent(evt) { return } + if !br.SpecVersions.Supports(mautrix.BeeperFeatureAIStreamEvent) { + return + } } br.Bridge.QueueMatrixEvent(ctx, evt) } diff --git a/bridgev2/matrixinterface.go b/bridgev2/matrixinterface.go index b4b64e1e..768c57d1 100644 --- a/bridgev2/matrixinterface.go +++ b/bridgev2/matrixinterface.go @@ -220,5 +220,5 @@ type MarkAsDMMatrixAPI interface { type EphemeralSendingMatrixAPI interface { MatrixAPI - SendEphemeralEvent(ctx context.Context, roomID id.RoomID, eventType event.Type, content *event.Content, txnID string) (*mautrix.RespSendEvent, error) + BeeperSendEphemeralEvent(ctx context.Context, roomID id.RoomID, eventType event.Type, content *event.Content, txnID string) (*mautrix.RespSendEvent, error) } diff --git a/bridgev2/networkinterface.go b/bridgev2/networkinterface.go index cac28019..288bf7c7 100644 --- a/bridgev2/networkinterface.go +++ b/bridgev2/networkinterface.go @@ -657,11 +657,6 @@ type TypingHandlingNetworkAPI interface { HandleMatrixTyping(ctx context.Context, msg *MatrixTyping) error } -type EphemeralHandlingNetworkAPI interface { - NetworkAPI - HandleMatrixEphemeral(ctx context.Context, msg *MatrixEphemeralEvent) error -} - type MarkedUnreadHandlingNetworkAPI interface { NetworkAPI HandleMarkedUnread(ctx context.Context, msg *MatrixMarkedUnread) error @@ -731,12 +726,12 @@ type MessageRequestAcceptingNetworkAPI interface { HandleMatrixAcceptMessageRequest(ctx context.Context, msg *MatrixAcceptMessageRequest) error } -// ActionResponseHandlingNetworkAPI is an optional interface that network connectors +// BeeperActionResponseHandlingNetworkAPI is an optional interface that network connectors // can implement to handle com.beeper.action_response events (MSC1485 action hints). -type ActionResponseHandlingNetworkAPI interface { +type BeeperActionResponseHandlingNetworkAPI interface { NetworkAPI - // HandleMatrixActionResponse is called when a user clicks an action hint button. - HandleMatrixActionResponse(ctx context.Context, msg *MatrixActionResponse) error + // HandleMatrixBeeperActionResponse is called when a user clicks an action hint button. + HandleMatrixBeeperActionResponse(ctx context.Context, msg *MatrixBeeperActionResponse) error } type ResolveIdentifierResponse struct { @@ -1367,10 +1362,6 @@ type MatrixMessage struct { pendingSaves []*outgoingMessage } -type MatrixEphemeralEvent struct { - MatrixEventBase[*event.Content] -} - type MatrixEdit struct { MatrixEventBase[*event.MessageEventContent] EditTarget *database.Message @@ -1456,7 +1447,7 @@ type MatrixViewingChat struct { type MatrixDeleteChat = MatrixEventBase[*event.BeeperChatDeleteEventContent] type MatrixAcceptMessageRequest = MatrixEventBase[*event.BeeperAcceptMessageRequestEventContent] -type MatrixActionResponse = MatrixEventBase[*event.BeeperActionResponseEventContent] +type MatrixBeeperActionResponse = MatrixEventBase[*event.BeeperActionResponseEventContent] type MatrixMarkedUnread = MatrixRoomMeta[*event.MarkedUnreadEventContent] type MatrixMute = MatrixRoomMeta[*event.BeeperMuteEventContent] type MatrixRoomTag = MatrixRoomMeta[*event.TagEventContent] diff --git a/bridgev2/portal.go b/bridgev2/portal.go index ada28549..eada4e89 100644 --- a/bridgev2/portal.go +++ b/bridgev2/portal.go @@ -697,8 +697,8 @@ func (portal *Portal) handleMatrixEvent(ctx context.Context, sender *User, evt * return portal.handleMatrixReceipts(ctx, evt) case event.EphemeralEventTyping: return portal.handleMatrixTyping(ctx, evt) - case event.EphemeralEventAIStream: - return portal.handleMatrixEphemeral(ctx, sender, evt) + case event.BeeperEphemeralEventAIStream: + return portal.handleMatrixAIStream(ctx, sender, evt) default: return EventHandlingResultIgnored } @@ -821,7 +821,7 @@ func (portal *Portal) handleMatrixEvent(ctx context.Context, sender *User, evt * case event.BeeperAcceptMessageRequest: return portal.handleMatrixAcceptMessageRequest(ctx, login, origSender, evt) case event.BeeperActionResponse: - return portal.handleMatrixActionResponse(ctx, login, origSender, evt) + return portal.handleMatrixBeeperActionResponse(ctx, login, origSender, evt) default: return EventHandlingResultIgnored } @@ -945,15 +945,15 @@ func (portal *Portal) handleMatrixTyping(ctx context.Context, evt *event.Event) return EventHandlingResultSuccess } -func (portal *Portal) handleMatrixEphemeral(ctx context.Context, sender *User, evt *event.Event) EventHandlingResult { +func (portal *Portal) handleMatrixAIStream(ctx context.Context, sender *User, evt *event.Event) EventHandlingResult { log := zerolog.Ctx(ctx) if sender == nil { - log.Error().Msg("Missing sender for ephemeral event") + log.Error().Msg("Missing sender for Matrix AI stream event") return EventHandlingResultIgnored } login, _, err := portal.FindPreferredLogin(ctx, sender, true) if err != nil { - log.Err(err).Msg("Failed to get user login to handle ephemeral event") + log.Err(err).Msg("Failed to get user login to handle Matrix AI stream event") return EventHandlingResultFailed.WithMSSError(err) } var origSender *OrigSender @@ -967,24 +967,7 @@ func (portal *Portal) handleMatrixEphemeral(ctx context.Context, sender *User, e UserID: sender.MXID, } } - ephemeralAPI, ok := login.Client.(EphemeralHandlingNetworkAPI) - if !ok { - return EventHandlingResultIgnored - } - err = ephemeralAPI.HandleMatrixEphemeral(ctx, &MatrixEphemeralEvent{ - MatrixEventBase: MatrixEventBase[*event.Content]{ - Event: evt, - Content: &evt.Content, - Portal: portal, - OrigSender: origSender, - InputTransactionID: portal.parseInputTransactionID(origSender, evt), - }, - }) - if err != nil { - log.Err(err).Msg("Failed to bridge Matrix ephemeral event") - return EventHandlingResultFailed.WithMSSError(err) - } - return EventHandlingResultSuccess.WithMSS() + return portal.handleMatrixBeeperActionResponse(ctx, login, origSender, evt) } func (portal *Portal) sendTypings(ctx context.Context, userIDs []id.UserID, typing bool) { @@ -1864,7 +1847,7 @@ func (portal *Portal) handleMatrixAcceptMessageRequest( return EventHandlingResultSuccess.WithMSS() } -func (portal *Portal) handleMatrixActionResponse( +func (portal *Portal) handleMatrixBeeperActionResponse( ctx context.Context, sender *UserLogin, origSender *OrigSender, @@ -1876,11 +1859,11 @@ func (portal *Portal) handleMatrixActionResponse( log.Error().Type("content_type", evt.Content.Parsed).Msg("Unexpected parsed content type") return EventHandlingResultFailed.WithMSSError(fmt.Errorf("%w: %T", ErrUnexpectedParsedContentType, evt.Content.Parsed)) } - api, ok := sender.Client.(ActionResponseHandlingNetworkAPI) + api, ok := sender.Client.(BeeperActionResponseHandlingNetworkAPI) if !ok { - return EventHandlingResultIgnored.WithMSSError(ErrActionResponseNotSupported) + return EventHandlingResultIgnored.WithMSSError(ErrBeeperActionResponseNotSupported) } - err := api.HandleMatrixActionResponse(ctx, &MatrixActionResponse{ + err := api.HandleMatrixBeeperActionResponse(ctx, &MatrixBeeperActionResponse{ Event: evt, Content: content, Portal: portal, diff --git a/client.go b/client.go index 094ce1cb..d73ae81e 100644 --- a/client.go +++ b/client.go @@ -1359,9 +1359,9 @@ func (cli *Client) SendMessageEvent(ctx context.Context, roomID id.RoomID, event return } -// SendEphemeralEvent sends an ephemeral event into a room. This is a custom unstable endpoint. +// BeeperSendEphemeralEvent sends an ephemeral event into a room using Beeper's unstable endpoint. // contentJSON should be a value that can be encoded as JSON using json.Marshal. -func (cli *Client) SendEphemeralEvent(ctx context.Context, roomID id.RoomID, eventType event.Type, contentJSON any, extra ...ReqSendEvent) (resp *RespSendEvent, err error) { +func (cli *Client) BeeperSendEphemeralEvent(ctx context.Context, roomID id.RoomID, eventType event.Type, contentJSON any, extra ...ReqSendEvent) (resp *RespSendEvent, err error) { var req ReqSendEvent if len(extra) > 0 { req = extra[0] @@ -1378,6 +1378,17 @@ func (cli *Client) SendEphemeralEvent(ctx context.Context, roomID id.RoomID, eve if req.Timestamp > 0 { queryParams["ts"] = strconv.FormatInt(req.Timestamp, 10) } + if eventType == event.BeeperEphemeralEventAIStream { + if cli.SpecVersions == nil { + _, err = cli.Versions(ctx) + if err != nil { + return nil, fmt.Errorf("failed to check homeserver feature support via /versions: %w", err) + } + } + if !cli.SpecVersions.Supports(BeeperFeatureAIStreamEvent) { + return nil, MUnrecognized.WithMessage("Homeserver does not advertise com.beeper.ai.stream_event support") + } + } if !req.DontEncrypt && cli != nil && cli.Crypto != nil && eventType != event.EventEncrypted { var isEncrypted bool diff --git a/client_ephemeral_test.go b/client_ephemeral_test.go index 89be27a0..c2846427 100644 --- a/client_ephemeral_test.go +++ b/client_ephemeral_test.go @@ -42,7 +42,7 @@ func TestClient_SendEphemeralEvent_UsesUnstablePathTxnAndTS(t *testing.T) { cli, err := mautrix.NewClient(ts.URL, "", "") require.NoError(t, err) - _, err = cli.SendEphemeralEvent( + _, err = cli.BeeperSendEphemeralEvent( context.Background(), roomID, evtType, @@ -66,7 +66,7 @@ func TestClient_SendEphemeralEvent_UnsupportedReturnsMUnrecognized(t *testing.T) cli, err := mautrix.NewClient(ts.URL, "", "") require.NoError(t, err) - _, err = cli.SendEphemeralEvent( + _, err = cli.BeeperSendEphemeralEvent( context.Background(), id.RoomID("!room:example.com"), event.Type{Type: "com.example.ephemeral", Class: event.EphemeralEventType}, @@ -110,7 +110,7 @@ func TestClient_SendEphemeralEvent_EncryptsInEncryptedRooms(t *testing.T) { cli.StateStore = stateStore cli.Crypto = fakeCrypto - _, err = cli.SendEphemeralEvent( + _, err = cli.BeeperSendEphemeralEvent( context.Background(), roomID, evtType, diff --git a/event/content.go b/event/content.go index 2ce9c911..5edcbc83 100644 --- a/event/content.go +++ b/event/content.go @@ -77,9 +77,10 @@ var TypeMap = map[Type]reflect.Type{ AccountDataMarkedUnread: reflect.TypeOf(MarkedUnreadEventContent{}), AccountDataBeeperMute: reflect.TypeOf(BeeperMuteEventContent{}), - EphemeralEventTyping: reflect.TypeOf(TypingEventContent{}), - EphemeralEventReceipt: reflect.TypeOf(ReceiptEventContent{}), - EphemeralEventPresence: reflect.TypeOf(PresenceEventContent{}), + EphemeralEventTyping: reflect.TypeOf(TypingEventContent{}), + EphemeralEventReceipt: reflect.TypeOf(ReceiptEventContent{}), + EphemeralEventPresence: reflect.TypeOf(PresenceEventContent{}), + BeeperEphemeralEventAIStream: reflect.TypeOf(BeeperActionResponseEventContent{}), InRoomVerificationReady: reflect.TypeOf(VerificationReadyEventContent{}), InRoomVerificationStart: reflect.TypeOf(VerificationStartEventContent{}), diff --git a/event/type.go b/event/type.go index 3a3e49a2..045196b4 100644 --- a/event/type.go +++ b/event/type.go @@ -115,7 +115,7 @@ func (et *Type) GuessClass() TypeClass { StateElementFunctionalMembers.Type, StateBeeperRoomFeatures.Type, StateBeeperDisappearingTimer.Type, StateMSC4391BotCommand.Type, StateRoomPolicy.Type, StateUnstableRoomPolicy.Type: return StateEventType - case EphemeralEventReceipt.Type, EphemeralEventTyping.Type, EphemeralEventPresence.Type, EphemeralEventAIStream.Type: + case EphemeralEventReceipt.Type, EphemeralEventTyping.Type, EphemeralEventPresence.Type, BeeperEphemeralEventAIStream.Type: return EphemeralEventType case AccountDataDirectChats.Type, AccountDataPushRules.Type, AccountDataRoomTags.Type, AccountDataFullyRead.Type, AccountDataIgnoredUserList.Type, AccountDataMarkedUnread.Type, @@ -252,10 +252,10 @@ var ( // Ephemeral events var ( - EphemeralEventReceipt = Type{"m.receipt", EphemeralEventType} - EphemeralEventTyping = Type{"m.typing", EphemeralEventType} - EphemeralEventPresence = Type{"m.presence", EphemeralEventType} - EphemeralEventAIStream = Type{"com.beeper.ai.stream_event", EphemeralEventType} + EphemeralEventReceipt = Type{"m.receipt", EphemeralEventType} + EphemeralEventTyping = Type{"m.typing", EphemeralEventType} + EphemeralEventPresence = Type{"m.presence", EphemeralEventType} + BeeperEphemeralEventAIStream = Type{"com.beeper.ai.stream_event", EphemeralEventType} ) // Account data events diff --git a/versions.go b/versions.go index 8ae82a06..ef29a9bb 100644 --- a/versions.go +++ b/versions.go @@ -80,6 +80,7 @@ var ( BeeperFeatureAccountDataMute = UnstableFeature{UnstableFlag: "com.beeper.account_data_mute"} BeeperFeatureInboxState = UnstableFeature{UnstableFlag: "com.beeper.inbox_state"} BeeperFeatureArbitraryMemberChange = UnstableFeature{UnstableFlag: "com.beeper.arbitrary_member_change"} + BeeperFeatureAIStreamEvent = UnstableFeature{UnstableFlag: "com.beeper.ai.stream_event"} ) func (versions *RespVersions) Supports(feature UnstableFeature) bool {