Implement message handler for dialout support.

This commit is contained in:
Joachim Bauch 2023-09-20 12:38:36 +02:00
parent 04192e96f1
commit d1544dcb2c
No known key found for this signature in database
GPG key ID: 77C1D22D53E15F02
6 changed files with 339 additions and 3 deletions

View file

@ -31,6 +31,7 @@ import (
"fmt"
"net/http"
"net/url"
"regexp"
"strings"
)
@ -104,6 +105,8 @@ type BackendServerRoomRequest struct {
SwitchTo *BackendRoomSwitchToMessageRequest `json:"switchto,omitempty"`
Dialout *BackendRoomDialoutRequest `json:"dialout,omitempty"`
// Internal properties
ReceivedTime int64 `json:"received,omitempty"`
}
@ -170,6 +173,50 @@ type BackendRoomSwitchToMessageRequest struct {
SessionsMap BackendRoomSwitchToSessionsMap `json:"sessionsmap,omitempty"`
}
type BackendRoomDialoutRequest struct {
// E.164 number to dial (e.g. "+1234567890")
Number string `json:"number"`
Options json.RawMessage `json:"options,omitempty"`
}
var (
checkE164Number = regexp.MustCompile(`^\+\d{2,}$`)
)
func isValidNumber(s string) bool {
return checkE164Number.MatchString(s)
}
func (r *BackendRoomDialoutRequest) ValidateNumber() *Error {
if r.Number == "" {
return NewError("number_missing", "No number provided")
}
if !isValidNumber(r.Number) {
return NewError("invalid_number", "Expected E.164 number.")
}
return nil
}
type BackendServerRoomResponse struct {
Type string `json:"type"`
Dialout *BackendRoomDialoutResponse `json:"dialout,omitempty"`
}
type BackendRoomDialoutError struct {
Code string `json:"code"`
Message string `json:"message,omitempty"`
}
type BackendRoomDialoutResponse struct {
CallId string `json:"callid,omitempty"`
Error *Error `json:"error,omitempty"`
}
// Requests from the signaling server to the Nextcloud backend.
type BackendClientAuthRequest struct {

View file

@ -56,3 +56,27 @@ func TestBackendChecksum(t *testing.T) {
t.Errorf("Checksum %s could not be validated from request", check1)
}
}
func TestValidNumbers(t *testing.T) {
valid := []string{
"+12",
"+12345",
}
invalid := []string{
"+1",
"12345",
" +12345",
" +12345 ",
"+123-45",
}
for _, number := range valid {
if !isValidNumber(number) {
t.Errorf("number %s should be valid", number)
}
}
for _, number := range invalid {
if isValidNumber(number) {
t.Errorf("number %s should not be valid", number)
}
}
}

View file

