bridgev2/matrixinterface: add stream upload method

This commit is contained in:
Tulir Asokan 2024-08-19 19:33:26 +03:00
commit 20ce646435
2 changed files with 98 additions and 7 deletions

View file

@ -10,6 +10,8 @@ import (
"context"
"errors"
"fmt"
"io"
"os"
"strings"
"sync"
"time"
@ -228,19 +230,106 @@ func (as *ASIntent) UploadMedia(ctx context.Context, roomID id.RoomID, data []by
fileName = ""
}
}
req := mautrix.ReqUploadMedia{
url, err = as.doUploadReq(ctx, file, mautrix.ReqUploadMedia{
ContentBytes: data,
ContentType: mimeType,
FileName: fileName,
})
return
}
const inMemoryUploadThreshold = 5 * 1024 * 1024
type writeToCapturer struct {
data []byte
}
func (w *writeToCapturer) Write(p []byte) (n int, err error) {
if w.data == nil {
w.data = p
} else {
w.data = append(w.data, p...)
}
if as.Connector.Config.Homeserver.AsyncMedia {
// Prevent too many background uploads at once
err = as.Connector.uploadSema.Acquire(ctx, int64(len(data)))
if err != nil {
return
return len(p), nil
}
func (as *ASIntent) UploadMediaStream(ctx context.Context, roomID id.RoomID, data io.Reader, size int64, fileName, mimeType string) (url id.ContentURIString, file *event.EncryptedFileInfo, err error) {
if size > as.Connector.MediaConfig.UploadSize {
return "", nil, fmt.Errorf("file too large (%.2f MB > %.2f MB)", float64(size)/1000/1000, float64(as.Connector.MediaConfig.UploadSize)/1000/1000)
} else if 0 < size && size < inMemoryUploadThreshold {
var dataBytes []byte
wt, ok := data.(io.WriterTo)
if ok {
capturer := &writeToCapturer{}
_, err = wt.WriteTo(capturer)
if err != nil {
return "", nil, err
}
dataBytes = capturer.data
} else {
dataBytes, err = io.ReadAll(data)
if err != nil {
return "", nil, err
}
}
req.DoneCallback = func() {
as.Connector.uploadSema.Release(int64(len(data)))
return as.UploadMedia(ctx, roomID, dataBytes, fileName, mimeType)
}
tempFile, err := os.CreateTemp("", "mautrix-upload-*")
if err != nil {
return "", nil, fmt.Errorf("failed to create temp file: %w", err)
}
defer func() {
_ = tempFile.Close()
_ = os.Remove(tempFile.Name())
}()
var realSize int64
if roomID != "" {
var encrypted bool
if encrypted, err = as.Matrix.StateStore.IsEncrypted(ctx, roomID); err != nil {
err = fmt.Errorf("failed to check if room is encrypted: %w", err)
return
} else if encrypted {
file = &event.EncryptedFileInfo{
EncryptedFile: *attachment.NewEncryptedFile(),
}
encryptStream := file.EncryptStream(data)
realSize, err = io.Copy(tempFile, encryptStream)
if err != nil {
return "", nil, fmt.Errorf("failed to write to temp file: %w", err)
}
err = encryptStream.Close()
if err != nil {
return "", nil, fmt.Errorf("failed to finalize encryption: %w", err)
}
mimeType = "application/octet-stream"
fileName = ""
}
} else {
realSize, err = io.Copy(tempFile, data)
if err != nil {
return "", nil, fmt.Errorf("failed to write to temp file: %w", err)
}
}
url, err = as.doUploadReq(ctx, file, mautrix.ReqUploadMedia{
Content: tempFile,
ContentLength: realSize,
ContentType: mimeType,
FileName: fileName,
})
return
}
func (as *ASIntent) doUploadReq(ctx context.Context, file *event.EncryptedFileInfo, req mautrix.ReqUploadMedia) (url id.ContentURIString, err error) {
if as.Connector.Config.Homeserver.AsyncMedia {
if req.ContentBytes != nil {
// Prevent too many background uploads at once
err = as.Connector.uploadSema.Acquire(ctx, int64(len(req.ContentBytes)))
if err != nil {
return
}
req.DoneCallback = func() {
as.Connector.uploadSema.Release(int64(len(req.ContentBytes)))
}
}
var resp *mautrix.RespCreateMXC
resp, err = as.Matrix.UploadAsync(ctx, req)

View file

@ -8,6 +8,7 @@ package bridgev2
import (
"context"
"io"
"time"
"github.com/gorilla/mux"
@ -88,6 +89,7 @@ type MatrixAPI interface {
MarkTyping(ctx context.Context, roomID id.RoomID, typingType TypingType, timeout time.Duration) error
DownloadMedia(ctx context.Context, uri id.ContentURIString, file *event.EncryptedFileInfo) ([]byte, error)
UploadMedia(ctx context.Context, roomID id.RoomID, data []byte, fileName, mimeType string) (url id.ContentURIString, file *event.EncryptedFileInfo, err error)
UploadMediaStream(ctx context.Context, roomID id.RoomID, data io.Reader, size int64, fileName, mimeType string) (url id.ContentURIString, file *event.EncryptedFileInfo, err error)
SetDisplayName(ctx context.Context, name string) error
SetAvatarURL(ctx context.Context, avatarURL id.ContentURIString) error