Add Beeper AI stream ephemeral support

Rename generic ephemeral APIs to Beeper-specific variants and add support for Beeper's AI stream ephemeral event. Changes include: renaming SendEphemeralEvent -> BeeperSendEphemeralEvent in client, appservice intent and AS intent; introducing BeeperEphemeralEventAIStream event type and BeeperFeatureAIStreamEvent feature flag; gating AI stream handling on homeserver feature support; updating connector/portal handling and network/matrix interfaces to use Beeper-prefixed action-response and AI stream types; and updating related tests. These changes separate Beeper unstable ephemeral semantics from generic ephemeral handling and ensure feature negotiation before sending AI stream events.
This commit is contained in:
batuhan içöz 2026-03-02 15:35:16 +01:00
commit dbd5a393e6
No known key found for this signature in database
13 changed files with 54 additions and 63 deletions

View file

@ -222,12 +222,12 @@ func (intent *IntentAPI) SendMessageEvent(ctx context.Context, roomID id.RoomID,
return intent.Client.SendMessageEvent(ctx, roomID, eventType, contentJSON, extra...)
}
func (intent *IntentAPI) SendEphemeralEvent(ctx context.Context, roomID id.RoomID, eventType event.Type, contentJSON any, extra ...mautrix.ReqSendEvent) (*mautrix.RespSendEvent, error) {
func (intent *IntentAPI) BeeperSendEphemeralEvent(ctx context.Context, roomID id.RoomID, eventType event.Type, contentJSON any, extra ...mautrix.ReqSendEvent) (*mautrix.RespSendEvent, error) {
if err := intent.EnsureJoined(ctx, roomID); err != nil {
return nil, err
}
contentJSON = intent.AddDoublePuppetValue(contentJSON)
return intent.Client.SendEphemeralEvent(ctx, roomID, eventType, contentJSON, extra...)
return intent.Client.BeeperSendEphemeralEvent(ctx, roomID, eventType, contentJSON, extra...)
}
// Deprecated: use SendMessageEvent with mautrix.ReqSendEvent.Timestamp instead

View file

@ -75,7 +75,7 @@ var (
ErrMediaConvertFailed error = WrapErrorInStatus(errors.New("failed to convert media")).WithMessage("failed to convert media").WithIsCertain(true).WithSendNotice(true)
ErrMembershipNotSupported error = WrapErrorInStatus(errors.New("this bridge does not support changing group membership")).WithIsCertain(true).WithErrorAsMessage().WithSendNotice(false).WithErrorReason(event.MessageStatusUnsupported)
ErrDeleteChatNotSupported error = WrapErrorInStatus(errors.New("this bridge does not support deleting chats")).WithIsCertain(true).WithErrorAsMessage().WithSendNotice(false).WithErrorReason(event.MessageStatusUnsupported)
ErrActionResponseNotSupported error = WrapErrorInStatus(errors.New("this bridge does not support action responses")).WithIsCertain(true).WithErrorAsMessage().WithSendNotice(false).WithErrorReason(event.MessageStatusUnsupported)
ErrBeeperActionResponseNotSupported error = WrapErrorInStatus(errors.New("this bridge does not support Beeper action responses")).WithIsCertain(true).WithErrorAsMessage().WithSendNotice(false).WithErrorReason(event.MessageStatusUnsupported)
ErrPowerLevelsNotSupported error = WrapErrorInStatus(errors.New("this bridge does not support changing group power levels")).WithIsCertain(true).WithErrorAsMessage().WithSendNotice(false).WithErrorReason(event.MessageStatusUnsupported)
ErrRemoteEchoTimeout = WrapErrorInStatus(errors.New("remote echo timed out")).WithIsCertain(false).WithSendNotice(true).WithErrorReason(event.MessageStatusTooOld)
ErrRemoteAckTimeout = WrapErrorInStatus(errors.New("remote ack timed out")).WithIsCertain(false).WithSendNotice(true).WithErrorReason(event.MessageStatusTooOld)

View file

@ -158,7 +158,7 @@ func (br *Connector) Init(bridge *bridgev2.Bridge) {
br.EventProcessor.On(event.BeeperActionResponse, br.handleRoomEvent)
br.EventProcessor.On(event.EphemeralEventReceipt, br.handleEphemeralEvent)
br.EventProcessor.On(event.EphemeralEventTyping, br.handleEphemeralEvent)
br.EventProcessor.On(event.EphemeralEventAIStream, br.handleEphemeralEvent)
br.EventProcessor.On(event.BeeperEphemeralEventAIStream, br.handleEphemeralEvent)
br.Bot = br.AS.BotIntent()
br.Crypto = NewCryptoHelper(br)
br.Bridge.Commands.(*commands.Processor).AddHandlers(

View file

@ -85,7 +85,7 @@ func (as *ASIntent) SendMessage(ctx context.Context, roomID id.RoomID, eventType
return as.Matrix.SendMessageEvent(ctx, roomID, eventType, content, mautrix.ReqSendEvent{Timestamp: extra.Timestamp.UnixMilli()})
}
func (as *ASIntent) SendEphemeralEvent(ctx context.Context, roomID id.RoomID, eventType event.Type, content *event.Content, txnID string) (*mautrix.RespSendEvent, error) {
func (as *ASIntent) BeeperSendEphemeralEvent(ctx context.Context, roomID id.RoomID, eventType event.Type, content *event.Content, txnID string) (*mautrix.RespSendEvent, error) {
if encrypted, err := as.Matrix.StateStore.IsEncrypted(ctx, roomID); err != nil {
return nil, fmt.Errorf("failed to check if room is encrypted: %w", err)
} else if encrypted && as.Connector.Crypto != nil {
@ -94,7 +94,7 @@ func (as *ASIntent) SendEphemeralEvent(ctx context.Context, roomID id.RoomID, ev
}
eventType = event.EventEncrypted
}
return as.Matrix.SendEphemeralEvent(ctx, roomID, eventType, content, mautrix.ReqSendEvent{TransactionID: txnID})
return as.Matrix.BeeperSendEphemeralEvent(ctx, roomID, eventType, content, mautrix.ReqSendEvent{TransactionID: txnID})
}
func (as *ASIntent) fillMemberEvent(ctx context.Context, roomID id.RoomID, userID id.UserID, content *event.Content) {

View file

@ -16,6 +16,7 @@ import (
"github.com/rs/zerolog"
"go.mau.fi/util/jsontime"
"maunium.net/go/mautrix"
"maunium.net/go/mautrix/appservice"
"maunium.net/go/mautrix/bridgev2"
"maunium.net/go/mautrix/bridgev2/status"
@ -68,10 +69,13 @@ func (br *Connector) handleEphemeralEvent(ctx context.Context, evt *event.Event)
case event.EphemeralEventTyping:
typingContent := evt.Content.AsTyping()
typingContent.UserIDs = slices.DeleteFunc(typingContent.UserIDs, br.shouldIgnoreEventFromUser)
case event.EphemeralEventAIStream:
case event.BeeperEphemeralEventAIStream:
if br.shouldIgnoreEvent(evt) {
return
}
if !br.SpecVersions.Supports(mautrix.BeeperFeatureAIStreamEvent) {
return
}
}
br.Bridge.QueueMatrixEvent(ctx, evt)
}

View file

@ -220,5 +220,5 @@ type MarkAsDMMatrixAPI interface {
type EphemeralSendingMatrixAPI interface {
MatrixAPI
SendEphemeralEvent(ctx context.Context, roomID id.RoomID, eventType event.Type, content *event.Content, txnID string) (*mautrix.RespSendEvent, error)
BeeperSendEphemeralEvent(ctx context.Context, roomID id.RoomID, eventType event.Type, content *event.Content, txnID string) (*mautrix.RespSendEvent, error)
}

View file

@ -657,11 +657,6 @@ type TypingHandlingNetworkAPI interface {
HandleMatrixTyping(ctx context.Context, msg *MatrixTyping) error
}
type EphemeralHandlingNetworkAPI interface {
NetworkAPI
HandleMatrixEphemeral(ctx context.Context, msg *MatrixEphemeralEvent) error
}
type MarkedUnreadHandlingNetworkAPI interface {
NetworkAPI
HandleMarkedUnread(ctx context.Context, msg *MatrixMarkedUnread) error
@ -731,12 +726,12 @@ type MessageRequestAcceptingNetworkAPI interface {
HandleMatrixAcceptMessageRequest(ctx context.Context, msg *MatrixAcceptMessageRequest) error
}
// ActionResponseHandlingNetworkAPI is an optional interface that network connectors
// BeeperActionResponseHandlingNetworkAPI is an optional interface that network connectors
// can implement to handle com.beeper.action_response events (MSC1485 action hints).
type ActionResponseHandlingNetworkAPI interface {
type BeeperActionResponseHandlingNetworkAPI interface {
NetworkAPI
// HandleMatrixActionResponse is called when a user clicks an action hint button.
HandleMatrixActionResponse(ctx context.Context, msg *MatrixActionResponse) error
// HandleMatrixBeeperActionResponse is called when a user clicks an action hint button.
HandleMatrixBeeperActionResponse(ctx context.Context, msg *MatrixBeeperActionResponse) error
}
type ResolveIdentifierResponse struct {
@ -1367,10 +1362,6 @@ type MatrixMessage struct {
pendingSaves []*outgoingMessage
}
type MatrixEphemeralEvent struct {
MatrixEventBase[*event.Content]
}
type MatrixEdit struct {
MatrixEventBase[*event.MessageEventContent]
EditTarget *database.Message
@ -1456,7 +1447,7 @@ type MatrixViewingChat struct {
type MatrixDeleteChat = MatrixEventBase[*event.BeeperChatDeleteEventContent]
type MatrixAcceptMessageRequest = MatrixEventBase[*event.BeeperAcceptMessageRequestEventContent]
type MatrixActionResponse = MatrixEventBase[*event.BeeperActionResponseEventContent]
type MatrixBeeperActionResponse = MatrixEventBase[*event.BeeperActionResponseEventContent]
type MatrixMarkedUnread = MatrixRoomMeta[*event.MarkedUnreadEventContent]
type MatrixMute = MatrixRoomMeta[*event.BeeperMuteEventContent]
type MatrixRoomTag = MatrixRoomMeta[*event.TagEventContent]

View file

@ -697,8 +697,8 @@ func (portal *Portal) handleMatrixEvent(ctx context.Context, sender *User, evt *
return portal.handleMatrixReceipts(ctx, evt)
case event.EphemeralEventTyping:
return portal.handleMatrixTyping(ctx, evt)
case event.EphemeralEventAIStream:
return portal.handleMatrixEphemeral(ctx, sender, evt)
case event.BeeperEphemeralEventAIStream:
return portal.handleMatrixAIStream(ctx, sender, evt)
default:
return EventHandlingResultIgnored
}
@ -821,7 +821,7 @@ func (portal *Portal) handleMatrixEvent(ctx context.Context, sender *User, evt *
case event.BeeperAcceptMessageRequest:
return portal.handleMatrixAcceptMessageRequest(ctx, login, origSender, evt)
case event.BeeperActionResponse:
return portal.handleMatrixActionResponse(ctx, login, origSender, evt)
return portal.handleMatrixBeeperActionResponse(ctx, login, origSender, evt)
default:
return EventHandlingResultIgnored
}
@ -945,15 +945,15 @@ func (portal *Portal) handleMatrixTyping(ctx context.Context, evt *event.Event)
return EventHandlingResultSuccess
}
func (portal *Portal) handleMatrixEphemeral(ctx context.Context, sender *User, evt *event.Event) EventHandlingResult {
func (portal *Portal) handleMatrixAIStream(ctx context.Context, sender *User, evt *event.Event) EventHandlingResult {
log := zerolog.Ctx(ctx)
if sender == nil {
log.Error().Msg("Missing sender for ephemeral event")
log.Error().Msg("Missing sender for Matrix AI stream event")
return EventHandlingResultIgnored
}
login, _, err := portal.FindPreferredLogin(ctx, sender, true)
if err != nil {
log.Err(err).Msg("Failed to get user login to handle ephemeral event")
log.Err(err).Msg("Failed to get user login to handle Matrix AI stream event")
return EventHandlingResultFailed.WithMSSError(err)
}
var origSender *OrigSender
@ -967,24 +967,7 @@ func (portal *Portal) handleMatrixEphemeral(ctx context.Context, sender *User, e
UserID: sender.MXID,
}
}
ephemeralAPI, ok := login.Client.(EphemeralHandlingNetworkAPI)
if !ok {
return EventHandlingResultIgnored
}
err = ephemeralAPI.HandleMatrixEphemeral(ctx, &MatrixEphemeralEvent{
MatrixEventBase: MatrixEventBase[*event.Content]{
Event: evt,
Content: &evt.Content,
Portal: portal,
OrigSender: origSender,
InputTransactionID: portal.parseInputTransactionID(origSender, evt),
},
})
if err != nil {
log.Err(err).Msg("Failed to bridge Matrix ephemeral event")
return EventHandlingResultFailed.WithMSSError(err)
}
return EventHandlingResultSuccess.WithMSS()
return portal.handleMatrixBeeperActionResponse(ctx, login, origSender, evt)
}
func (portal *Portal) sendTypings(ctx context.Context, userIDs []id.UserID, typing bool) {
@ -1864,7 +1847,7 @@ func (portal *Portal) handleMatrixAcceptMessageRequest(
return EventHandlingResultSuccess.WithMSS()
}
func (portal *Portal) handleMatrixActionResponse(
func (portal *Portal) handleMatrixBeeperActionResponse(
ctx context.Context,
sender *UserLogin,
origSender *OrigSender,
@ -1876,11 +1859,11 @@ func (portal *Portal) handleMatrixActionResponse(
log.Error().Type("content_type", evt.Content.Parsed).Msg("Unexpected parsed content type")
return EventHandlingResultFailed.WithMSSError(fmt.Errorf("%w: %T", ErrUnexpectedParsedContentType, evt.Content.Parsed))
}
api, ok := sender.Client.(ActionResponseHandlingNetworkAPI)
api, ok := sender.Client.(BeeperActionResponseHandlingNetworkAPI)
if !ok {
return EventHandlingResultIgnored.WithMSSError(ErrActionResponseNotSupported)
return EventHandlingResultIgnored.WithMSSError(ErrBeeperActionResponseNotSupported)
}
err := api.HandleMatrixActionResponse(ctx, &MatrixActionResponse{
err := api.HandleMatrixBeeperActionResponse(ctx, &MatrixBeeperActionResponse{
Event: evt,
Content: content,
Portal: portal,

View file

@ -1359,9 +1359,9 @@ func (cli *Client) SendMessageEvent(ctx context.Context, roomID id.RoomID, event
return
}
// SendEphemeralEvent sends an ephemeral event into a room. This is a custom unstable endpoint.
// BeeperSendEphemeralEvent sends an ephemeral event into a room using Beeper's unstable endpoint.
// contentJSON should be a value that can be encoded as JSON using json.Marshal.
func (cli *Client) SendEphemeralEvent(ctx context.Context, roomID id.RoomID, eventType event.Type, contentJSON any, extra ...ReqSendEvent) (resp *RespSendEvent, err error) {
func (cli *Client) BeeperSendEphemeralEvent(ctx context.Context, roomID id.RoomID, eventType event.Type, contentJSON any, extra ...ReqSendEvent) (resp *RespSendEvent, err error) {
var req ReqSendEvent
if len(extra) > 0 {
req = extra[0]
@ -1378,6 +1378,17 @@ func (cli *Client) SendEphemeralEvent(ctx context.Context, roomID id.RoomID, eve
if req.Timestamp > 0 {
queryParams["ts"] = strconv.FormatInt(req.Timestamp, 10)
}
if eventType == event.BeeperEphemeralEventAIStream {
if cli.SpecVersions == nil {
_, err = cli.Versions(ctx)
if err != nil {
return nil, fmt.Errorf("failed to check homeserver feature support via /versions: %w", err)
}
}
if !cli.SpecVersions.Supports(BeeperFeatureAIStreamEvent) {
return nil, MUnrecognized.WithMessage("Homeserver does not advertise com.beeper.ai.stream_event support")
}
}
if !req.DontEncrypt && cli != nil && cli.Crypto != nil && eventType != event.EventEncrypted {
var isEncrypted bool

View file

@ -42,7 +42,7 @@ func TestClient_SendEphemeralEvent_UsesUnstablePathTxnAndTS(t *testing.T) {
cli, err := mautrix.NewClient(ts.URL, "", "")
require.NoError(t, err)
_, err = cli.SendEphemeralEvent(
_, err = cli.BeeperSendEphemeralEvent(
context.Background(),
roomID,
evtType,
@ -66,7 +66,7 @@ func TestClient_SendEphemeralEvent_UnsupportedReturnsMUnrecognized(t *testing.T)
cli, err := mautrix.NewClient(ts.URL, "", "")
require.NoError(t, err)
_, err = cli.SendEphemeralEvent(
_, err = cli.BeeperSendEphemeralEvent(
context.Background(),
id.RoomID("!room:example.com"),
event.Type{Type: "com.example.ephemeral", Class: event.EphemeralEventType},
@ -110,7 +110,7 @@ func TestClient_SendEphemeralEvent_EncryptsInEncryptedRooms(t *testing.T) {
cli.StateStore = stateStore
cli.Crypto = fakeCrypto
_, err = cli.SendEphemeralEvent(
_, err = cli.BeeperSendEphemeralEvent(
context.Background(),
roomID,
evtType,

View file

@ -77,9 +77,10 @@ var TypeMap = map[Type]reflect.Type{
AccountDataMarkedUnread: reflect.TypeOf(MarkedUnreadEventContent{}),
AccountDataBeeperMute: reflect.TypeOf(BeeperMuteEventContent{}),
EphemeralEventTyping: reflect.TypeOf(TypingEventContent{}),
EphemeralEventReceipt: reflect.TypeOf(ReceiptEventContent{}),
EphemeralEventPresence: reflect.TypeOf(PresenceEventContent{}),
EphemeralEventTyping: reflect.TypeOf(TypingEventContent{}),
EphemeralEventReceipt: reflect.TypeOf(ReceiptEventContent{}),
EphemeralEventPresence: reflect.TypeOf(PresenceEventContent{}),
BeeperEphemeralEventAIStream: reflect.TypeOf(BeeperActionResponseEventContent{}),
InRoomVerificationReady: reflect.TypeOf(VerificationReadyEventContent{}),
InRoomVerificationStart: reflect.TypeOf(VerificationStartEventContent{}),

View file

@ -115,7 +115,7 @@ func (et *Type) GuessClass() TypeClass {
StateElementFunctionalMembers.Type, StateBeeperRoomFeatures.Type, StateBeeperDisappearingTimer.Type,
StateMSC4391BotCommand.Type, StateRoomPolicy.Type, StateUnstableRoomPolicy.Type:
return StateEventType
case EphemeralEventReceipt.Type, EphemeralEventTyping.Type, EphemeralEventPresence.Type, EphemeralEventAIStream.Type:
case EphemeralEventReceipt.Type, EphemeralEventTyping.Type, EphemeralEventPresence.Type, BeeperEphemeralEventAIStream.Type:
return EphemeralEventType
case AccountDataDirectChats.Type, AccountDataPushRules.Type, AccountDataRoomTags.Type,
AccountDataFullyRead.Type, AccountDataIgnoredUserList.Type, AccountDataMarkedUnread.Type,
@ -252,10 +252,10 @@ var (
// Ephemeral events
var (
EphemeralEventReceipt = Type{"m.receipt", EphemeralEventType}
EphemeralEventTyping = Type{"m.typing", EphemeralEventType}
EphemeralEventPresence = Type{"m.presence", EphemeralEventType}
EphemeralEventAIStream = Type{"com.beeper.ai.stream_event", EphemeralEventType}
EphemeralEventReceipt = Type{"m.receipt", EphemeralEventType}
EphemeralEventTyping = Type{"m.typing", EphemeralEventType}
EphemeralEventPresence = Type{"m.presence", EphemeralEventType}
BeeperEphemeralEventAIStream = Type{"com.beeper.ai.stream_event", EphemeralEventType}
)
// Account data events

View file

@ -80,6 +80,7 @@ var (
BeeperFeatureAccountDataMute = UnstableFeature{UnstableFlag: "com.beeper.account_data_mute"}
BeeperFeatureInboxState = UnstableFeature{UnstableFlag: "com.beeper.inbox_state"}
BeeperFeatureArbitraryMemberChange = UnstableFeature{UnstableFlag: "com.beeper.arbitrary_member_change"}
BeeperFeatureAIStreamEvent = UnstableFeature{UnstableFlag: "com.beeper.ai.stream_event"}
)
func (versions *RespVersions) Supports(feature UnstableFeature) bool {