@ -23,6 +23,7 @@ package signaling
import (
"encoding/json"
"errors"
"fmt"
"log"
"net/url"
@ -164,6 +165,10 @@ type ServerMessage struct {
Event *EventServerMessage `json:"event,omitempty"`
TransientData *TransientDataServerMessage `json:"transient,omitempty"`
Internal *InternalServerMessage `json:"internal,omitempty"`
Dialout *DialoutInternalClientMessage `json:"dialout,omitempty"`
}
func (r *ServerMessage) CloseAfterSend(session Session) bool {
@ -444,12 +449,14 @@ const (
ServerFeatureWelcome = "welcome"
ServerFeatureHelloV2 = "hello-v2"
ServerFeatureSwitchTo = "switchto"
ServerFeatureDialout = "dialout"
// Features to send to internal clients only.
ServerFeatureInternalVirtualSessions = "virtual-sessions"
// Possible client features from the "hello" request.
ClientFeatureInternalInCall = "internal-incall"
ClientFeatureStartDialout = "start-dialout"
)
var (
@ -460,6 +467,7 @@ var (
ServerFeatureWelcome,
ServerFeatureHelloV2,
ServerFeatureSwitchTo,
ServerFeatureDialout,
}
DefaultFeaturesInternal = []string{
ServerFeatureInternalVirtualSessions,
@ -468,6 +476,7 @@ var (
ServerFeatureWelcome,
ServerFeatureHelloV2,
ServerFeatureSwitchTo,
ServerFeatureDialout,
}
DefaultWelcomeFeatures = []string{
ServerFeatureAudioVideoPermissions,
@ -477,6 +486,7 @@ var (
ServerFeatureWelcome,
ServerFeatureHelloV2,
ServerFeatureSwitchTo,
ServerFeatureDialout,
}
)
@ -684,6 +694,52 @@ func (m *InCallInternalClientMessage) CheckValid() error {
return nil
}
type DialoutStatus string
var (
DialoutStatusAccepted DialoutStatus = "accepted"
DialoutStatusRinging DialoutStatus = "ringing"
DialoutStatusConnected DialoutStatus = "connected"
DialoutStatusRejected DialoutStatus = "rejected"
DialoutStatusCleared DialoutStatus = "cleared"
)
type DialoutStatusInternalClientMessage struct {
CallId string `json:"callid"`
Status DialoutStatus `json:"status"`
// Cause is set if Status is "cleared" or "rejected".
Cause string `json:"cause,omitempty"`
Code int `json:"code,omitempty"`
Message string `json:"message,omitempty"`
}
type DialoutInternalClientMessage struct {
Type string `json:"type"`
RoomId string `json:"roomid,omitempty"`
Error *Error `json:"error,omitempty"`
Status *DialoutStatusInternalClientMessage `json:"status,omitempty"`
}
func (m *DialoutInternalClientMessage) CheckValid() error {
switch m.Type {
case "":
return errors.New("type missing")
case "error":
if m.Error == nil {
return errors.New("error missing")
}
case "status":
if m.Status == nil {
return errors.New("status missing")
}
}
return nil
}
type InternalClientMessage struct {
Type string `json:"type"`
@ -694,10 +750,14 @@ type InternalClientMessage struct {
RemoveSession *RemoveSessionInternalClientMessage `json:"removesession,omitempty"`
InCall *InCallInternalClientMessage `json:"incall,omitempty"`
Dialout *DialoutInternalClientMessage `json:"dialout,omitempty"`
}
func (m *InternalClientMessage) CheckValid() error {
switch m.Type {
case "":
return errors.New("type missing")
case "addsession":
if m.AddSession == nil {
return fmt.Errorf("addsession missing")
@ -722,10 +782,28 @@ func (m *InternalClientMessage) CheckValid() error {
} else if err := m.InCall.CheckValid(); err != nil {
return err
}
case "dialout":
if m.Dialout == nil {
return fmt.Errorf("dialout missing")
} else if err := m.Dialout.CheckValid(); err != nil {
return err
}
}
return nil
}
type InternalServerDialoutRequest struct {
RoomId string `json:"roomid"`
Request *BackendRoomDialoutRequest `json:"request"`
}
type InternalServerMessage struct {
Type string `json:"type"`
Dialout *InternalServerDialoutRequest `json:"dialout,omitempty"`
}
// Type "event"
type RoomEventServerMessage struct {

View file

@ -28,6 +28,7 @@ import (
"crypto/sha1"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"io"
"log"
@ -37,6 +38,7 @@ import (
"reflect"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/dlintw/goconf"
@ -639,6 +641,115 @@ func (b *BackendServer) sendRoomSwitchTo(roomid string, backend *Backend, reques
return b.events.PublishBackendRoomMessage(roomid, backend, message)
}
type BackendResponseWithStatus interface {
Status() int
}
type DialoutErrorResponse struct {
BackendServerRoomResponse
status int
}
func (r *DialoutErrorResponse) Status() int {
return r.status
}
func returnDialoutError(status int, err *Error) (any, error) {
response := &DialoutErrorResponse{
BackendServerRoomResponse: BackendServerRoomResponse{
Type: "dialout",
Dialout: &BackendRoomDialoutResponse{
Error: err,
},
},
status: status,
}
return response, nil
}
func (b *BackendServer) startDialout(roomid string, backend *Backend, request *BackendServerRoomRequest) (any, error) {
if err := request.Dialout.ValidateNumber(); err != nil {
return returnDialoutError(http.StatusBadRequest, err)
}
// TODO: Add direct lookup of internal sessions that are not in a room and support dialout.
var session *ClientSession
for _, s := range b.hub.sessions {
if s.GetRoom() == nil && s.ClientType() == HelloClientTypeInternal {
clientSession, ok := s.(*ClientSession)
if ok && clientSession.GetClient() != nil && clientSession.HasFeature(ClientFeatureStartDialout) {
session = clientSession
break
}
}
}
if session == nil {
return returnDialoutError(http.StatusNotFound, NewError("no_client_available", "No available client found to trigger dialout."))
}
id := newRandomString(32)
msg := &ServerMessage{
Id: id,
Type: "internal",
Internal: &InternalServerMessage{
Type: "dialout",
Dialout: &InternalServerDialoutRequest{
RoomId: roomid,
Request: request.Dialout,
},
},
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
var response atomic.Pointer[DialoutInternalClientMessage]
session.HandleResponse(id, func(message *ClientMessage) bool {
response.Store(message.Internal.Dialout)
cancel()
// Don't send error to other sessions in the room.
return message.Internal.Dialout.Error != nil
})
defer session.ClearResponseHandler(id)
if !session.SendMessage(msg) {
return returnDialoutError(http.StatusBadGateway, NewError("error_notify", "Could not notify about new dialout."))
}
<-ctx.Done()
if err := ctx.Err(); err != nil && !errors.Is(err, context.Canceled) {
return returnDialoutError(http.StatusGatewayTimeout, NewError("timeout", "Timeout while waiting for dialout to start."))
}
dialout := response.Load()
if dialout == nil {
return returnDialoutError(http.StatusBadGateway, NewError("error_notify", "No dialout response received."))
}
switch dialout.Type {
case "error":
return returnDialoutError(http.StatusBadGateway, dialout.Error)
case "status":
if dialout.Status.Status != DialoutStatusAccepted {
log.Printf("Received unsupported dialout status when triggering dialout: %+v", dialout)
return returnDialoutError(http.StatusBadGateway, NewError("unsupported_status", "Unsupported dialout status received."))
}
return &BackendServerRoomResponse{
Type: "dialout",
Dialout: &BackendRoomDialoutResponse{
CallId: dialout.Status.CallId,
},
}, nil
}
log.Printf("Received unsupported dialout type when triggering dialout: %+v", dialout)
return returnDialoutError(http.StatusBadGateway, NewError("unsupported_type", "Unsupported dialout type received."))
}
func (b *BackendServer) roomHandler(w http.ResponseWriter, r *http.Request, body []byte) {
v := mux.Vars(r)
roomid := v["roomid"]
@ -692,6 +803,7 @@ func (b *BackendServer) roomHandler(w http.ResponseWriter, r *http.Request, body
request.ReceivedTime = time.Now().UnixNano()
var response any
var err error
switch request.Type {
case "invite":
@ -722,6 +834,8 @@ func (b *BackendServer) roomHandler(w http.ResponseWriter, r *http.Request, body
err = b.sendRoomMessage(roomid, backend, &request)
case "switchto":
err = b.sendRoomSwitchTo(roomid, backend, &request)
case "dialout":
response, err = b.startDialout(roomid, backend, &request)
default:
http.Error(w, "Unsupported request type: "+request.Type, http.StatusBadRequest)
return
@ -733,11 +847,27 @@ func (b *BackendServer) roomHandler(w http.ResponseWriter, r *http.Request, body
return
}
var responseData []byte
responseStatus := http.StatusOK
if response == nil {
// TODO(jojo): Return better response struct.
responseData = []byte("{}")
} else {
if s, ok := response.(BackendResponseWithStatus); ok {
responseStatus = s.Status()
}
responseData, err = json.Marshal(response)
if err != nil {
log.Printf("Could not serialize backend response %+v: %s", response, err)
responseStatus = http.StatusInternalServerError
responseData = []byte("{\"error\":\"could_not_serialize\"}")
}
}
w.Header().Set("Content-Type", "application/json; charset=utf-8")
w.Header().Set("X-Content-Type-Options", "nosniff")
w.WriteHeader(http.StatusOK)
// TODO(jojo): Return better response struct.
w.Write([]byte("{}")) // nolint
w.WriteHeader(responseStatus)
w.Write(responseData) // nolint
}
func (b *BackendServer) allowStatsAccess(r *http.Request) bool {

View file

@ -46,6 +46,9 @@ var (
PathToOcsSignalingBackend = "ocs/v2.php/apps/spreed/api/v1/signaling/backend"
)
// ResponseHandlerFunc will return "true" has been fully processed.
type ResponseHandlerFunc func(message *ClientMessage) bool
type ClientSession struct {
roomJoinTime int64
inCall uint32
@ -89,6 +92,9 @@ type ClientSession struct {
seenJoinedLock sync.Mutex
seenJoinedEvents map[string]bool
responseHandlersLock sync.Mutex
responseHandlers map[string]ResponseHandlerFunc
}
func NewClientSession(hub *Hub, privateId string, publicId string, data *SessionIdData, backend *Backend, hello *HelloClientMessage, auth *BackendClientAuthResponse) (*ClientSession, error) {
@ -1420,3 +1426,38 @@ func (s *ClientSession) GetVirtualSessions() []*VirtualSession {
}
return result
}
func (s *ClientSession) HandleResponse(id string, handler ResponseHandlerFunc) {
s.responseHandlersLock.Lock()
defer s.responseHandlersLock.Unlock()
if s.responseHandlers == nil {
s.responseHandlers = make(map[string]ResponseHandlerFunc)
}
s.responseHandlers[id] = handler
}
func (s *ClientSession) ClearResponseHandler(id string) {
s.responseHandlersLock.Lock()
defer s.responseHandlersLock.Unlock()
delete(s.responseHandlers, id)
}
func (s *ClientSession) ProcessResponse(message *ClientMessage) bool {
id := message.Id
if id == "" {
return false
}
s.responseHandlersLock.Lock()
cb, found := s.responseHandlers[id]
defer s.responseHandlersLock.Unlock()
if !found {
return false
}
return cb(message)
}

16
hub.go
View file

@ -1772,6 +1772,10 @@ func (h *Hub) processInternalMsg(client *Client, message *ClientMessage) {
return
}
if session.ProcessResponse(message) {
return
}
switch msg.Type {
case "addsession":
msg := msg.AddSession
@ -1923,6 +1927,18 @@ func (h *Hub) processInternalMsg(client *Client, message *ClientMessage) {
room.NotifySessionChanged(session, SessionChangeInCall)
}
}
case "dialout":
roomId := msg.Dialout.RoomId
msg.Dialout.RoomId = "" // Don't send room id to recipients.
if err := h.events.PublishRoomMessage(roomId, session.Backend(), &AsyncMessage{
Type: "message",
Message: &ServerMessage{
Type: "dialout",
Dialout: msg.Dialout,
},
}); err != nil {
log.Printf("Error publishing dialout message %+v to room %s", msg.Dialout, roomId)
}
default:
log.Printf("Ignore unsupported internal message %+v from %s", msg, session.PublicId())
return