From d1544dcb2c65e4ce130938ebc66f398f78d5ecb1 Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Wed, 20 Sep 2023 12:38:36 +0200 Subject: [PATCH] Implement message handler for dialout support. --- api_backend.go | 47 +++++++++++++++ api_backend_test.go | 24 ++++++++ api_signaling.go | 78 +++++++++++++++++++++++++ backend_server.go | 136 +++++++++++++++++++++++++++++++++++++++++++- clientsession.go | 41 +++++++++++++ hub.go | 16 ++++++ 6 files changed, 339 insertions(+), 3 deletions(-) diff --git a/api_backend.go b/api_backend.go index 67f7a15..44e7672 100644 --- a/api_backend.go +++ b/api_backend.go @@ -31,6 +31,7 @@ import ( "fmt" "net/http" "net/url" + "regexp" "strings" ) @@ -104,6 +105,8 @@ type BackendServerRoomRequest struct { SwitchTo *BackendRoomSwitchToMessageRequest `json:"switchto,omitempty"` + Dialout *BackendRoomDialoutRequest `json:"dialout,omitempty"` + // Internal properties ReceivedTime int64 `json:"received,omitempty"` } @@ -170,6 +173,50 @@ type BackendRoomSwitchToMessageRequest struct { SessionsMap BackendRoomSwitchToSessionsMap `json:"sessionsmap,omitempty"` } +type BackendRoomDialoutRequest struct { + // E.164 number to dial (e.g. "+1234567890") + Number string `json:"number"` + + Options json.RawMessage `json:"options,omitempty"` +} + +var ( + checkE164Number = regexp.MustCompile(`^\+\d{2,}$`) +) + +func isValidNumber(s string) bool { + return checkE164Number.MatchString(s) +} + +func (r *BackendRoomDialoutRequest) ValidateNumber() *Error { + if r.Number == "" { + return NewError("number_missing", "No number provided") + } + + if !isValidNumber(r.Number) { + return NewError("invalid_number", "Expected E.164 number.") + } + + return nil +} + +type BackendServerRoomResponse struct { + Type string `json:"type"` + + Dialout *BackendRoomDialoutResponse `json:"dialout,omitempty"` +} + +type BackendRoomDialoutError struct { + Code string `json:"code"` + Message string `json:"message,omitempty"` +} + +type BackendRoomDialoutResponse struct { + CallId string `json:"callid,omitempty"` + + Error *Error `json:"error,omitempty"` +} + // Requests from the signaling server to the Nextcloud backend. type BackendClientAuthRequest struct { diff --git a/api_backend_test.go b/api_backend_test.go index cc50e8f..48f5e41 100644 --- a/api_backend_test.go +++ b/api_backend_test.go @@ -56,3 +56,27 @@ func TestBackendChecksum(t *testing.T) { t.Errorf("Checksum %s could not be validated from request", check1) } } + +func TestValidNumbers(t *testing.T) { + valid := []string{ + "+12", + "+12345", + } + invalid := []string{ + "+1", + "12345", + " +12345", + " +12345 ", + "+123-45", + } + for _, number := range valid { + if !isValidNumber(number) { + t.Errorf("number %s should be valid", number) + } + } + for _, number := range invalid { + if isValidNumber(number) { + t.Errorf("number %s should not be valid", number) + } + } +} diff --git a/api_signaling.go b/api_signaling.go index db4036b..edd1fd2 100644 --- a/api_signaling.go +++ b/api_signaling.go @@ -23,6 +23,7 @@ package signaling import ( "encoding/json" + "errors" "fmt" "log" "net/url" @@ -164,6 +165,10 @@ type ServerMessage struct { Event *EventServerMessage `json:"event,omitempty"` TransientData *TransientDataServerMessage `json:"transient,omitempty"` + + Internal *InternalServerMessage `json:"internal,omitempty"` + + Dialout *DialoutInternalClientMessage `json:"dialout,omitempty"` } func (r *ServerMessage) CloseAfterSend(session Session) bool { @@ -444,12 +449,14 @@ const ( ServerFeatureWelcome = "welcome" ServerFeatureHelloV2 = "hello-v2" ServerFeatureSwitchTo = "switchto" + ServerFeatureDialout = "dialout" // Features to send to internal clients only. ServerFeatureInternalVirtualSessions = "virtual-sessions" // Possible client features from the "hello" request. ClientFeatureInternalInCall = "internal-incall" + ClientFeatureStartDialout = "start-dialout" ) var ( @@ -460,6 +467,7 @@ var ( ServerFeatureWelcome, ServerFeatureHelloV2, ServerFeatureSwitchTo, + ServerFeatureDialout, } DefaultFeaturesInternal = []string{ ServerFeatureInternalVirtualSessions, @@ -468,6 +476,7 @@ var ( ServerFeatureWelcome, ServerFeatureHelloV2, ServerFeatureSwitchTo, + ServerFeatureDialout, } DefaultWelcomeFeatures = []string{ ServerFeatureAudioVideoPermissions, @@ -477,6 +486,7 @@ var ( ServerFeatureWelcome, ServerFeatureHelloV2, ServerFeatureSwitchTo, + ServerFeatureDialout, } ) @@ -684,6 +694,52 @@ func (m *InCallInternalClientMessage) CheckValid() error { return nil } +type DialoutStatus string + +var ( + DialoutStatusAccepted DialoutStatus = "accepted" + DialoutStatusRinging DialoutStatus = "ringing" + DialoutStatusConnected DialoutStatus = "connected" + DialoutStatusRejected DialoutStatus = "rejected" + DialoutStatusCleared DialoutStatus = "cleared" +) + +type DialoutStatusInternalClientMessage struct { + CallId string `json:"callid"` + Status DialoutStatus `json:"status"` + + // Cause is set if Status is "cleared" or "rejected". + Cause string `json:"cause,omitempty"` + Code int `json:"code,omitempty"` + Message string `json:"message,omitempty"` +} + +type DialoutInternalClientMessage struct { + Type string `json:"type"` + + RoomId string `json:"roomid,omitempty"` + + Error *Error `json:"error,omitempty"` + + Status *DialoutStatusInternalClientMessage `json:"status,omitempty"` +} + +func (m *DialoutInternalClientMessage) CheckValid() error { + switch m.Type { + case "": + return errors.New("type missing") + case "error": + if m.Error == nil { + return errors.New("error missing") + } + case "status": + if m.Status == nil { + return errors.New("status missing") + } + } + return nil +} + type InternalClientMessage struct { Type string `json:"type"` @@ -694,10 +750,14 @@ type InternalClientMessage struct { RemoveSession *RemoveSessionInternalClientMessage `json:"removesession,omitempty"` InCall *InCallInternalClientMessage `json:"incall,omitempty"` + + Dialout *DialoutInternalClientMessage `json:"dialout,omitempty"` } func (m *InternalClientMessage) CheckValid() error { switch m.Type { + case "": + return errors.New("type missing") case "addsession": if m.AddSession == nil { return fmt.Errorf("addsession missing") @@ -722,10 +782,28 @@ func (m *InternalClientMessage) CheckValid() error { } else if err := m.InCall.CheckValid(); err != nil { return err } + case "dialout": + if m.Dialout == nil { + return fmt.Errorf("dialout missing") + } else if err := m.Dialout.CheckValid(); err != nil { + return err + } } return nil } +type InternalServerDialoutRequest struct { + RoomId string `json:"roomid"` + + Request *BackendRoomDialoutRequest `json:"request"` +} + +type InternalServerMessage struct { + Type string `json:"type"` + + Dialout *InternalServerDialoutRequest `json:"dialout,omitempty"` +} + // Type "event" type RoomEventServerMessage struct { diff --git a/backend_server.go b/backend_server.go index 45a7f6f..84a7faf 100644 --- a/backend_server.go +++ b/backend_server.go @@ -28,6 +28,7 @@ import ( "crypto/sha1" "encoding/base64" "encoding/json" + "errors" "fmt" "io" "log" @@ -37,6 +38,7 @@ import ( "reflect" "strings" "sync" + "sync/atomic" "time" "github.com/dlintw/goconf" @@ -639,6 +641,115 @@ func (b *BackendServer) sendRoomSwitchTo(roomid string, backend *Backend, reques return b.events.PublishBackendRoomMessage(roomid, backend, message) } +type BackendResponseWithStatus interface { + Status() int +} + +type DialoutErrorResponse struct { + BackendServerRoomResponse + + status int +} + +func (r *DialoutErrorResponse) Status() int { + return r.status +} + +func returnDialoutError(status int, err *Error) (any, error) { + response := &DialoutErrorResponse{ + BackendServerRoomResponse: BackendServerRoomResponse{ + Type: "dialout", + Dialout: &BackendRoomDialoutResponse{ + Error: err, + }, + }, + + status: status, + } + return response, nil +} + +func (b *BackendServer) startDialout(roomid string, backend *Backend, request *BackendServerRoomRequest) (any, error) { + if err := request.Dialout.ValidateNumber(); err != nil { + return returnDialoutError(http.StatusBadRequest, err) + } + + // TODO: Add direct lookup of internal sessions that are not in a room and support dialout. + var session *ClientSession + for _, s := range b.hub.sessions { + if s.GetRoom() == nil && s.ClientType() == HelloClientTypeInternal { + clientSession, ok := s.(*ClientSession) + if ok && clientSession.GetClient() != nil && clientSession.HasFeature(ClientFeatureStartDialout) { + session = clientSession + break + } + } + } + if session == nil { + return returnDialoutError(http.StatusNotFound, NewError("no_client_available", "No available client found to trigger dialout.")) + } + + id := newRandomString(32) + msg := &ServerMessage{ + Id: id, + Type: "internal", + Internal: &InternalServerMessage{ + Type: "dialout", + Dialout: &InternalServerDialoutRequest{ + RoomId: roomid, + Request: request.Dialout, + }, + }, + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + var response atomic.Pointer[DialoutInternalClientMessage] + + session.HandleResponse(id, func(message *ClientMessage) bool { + response.Store(message.Internal.Dialout) + cancel() + // Don't send error to other sessions in the room. + return message.Internal.Dialout.Error != nil + }) + defer session.ClearResponseHandler(id) + + if !session.SendMessage(msg) { + return returnDialoutError(http.StatusBadGateway, NewError("error_notify", "Could not notify about new dialout.")) + } + + <-ctx.Done() + if err := ctx.Err(); err != nil && !errors.Is(err, context.Canceled) { + return returnDialoutError(http.StatusGatewayTimeout, NewError("timeout", "Timeout while waiting for dialout to start.")) + } + + dialout := response.Load() + if dialout == nil { + return returnDialoutError(http.StatusBadGateway, NewError("error_notify", "No dialout response received.")) + } + + switch dialout.Type { + case "error": + return returnDialoutError(http.StatusBadGateway, dialout.Error) + case "status": + if dialout.Status.Status != DialoutStatusAccepted { + log.Printf("Received unsupported dialout status when triggering dialout: %+v", dialout) + return returnDialoutError(http.StatusBadGateway, NewError("unsupported_status", "Unsupported dialout status received.")) + } + + return &BackendServerRoomResponse{ + Type: "dialout", + Dialout: &BackendRoomDialoutResponse{ + CallId: dialout.Status.CallId, + }, + }, nil + } + + log.Printf("Received unsupported dialout type when triggering dialout: %+v", dialout) + return returnDialoutError(http.StatusBadGateway, NewError("unsupported_type", "Unsupported dialout type received.")) +} + func (b *BackendServer) roomHandler(w http.ResponseWriter, r *http.Request, body []byte) { v := mux.Vars(r) roomid := v["roomid"] @@ -692,6 +803,7 @@ func (b *BackendServer) roomHandler(w http.ResponseWriter, r *http.Request, body request.ReceivedTime = time.Now().UnixNano() + var response any var err error switch request.Type { case "invite": @@ -722,6 +834,8 @@ func (b *BackendServer) roomHandler(w http.ResponseWriter, r *http.Request, body err = b.sendRoomMessage(roomid, backend, &request) case "switchto": err = b.sendRoomSwitchTo(roomid, backend, &request) + case "dialout": + response, err = b.startDialout(roomid, backend, &request) default: http.Error(w, "Unsupported request type: "+request.Type, http.StatusBadRequest) return @@ -733,11 +847,27 @@ func (b *BackendServer) roomHandler(w http.ResponseWriter, r *http.Request, body return } + var responseData []byte + responseStatus := http.StatusOK + if response == nil { + // TODO(jojo): Return better response struct. + responseData = []byte("{}") + } else { + if s, ok := response.(BackendResponseWithStatus); ok { + responseStatus = s.Status() + } + responseData, err = json.Marshal(response) + if err != nil { + log.Printf("Could not serialize backend response %+v: %s", response, err) + responseStatus = http.StatusInternalServerError + responseData = []byte("{\"error\":\"could_not_serialize\"}") + } + } + w.Header().Set("Content-Type", "application/json; charset=utf-8") w.Header().Set("X-Content-Type-Options", "nosniff") - w.WriteHeader(http.StatusOK) - // TODO(jojo): Return better response struct. - w.Write([]byte("{}")) // nolint + w.WriteHeader(responseStatus) + w.Write(responseData) // nolint } func (b *BackendServer) allowStatsAccess(r *http.Request) bool { diff --git a/clientsession.go b/clientsession.go index e0243b0..15ff832 100644 --- a/clientsession.go +++ b/clientsession.go @@ -46,6 +46,9 @@ var ( PathToOcsSignalingBackend = "ocs/v2.php/apps/spreed/api/v1/signaling/backend" ) +// ResponseHandlerFunc will return "true" has been fully processed. +type ResponseHandlerFunc func(message *ClientMessage) bool + type ClientSession struct { roomJoinTime int64 inCall uint32 @@ -89,6 +92,9 @@ type ClientSession struct { seenJoinedLock sync.Mutex seenJoinedEvents map[string]bool + + responseHandlersLock sync.Mutex + responseHandlers map[string]ResponseHandlerFunc } func NewClientSession(hub *Hub, privateId string, publicId string, data *SessionIdData, backend *Backend, hello *HelloClientMessage, auth *BackendClientAuthResponse) (*ClientSession, error) { @@ -1420,3 +1426,38 @@ func (s *ClientSession) GetVirtualSessions() []*VirtualSession { } return result } + +func (s *ClientSession) HandleResponse(id string, handler ResponseHandlerFunc) { + s.responseHandlersLock.Lock() + defer s.responseHandlersLock.Unlock() + + if s.responseHandlers == nil { + s.responseHandlers = make(map[string]ResponseHandlerFunc) + } + + s.responseHandlers[id] = handler +} + +func (s *ClientSession) ClearResponseHandler(id string) { + s.responseHandlersLock.Lock() + defer s.responseHandlersLock.Unlock() + + delete(s.responseHandlers, id) +} + +func (s *ClientSession) ProcessResponse(message *ClientMessage) bool { + id := message.Id + if id == "" { + return false + } + + s.responseHandlersLock.Lock() + cb, found := s.responseHandlers[id] + defer s.responseHandlersLock.Unlock() + + if !found { + return false + } + + return cb(message) +} diff --git a/hub.go b/hub.go index f7ef8fb..fcbea99 100644 --- a/hub.go +++ b/hub.go @@ -1772,6 +1772,10 @@ func (h *Hub) processInternalMsg(client *Client, message *ClientMessage) { return } + if session.ProcessResponse(message) { + return + } + switch msg.Type { case "addsession": msg := msg.AddSession @@ -1923,6 +1927,18 @@ func (h *Hub) processInternalMsg(client *Client, message *ClientMessage) { room.NotifySessionChanged(session, SessionChangeInCall) } } + case "dialout": + roomId := msg.Dialout.RoomId + msg.Dialout.RoomId = "" // Don't send room id to recipients. + if err := h.events.PublishRoomMessage(roomId, session.Backend(), &AsyncMessage{ + Type: "message", + Message: &ServerMessage{ + Type: "dialout", + Dialout: msg.Dialout, + }, + }); err != nil { + log.Printf("Error publishing dialout message %+v to room %s", msg.Dialout, roomId) + } default: log.Printf("Ignore unsupported internal message %+v from %s", msg, session.PublicId()) return