diff --git a/api_async.go b/api_async.go index a78adff..d3c0426 100644 --- a/api_async.go +++ b/api_async.go @@ -21,7 +21,11 @@ */ package signaling -import "time" +import ( + "encoding/json" + "fmt" + "time" +) type AsyncMessage struct { SendTime time.Time `json:"sendtime"` @@ -41,6 +45,14 @@ type AsyncMessage struct { Id string `json:"id"` } +func (m *AsyncMessage) String() string { + data, err := json.Marshal(m) + if err != nil { + return fmt.Sprintf("Could not serialize %#v: %s", m, err) + } + return string(data) +} + type AsyncRoomMessage struct { Type string `json:"type"` diff --git a/api_backend.go b/api_backend.go index 67f7a15..e957a3c 100644 --- a/api_backend.go +++ b/api_backend.go @@ -31,7 +31,9 @@ import ( "fmt" "net/http" "net/url" + "regexp" "strings" + "time" ) const ( @@ -104,6 +106,10 @@ type BackendServerRoomRequest struct { SwitchTo *BackendRoomSwitchToMessageRequest `json:"switchto,omitempty"` + Dialout *BackendRoomDialoutRequest `json:"dialout,omitempty"` + + Transient *BackendRoomTransientRequest `json:"transient,omitempty"` + // Internal properties ReceivedTime int64 `json:"received,omitempty"` } @@ -170,6 +176,64 @@ type BackendRoomSwitchToMessageRequest struct { SessionsMap BackendRoomSwitchToSessionsMap `json:"sessionsmap,omitempty"` } +type BackendRoomDialoutRequest struct { + // E.164 number to dial (e.g. "+1234567890") + Number string `json:"number"` + + Options json.RawMessage `json:"options,omitempty"` +} + +var ( + checkE164Number = regexp.MustCompile(`^\+\d{2,}$`) +) + +func isValidNumber(s string) bool { + return checkE164Number.MatchString(s) +} + +func (r *BackendRoomDialoutRequest) ValidateNumber() *Error { + if r.Number == "" { + return NewError("number_missing", "No number provided") + } + + if !isValidNumber(r.Number) { + return NewError("invalid_number", "Expected E.164 number.") + } + + return nil +} + +type TransientAction string + +const ( + TransientActionSet TransientAction = "set" + TransientActionDelete TransientAction = "delete" +) + +type BackendRoomTransientRequest struct { + Action TransientAction `json:"action"` + Key string `json:"key"` + Value interface{} `json:"value,omitempty"` + TTL time.Duration `json:"ttl,omitempty"` +} + +type BackendServerRoomResponse struct { + Type string `json:"type"` + + Dialout *BackendRoomDialoutResponse `json:"dialout,omitempty"` +} + +type BackendRoomDialoutError struct { + Code string `json:"code"` + Message string `json:"message,omitempty"` +} + +type BackendRoomDialoutResponse struct { + CallId string `json:"callid,omitempty"` + + Error *Error `json:"error,omitempty"` +} + // Requests from the signaling server to the Nextcloud backend. type BackendClientAuthRequest struct { diff --git a/api_backend_test.go b/api_backend_test.go index cc50e8f..48f5e41 100644 --- a/api_backend_test.go +++ b/api_backend_test.go @@ -56,3 +56,27 @@ func TestBackendChecksum(t *testing.T) { t.Errorf("Checksum %s could not be validated from request", check1) } } + +func TestValidNumbers(t *testing.T) { + valid := []string{ + "+12", + "+12345", + } + invalid := []string{ + "+1", + "12345", + " +12345", + " +12345 ", + "+123-45", + } + for _, number := range valid { + if !isValidNumber(number) { + t.Errorf("number %s should be valid", number) + } + } + for _, number := range invalid { + if isValidNumber(number) { + t.Errorf("number %s should not be valid", number) + } + } +} diff --git a/api_signaling.go b/api_signaling.go index db4036b..edd1fd2 100644 --- a/api_signaling.go +++ b/api_signaling.go @@ -23,6 +23,7 @@ package signaling import ( "encoding/json" + "errors" "fmt" "log" "net/url" @@ -164,6 +165,10 @@ type ServerMessage struct { Event *EventServerMessage `json:"event,omitempty"` TransientData *TransientDataServerMessage `json:"transient,omitempty"` + + Internal *InternalServerMessage `json:"internal,omitempty"` + + Dialout *DialoutInternalClientMessage `json:"dialout,omitempty"` } func (r *ServerMessage) CloseAfterSend(session Session) bool { @@ -444,12 +449,14 @@ const ( ServerFeatureWelcome = "welcome" ServerFeatureHelloV2 = "hello-v2" ServerFeatureSwitchTo = "switchto" + ServerFeatureDialout = "dialout" // Features to send to internal clients only. ServerFeatureInternalVirtualSessions = "virtual-sessions" // Possible client features from the "hello" request. ClientFeatureInternalInCall = "internal-incall" + ClientFeatureStartDialout = "start-dialout" ) var ( @@ -460,6 +467,7 @@ var ( ServerFeatureWelcome, ServerFeatureHelloV2, ServerFeatureSwitchTo, + ServerFeatureDialout, } DefaultFeaturesInternal = []string{ ServerFeatureInternalVirtualSessions, @@ -468,6 +476,7 @@ var ( ServerFeatureWelcome, ServerFeatureHelloV2, ServerFeatureSwitchTo, + ServerFeatureDialout, } DefaultWelcomeFeatures = []string{ ServerFeatureAudioVideoPermissions, @@ -477,6 +486,7 @@ var ( ServerFeatureWelcome, ServerFeatureHelloV2, ServerFeatureSwitchTo, + ServerFeatureDialout, } ) @@ -684,6 +694,52 @@ func (m *InCallInternalClientMessage) CheckValid() error { return nil } +type DialoutStatus string + +var ( + DialoutStatusAccepted DialoutStatus = "accepted" + DialoutStatusRinging DialoutStatus = "ringing" + DialoutStatusConnected DialoutStatus = "connected" + DialoutStatusRejected DialoutStatus = "rejected" + DialoutStatusCleared DialoutStatus = "cleared" +) + +type DialoutStatusInternalClientMessage struct { + CallId string `json:"callid"` + Status DialoutStatus `json:"status"` + + // Cause is set if Status is "cleared" or "rejected". + Cause string `json:"cause,omitempty"` + Code int `json:"code,omitempty"` + Message string `json:"message,omitempty"` +} + +type DialoutInternalClientMessage struct { + Type string `json:"type"` + + RoomId string `json:"roomid,omitempty"` + + Error *Error `json:"error,omitempty"` + + Status *DialoutStatusInternalClientMessage `json:"status,omitempty"` +} + +func (m *DialoutInternalClientMessage) CheckValid() error { + switch m.Type { + case "": + return errors.New("type missing") + case "error": + if m.Error == nil { + return errors.New("error missing") + } + case "status": + if m.Status == nil { + return errors.New("status missing") + } + } + return nil +} + type InternalClientMessage struct { Type string `json:"type"` @@ -694,10 +750,14 @@ type InternalClientMessage struct { RemoveSession *RemoveSessionInternalClientMessage `json:"removesession,omitempty"` InCall *InCallInternalClientMessage `json:"incall,omitempty"` + + Dialout *DialoutInternalClientMessage `json:"dialout,omitempty"` } func (m *InternalClientMessage) CheckValid() error { switch m.Type { + case "": + return errors.New("type missing") case "addsession": if m.AddSession == nil { return fmt.Errorf("addsession missing") @@ -722,10 +782,28 @@ func (m *InternalClientMessage) CheckValid() error { } else if err := m.InCall.CheckValid(); err != nil { return err } + case "dialout": + if m.Dialout == nil { + return fmt.Errorf("dialout missing") + } else if err := m.Dialout.CheckValid(); err != nil { + return err + } } return nil } +type InternalServerDialoutRequest struct { + RoomId string `json:"roomid"` + + Request *BackendRoomDialoutRequest `json:"request"` +} + +type InternalServerMessage struct { + Type string `json:"type"` + + Dialout *InternalServerDialoutRequest `json:"dialout,omitempty"` +} + // Type "event" type RoomEventServerMessage struct { diff --git a/backend_server.go b/backend_server.go index 45a7f6f..1705f8e 100644 --- a/backend_server.go +++ b/backend_server.go @@ -28,6 +28,7 @@ import ( "crypto/sha1" "encoding/base64" "encoding/json" + "errors" "fmt" "io" "log" @@ -35,8 +36,10 @@ import ( "net/http" "net/url" "reflect" + "regexp" "strings" "sync" + "sync/atomic" "time" "github.com/dlintw/goconf" @@ -639,6 +642,121 @@ func (b *BackendServer) sendRoomSwitchTo(roomid string, backend *Backend, reques return b.events.PublishBackendRoomMessage(roomid, backend, message) } +type BackendResponseWithStatus interface { + Status() int +} + +type DialoutErrorResponse struct { + BackendServerRoomResponse + + status int +} + +func (r *DialoutErrorResponse) Status() int { + return r.status +} + +func returnDialoutError(status int, err *Error) (any, error) { + response := &DialoutErrorResponse{ + BackendServerRoomResponse: BackendServerRoomResponse{ + Type: "dialout", + Dialout: &BackendRoomDialoutResponse{ + Error: err, + }, + }, + + status: status, + } + return response, nil +} + +var checkNumeric = regexp.MustCompile(`^[0-9]+$`) + +func isNumeric(s string) bool { + return checkNumeric.MatchString(s) +} + +func (b *BackendServer) startDialout(roomid string, backend *Backend, request *BackendServerRoomRequest) (any, error) { + if err := request.Dialout.ValidateNumber(); err != nil { + return returnDialoutError(http.StatusBadRequest, err) + } + + if !isNumeric(roomid) { + return returnDialoutError(http.StatusBadRequest, NewError("invalid_roomid", "The room id must be numeric.")) + } + + var session *ClientSession + for s := range b.hub.dialoutSessions { + if s.GetClient() != nil { + session = s + break + } + } + if session == nil { + return returnDialoutError(http.StatusNotFound, NewError("no_client_available", "No available client found to trigger dialout.")) + } + + id := newRandomString(32) + msg := &ServerMessage{ + Id: id, + Type: "internal", + Internal: &InternalServerMessage{ + Type: "dialout", + Dialout: &InternalServerDialoutRequest{ + RoomId: roomid, + Request: request.Dialout, + }, + }, + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + var response atomic.Pointer[DialoutInternalClientMessage] + + session.HandleResponse(id, func(message *ClientMessage) bool { + response.Store(message.Internal.Dialout) + cancel() + // Don't send error to other sessions in the room. + return message.Internal.Dialout.Error != nil + }) + defer session.ClearResponseHandler(id) + + if !session.SendMessage(msg) { + return returnDialoutError(http.StatusBadGateway, NewError("error_notify", "Could not notify about new dialout.")) + } + + <-ctx.Done() + if err := ctx.Err(); err != nil && !errors.Is(err, context.Canceled) { + return returnDialoutError(http.StatusGatewayTimeout, NewError("timeout", "Timeout while waiting for dialout to start.")) + } + + dialout := response.Load() + if dialout == nil { + return returnDialoutError(http.StatusBadGateway, NewError("error_notify", "No dialout response received.")) + } + + switch dialout.Type { + case "error": + return returnDialoutError(http.StatusBadGateway, dialout.Error) + case "status": + if dialout.Status.Status != DialoutStatusAccepted { + log.Printf("Received unsupported dialout status when triggering dialout: %+v", dialout) + return returnDialoutError(http.StatusBadGateway, NewError("unsupported_status", "Unsupported dialout status received.")) + } + + return &BackendServerRoomResponse{ + Type: "dialout", + Dialout: &BackendRoomDialoutResponse{ + CallId: dialout.Status.CallId, + }, + }, nil + } + + log.Printf("Received unsupported dialout type when triggering dialout: %+v", dialout) + return returnDialoutError(http.StatusBadGateway, NewError("unsupported_type", "Unsupported dialout type received.")) +} + func (b *BackendServer) roomHandler(w http.ResponseWriter, r *http.Request, body []byte) { v := mux.Vars(r) roomid := v["roomid"] @@ -692,6 +810,7 @@ func (b *BackendServer) roomHandler(w http.ResponseWriter, r *http.Request, body request.ReceivedTime = time.Now().UnixNano() + var response any var err error switch request.Type { case "invite": @@ -722,6 +841,8 @@ func (b *BackendServer) roomHandler(w http.ResponseWriter, r *http.Request, body err = b.sendRoomMessage(roomid, backend, &request) case "switchto": err = b.sendRoomSwitchTo(roomid, backend, &request) + case "dialout": + response, err = b.startDialout(roomid, backend, &request) default: http.Error(w, "Unsupported request type: "+request.Type, http.StatusBadRequest) return @@ -733,11 +854,27 @@ func (b *BackendServer) roomHandler(w http.ResponseWriter, r *http.Request, body return } + var responseData []byte + responseStatus := http.StatusOK + if response == nil { + // TODO(jojo): Return better response struct. + responseData = []byte("{}") + } else { + if s, ok := response.(BackendResponseWithStatus); ok { + responseStatus = s.Status() + } + responseData, err = json.Marshal(response) + if err != nil { + log.Printf("Could not serialize backend response %+v: %s", response, err) + responseStatus = http.StatusInternalServerError + responseData = []byte("{\"error\":\"could_not_serialize\"}") + } + } + w.Header().Set("Content-Type", "application/json; charset=utf-8") w.Header().Set("X-Content-Type-Options", "nosniff") - w.WriteHeader(http.StatusOK) - // TODO(jojo): Return better response struct. - w.Write([]byte("{}")) // nolint + w.WriteHeader(responseStatus) + w.Write(responseData) // nolint } func (b *BackendServer) allowStatsAccess(r *http.Request) bool { diff --git a/backend_server_test.go b/backend_server_test.go index e9d25bd..a950b5c 100644 --- a/backend_server_test.go +++ b/backend_server_test.go @@ -1760,3 +1760,296 @@ func TestBackendServer_StatsAllowedIps(t *testing.T) { }) } } + +func Test_IsNumeric(t *testing.T) { + numeric := []string{ + "0", + "1", + "12345", + } + nonNumeric := []string{ + "", + " ", + " 0", + "0 ", + " 0 ", + "-1", + "1.2", + "1a", + "a1", + } + for _, s := range numeric { + if !isNumeric(s) { + t.Errorf("%s should be numeric", s) + } + } + for _, s := range nonNumeric { + if isNumeric(s) { + t.Errorf("%s should not be numeric", s) + } + } +} + +func TestBackendServer_DialoutNoSipBridge(t *testing.T) { + _, _, _, hub, _, server := CreateBackendServerForTest(t) + + client := NewTestClient(t, server, hub) + defer client.CloseWithBye() + if err := client.SendHelloInternal(); err != nil { + t.Fatal(err) + } + + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + _, err := client.RunUntilHello(ctx) + if err != nil { + t.Fatal(err) + } + + roomId := "12345" + msg := &BackendServerRoomRequest{ + Type: "dialout", + Dialout: &BackendRoomDialoutRequest{ + Number: "+1234567890", + }, + } + + data, err := json.Marshal(msg) + if err != nil { + t.Fatal(err) + } + res, err := performBackendRequest(server.URL+"/api/v1/room/"+roomId, data) + if err != nil { + t.Fatal(err) + } + defer res.Body.Close() + body, err := io.ReadAll(res.Body) + if err != nil { + t.Error(err) + } + if res.StatusCode != http.StatusNotFound { + t.Fatalf("Expected error %d, got %s: %s", http.StatusNotFound, res.Status, string(body)) + } + + var response BackendServerRoomResponse + if err := json.Unmarshal(body, &response); err != nil { + t.Fatal(err) + } + + if response.Type != "dialout" || response.Dialout == nil { + t.Fatalf("expected type dialout, got %s", string(body)) + } + if response.Dialout.Error == nil { + t.Fatalf("expected dialout error, got %s", string(body)) + } + if expected := "no_client_available"; response.Dialout.Error.Code != expected { + t.Errorf("expected error code %s, got %s", expected, string(body)) + } +} + +func TestBackendServer_DialoutAccepted(t *testing.T) { + _, _, _, hub, _, server := CreateBackendServerForTest(t) + + client := NewTestClient(t, server, hub) + defer client.CloseWithBye() + if err := client.SendHelloInternalWithFeatures([]string{"start-dialout"}); err != nil { + t.Fatal(err) + } + + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + _, err := client.RunUntilHello(ctx) + if err != nil { + t.Fatal(err) + } + + roomId := "12345" + callId := "call-123" + + stopped := make(chan struct{}) + go func() { + defer close(stopped) + + msg, err := client.RunUntilMessage(ctx) + if err != nil { + t.Error(err) + return + } + + if msg.Type != "internal" || msg.Internal.Type != "dialout" { + t.Errorf("expected internal dialout message, got %+v", msg) + return + } + + if msg.Internal.Dialout.RoomId != roomId { + t.Errorf("expected room id %s, got %+v", roomId, msg) + } + + response := &ClientMessage{ + Id: msg.Id, + Type: "internal", + Internal: &InternalClientMessage{ + Type: "dialout", + Dialout: &DialoutInternalClientMessage{ + Type: "status", + RoomId: msg.Internal.Dialout.RoomId, + Status: &DialoutStatusInternalClientMessage{ + Status: "accepted", + CallId: callId, + }, + }, + }, + } + if err := client.WriteJSON(response); err != nil { + t.Error(err) + } + }() + + defer func() { + <-stopped + }() + + msg := &BackendServerRoomRequest{ + Type: "dialout", + Dialout: &BackendRoomDialoutRequest{ + Number: "+1234567890", + }, + } + + data, err := json.Marshal(msg) + if err != nil { + t.Fatal(err) + } + res, err := performBackendRequest(server.URL+"/api/v1/room/"+roomId, data) + if err != nil { + t.Fatal(err) + } + defer res.Body.Close() + body, err := io.ReadAll(res.Body) + if err != nil { + t.Error(err) + } + if res.StatusCode != http.StatusOK { + t.Fatalf("Expected error %d, got %s: %s", http.StatusOK, res.Status, string(body)) + } + + var response BackendServerRoomResponse + if err := json.Unmarshal(body, &response); err != nil { + t.Fatal(err) + } + + if response.Type != "dialout" || response.Dialout == nil { + t.Fatalf("expected type dialout, got %s", string(body)) + } + if response.Dialout.Error != nil { + t.Fatalf("expected dialout success, got %s", string(body)) + } + if response.Dialout.CallId != callId { + t.Errorf("expected call id %s, got %s", callId, string(body)) + } +} + +func TestBackendServer_DialoutRejected(t *testing.T) { + _, _, _, hub, _, server := CreateBackendServerForTest(t) + + client := NewTestClient(t, server, hub) + defer client.CloseWithBye() + if err := client.SendHelloInternalWithFeatures([]string{"start-dialout"}); err != nil { + t.Fatal(err) + } + + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + _, err := client.RunUntilHello(ctx) + if err != nil { + t.Fatal(err) + } + + roomId := "12345" + errorCode := "error-code" + errorMessage := "rejected call" + + stopped := make(chan struct{}) + go func() { + defer close(stopped) + + msg, err := client.RunUntilMessage(ctx) + if err != nil { + t.Error(err) + return + } + + if msg.Type != "internal" || msg.Internal.Type != "dialout" { + t.Errorf("expected internal dialout message, got %+v", msg) + return + } + + if msg.Internal.Dialout.RoomId != roomId { + t.Errorf("expected room id %s, got %+v", roomId, msg) + } + + response := &ClientMessage{ + Id: msg.Id, + Type: "internal", + Internal: &InternalClientMessage{ + Type: "dialout", + Dialout: &DialoutInternalClientMessage{ + Type: "error", + Error: NewError(errorCode, errorMessage), + }, + }, + } + if err := client.WriteJSON(response); err != nil { + t.Error(err) + } + }() + + defer func() { + <-stopped + }() + + msg := &BackendServerRoomRequest{ + Type: "dialout", + Dialout: &BackendRoomDialoutRequest{ + Number: "+1234567890", + }, + } + + data, err := json.Marshal(msg) + if err != nil { + t.Fatal(err) + } + res, err := performBackendRequest(server.URL+"/api/v1/room/"+roomId, data) + if err != nil { + t.Fatal(err) + } + defer res.Body.Close() + body, err := io.ReadAll(res.Body) + if err != nil { + t.Error(err) + } + if res.StatusCode != http.StatusBadGateway { + t.Fatalf("Expected error %d, got %s: %s", http.StatusBadGateway, res.Status, string(body)) + } + + var response BackendServerRoomResponse + if err := json.Unmarshal(body, &response); err != nil { + t.Fatal(err) + } + + if response.Type != "dialout" || response.Dialout == nil { + t.Fatalf("expected type dialout, got %s", string(body)) + } + if response.Dialout.Error == nil { + t.Fatalf("expected dialout error, got %s", string(body)) + } + if response.Dialout.Error.Code != errorCode { + t.Errorf("expected error code %s, got %s", errorCode, string(body)) + } + if response.Dialout.Error.Message != errorMessage { + t.Errorf("expected error message %s, got %s", errorMessage, string(body)) + } +} diff --git a/clientsession.go b/clientsession.go index e0243b0..15ff832 100644 --- a/clientsession.go +++ b/clientsession.go @@ -46,6 +46,9 @@ var ( PathToOcsSignalingBackend = "ocs/v2.php/apps/spreed/api/v1/signaling/backend" ) +// ResponseHandlerFunc will return "true" has been fully processed. +type ResponseHandlerFunc func(message *ClientMessage) bool + type ClientSession struct { roomJoinTime int64 inCall uint32 @@ -89,6 +92,9 @@ type ClientSession struct { seenJoinedLock sync.Mutex seenJoinedEvents map[string]bool + + responseHandlersLock sync.Mutex + responseHandlers map[string]ResponseHandlerFunc } func NewClientSession(hub *Hub, privateId string, publicId string, data *SessionIdData, backend *Backend, hello *HelloClientMessage, auth *BackendClientAuthResponse) (*ClientSession, error) { @@ -1420,3 +1426,38 @@ func (s *ClientSession) GetVirtualSessions() []*VirtualSession { } return result } + +func (s *ClientSession) HandleResponse(id string, handler ResponseHandlerFunc) { + s.responseHandlersLock.Lock() + defer s.responseHandlersLock.Unlock() + + if s.responseHandlers == nil { + s.responseHandlers = make(map[string]ResponseHandlerFunc) + } + + s.responseHandlers[id] = handler +} + +func (s *ClientSession) ClearResponseHandler(id string) { + s.responseHandlersLock.Lock() + defer s.responseHandlersLock.Unlock() + + delete(s.responseHandlers, id) +} + +func (s *ClientSession) ProcessResponse(message *ClientMessage) bool { + id := message.Id + if id == "" { + return false + } + + s.responseHandlersLock.Lock() + cb, found := s.responseHandlers[id] + defer s.responseHandlersLock.Unlock() + + if !found { + return false + } + + return cb(message) +} diff --git a/docs/standalone-signaling-api-v1.md b/docs/standalone-signaling-api-v1.md index 914cf7d..d05ae68 100644 --- a/docs/standalone-signaling-api-v1.md +++ b/docs/standalone-signaling-api-v1.md @@ -793,6 +793,74 @@ Message format (Server -> Client, receive message) - The `userid` is omitted if a message was sent by an anonymous user. +## Control messages + +Similar to regular messages between clients which can be sent by any session, +messages with type `control` can only be sent if the permission flag `control` +is available. + +These messages can be used to perform actions on clients that should only be +possible by some users (e.g. moderators). + +Message format (Client -> Server, mute phone): + + { + "id": "unique-request-id", + "type": "control", + "control": { + "recipient": { + "type": "session", + "sessionid": "the-session-id-to-send-to" + }, + "data": { + "type": "mute", + "audio": "audio-flags" + } + } + } + +The bit-field `audio-flags` supports the following bits: +- `1`: mute speaking (i.e. phone can no longer talk) +- `2`: mute listening (i.e. phone is on hold and can no longer hear) + +To unmute, a value of `0` must be sent. + +Message format (Client -> Server, hangup phone): + + { + "id": "unique-request-id", + "type": "control", + "control": { + "recipient": { + "type": "session", + "sessionid": "the-session-id-to-send-to" + }, + "data": { + "type": "hangup" + } + } + } + +Message format (Client -> Server, send DTMF): + + { + "id": "unique-request-id", + "type": "control", + "control": { + "recipient": { + "type": "session", + "sessionid": "the-session-id-to-send-to" + }, + "data": { + "type": "dtmf", + "digit": "the-digit" + } + } + } + +Supported digits are `0`-`9`, `*` and `#`. + + ## Transient data Transient data can be used to share data in a room that is valid while sessions @@ -936,6 +1004,13 @@ Message format (Client -> Server): } +Phone sessions will have `type` set to `phone` in the additional user data +(which will be included in the `joined` [room event](#room-events)), +`callid` will be the id of the phone call and `number` the target of the call. +The call id will match the one returned for accepted outgoing calls and the +associated session id can be used to hangup a call or send DTMF tones to it. + + ### Update virtual session Message format (Client -> Server): @@ -1207,3 +1282,49 @@ Message format (Server -> Client): Clients are expected to follow the `switchto` message. If clients don't switch to the target room after some time, they might get disconnected. + + +### Start dialout from a room + +Use this to start a phone dialout to a new user in a given room. + +Message format (Backend -> Server) + + { + "type": "dialout" + "dialout" { + "number": "e164-target-number", + "options": { + ...arbitrary options that will be sent back to validate... + } + } + } + +Please note that this requires a connected internal client that supports +dialout (e.g. the SIP bridge). + +Message format (Server -> Backend, request was accepted) + + { + "type": "dialout" + "dialout" { + "callid": "the-unique-call-id" + } + } + +Message format (Server -> Backend, request could not be processed) + + { + "type": "dialout" + "dialout" { + "error": { + "code": "the-internal-message-id", + "message": "human-readable-error-message", + "details": { + ...optional additional details... + } + } + } + } + +A HTTP error status code will be set in this case. diff --git a/hub.go b/hub.go index f7ef8fb..bbcfc98 100644 --- a/hub.go +++ b/hub.go @@ -97,6 +97,9 @@ var ( // Delay after which a screen publisher should be cleaned up. cleanupScreenPublisherDelay = time.Second + + // Delay after which a "cleared" / "rejected" dialout status should be removed. + removeCallStatusTTL = 5 * time.Second ) const ( @@ -150,6 +153,7 @@ type Hub struct { expiredSessions map[Session]bool anonymousSessions map[*ClientSession]time.Time expectHelloClients map[*Client]time.Time + dialoutSessions map[*ClientSession]bool backendTimeout time.Duration backend *BackendClient @@ -338,6 +342,7 @@ func NewHub(config *goconf.ConfigFile, events AsyncEvents, rpcServer *GrpcServer expiredSessions: make(map[Session]bool), anonymousSessions: make(map[*ClientSession]time.Time), expectHelloClients: make(map[*Client]time.Time), + dialoutSessions: make(map[*ClientSession]bool), backendTimeout: backendTimeout, backend: backend, @@ -641,6 +646,7 @@ func (h *Hub) removeSession(session Session) (removed bool) { delete(h.expiredSessions, session) if session, ok := session.(*ClientSession); ok { delete(h.anonymousSessions, session) + delete(h.dialoutSessions, session) } h.mu.Unlock() return @@ -802,8 +808,12 @@ func (h *Hub) processRegister(client *Client, message *ClientMessage, backend *B h.sessions[sessionIdData.Sid] = session h.clients[sessionIdData.Sid] = client delete(h.expectHelloClients, client) - if userId == "" && auth.Type != HelloClientTypeInternal { + if userId == "" && session.ClientType() != HelloClientTypeInternal { h.startWaitAnonymousSessionRoomLocked(session) + } else if session.ClientType() == HelloClientTypeInternal && session.HasFeature(ClientFeatureStartDialout) { + // TODO: There is a small race condition for sessions that take some time + // between connecting and joining a room. + h.dialoutSessions[session] = true } h.mu.Unlock() @@ -1250,6 +1260,7 @@ func (h *Hub) processRoom(client *Client, message *ClientMessage) { h.startWaitAnonymousSessionRoom(session) } } + return } @@ -1389,6 +1400,10 @@ func (h *Hub) processJoinRoom(session *ClientSession, message *ClientMessage, ro h.mu.Lock() // The session now joined a room, don't expire if it is anonymous. delete(h.anonymousSessions, session) + if session.ClientType() == HelloClientTypeInternal && session.HasFeature(ClientFeatureStartDialout) { + // An internal session in a room can not be used for dialout. + delete(h.dialoutSessions, session) + } h.mu.Unlock() session.SetRoom(r) if room.Room.Permissions != nil { @@ -1772,6 +1787,10 @@ func (h *Hub) processInternalMsg(client *Client, message *ClientMessage) { return } + if session.ProcessResponse(message) { + return + } + switch msg.Type { case "addsession": msg := msg.AddSession @@ -1923,6 +1942,38 @@ func (h *Hub) processInternalMsg(client *Client, message *ClientMessage) { room.NotifySessionChanged(session, SessionChangeInCall) } } + case "dialout": + roomId := msg.Dialout.RoomId + msg.Dialout.RoomId = "" // Don't send room id to recipients. + if msg.Dialout.Type == "status" { + asyncMessage := &AsyncMessage{ + Type: "room", + Room: &BackendServerRoomRequest{ + Type: "transient", + Transient: &BackendRoomTransientRequest{ + Action: TransientActionSet, + Key: "callstatus_" + msg.Dialout.Status.CallId, + Value: msg.Dialout.Status, + }, + }, + } + if msg.Dialout.Status.Status == DialoutStatusCleared || msg.Dialout.Status.Status == DialoutStatusRejected { + asyncMessage.Room.Transient.TTL = removeCallStatusTTL + } + if err := h.events.PublishBackendRoomMessage(roomId, session.Backend(), asyncMessage); err != nil { + log.Printf("Error publishing dialout message %+v to room %s", msg.Dialout, roomId) + } + } else { + if err := h.events.PublishRoomMessage(roomId, session.Backend(), &AsyncMessage{ + Type: "message", + Message: &ServerMessage{ + Type: "dialout", + Dialout: msg.Dialout, + }, + }); err != nil { + log.Printf("Error publishing dialout message %+v to room %s", msg.Dialout, roomId) + } + } default: log.Printf("Ignore unsupported internal message %+v from %s", msg, session.PublicId()) return diff --git a/hub_test.go b/hub_test.go index 9bfa242..90b5f93 100644 --- a/hub_test.go +++ b/hub_test.go @@ -5349,3 +5349,214 @@ func TestGeoipOverrides(t *testing.T) { t.Errorf("expected country %s, got %s", strings.ToUpper(country3), country) } } + +func TestDialoutStatus(t *testing.T) { + _, _, _, hub, _, server := CreateBackendServerForTest(t) + + internalClient := NewTestClient(t, server, hub) + defer internalClient.CloseWithBye() + if err := internalClient.SendHelloInternalWithFeatures([]string{"start-dialout"}); err != nil { + t.Fatal(err) + } + + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + _, err := internalClient.RunUntilHello(ctx) + if err != nil { + t.Fatal(err) + } + + roomId := "12345" + client := NewTestClient(t, server, hub) + defer client.CloseWithBye() + + if err := client.SendHello(testDefaultUserId); err != nil { + t.Fatal(err) + } + + hello, err := client.RunUntilHello(ctx) + if err != nil { + t.Fatal(err) + } + + if _, err := client.JoinRoom(ctx, roomId); err != nil { + t.Fatal(err) + } + + if err := client.RunUntilJoined(ctx, hello.Hello); err != nil { + t.Error(err) + } + + callId := "call-123" + + stopped := make(chan struct{}) + go func(client *TestClient) { + defer close(stopped) + + msg, err := client.RunUntilMessage(ctx) + if err != nil { + t.Error(err) + return + } + + if msg.Type != "internal" || msg.Internal.Type != "dialout" { + t.Errorf("expected internal dialout message, got %+v", msg) + return + } + + if msg.Internal.Dialout.RoomId != roomId { + t.Errorf("expected room id %s, got %+v", roomId, msg) + } + + response := &ClientMessage{ + Id: msg.Id, + Type: "internal", + Internal: &InternalClientMessage{ + Type: "dialout", + Dialout: &DialoutInternalClientMessage{ + Type: "status", + RoomId: msg.Internal.Dialout.RoomId, + Status: &DialoutStatusInternalClientMessage{ + Status: "accepted", + CallId: callId, + }, + }, + }, + } + if err := client.WriteJSON(response); err != nil { + t.Error(err) + } + }(internalClient) + + defer func() { + <-stopped + }() + + msg := &BackendServerRoomRequest{ + Type: "dialout", + Dialout: &BackendRoomDialoutRequest{ + Number: "+1234567890", + }, + } + + data, err := json.Marshal(msg) + if err != nil { + t.Fatal(err) + } + res, err := performBackendRequest(server.URL+"/api/v1/room/"+roomId, data) + if err != nil { + t.Fatal(err) + } + defer res.Body.Close() + body, err := io.ReadAll(res.Body) + if err != nil { + t.Error(err) + } + if res.StatusCode != http.StatusOK { + t.Fatalf("Expected error %d, got %s: %s", http.StatusOK, res.Status, string(body)) + } + + var response BackendServerRoomResponse + if err := json.Unmarshal(body, &response); err != nil { + t.Fatal(err) + } + + if response.Type != "dialout" || response.Dialout == nil { + t.Fatalf("expected type dialout, got %s", string(body)) + } + if response.Dialout.Error != nil { + t.Fatalf("expected dialout success, got %s", string(body)) + } + if response.Dialout.CallId != callId { + t.Errorf("expected call id %s, got %s", callId, string(body)) + } + + key := "callstatus_" + callId + if msg, err := client.RunUntilMessage(ctx); err != nil { + t.Fatal(err) + } else { + if err := checkMessageTransientSet(msg, key, map[string]interface{}{ + "callid": callId, + "status": "accepted", + }, nil); err != nil { + t.Error(err) + } + } + + if err := internalClient.SendInternalDialout(&DialoutInternalClientMessage{ + RoomId: roomId, + Type: "status", + Status: &DialoutStatusInternalClientMessage{ + CallId: callId, + Status: "ringing", + }, + }); err != nil { + t.Fatal(err) + } + + if msg, err := client.RunUntilMessage(ctx); err != nil { + t.Fatal(err) + } else { + if err := checkMessageTransientSet(msg, key, map[string]interface{}{ + "callid": callId, + "status": "ringing", + }, + map[string]interface{}{ + "callid": callId, + "status": "accepted", + }); err != nil { + t.Error(err) + } + } + + old := removeCallStatusTTL + defer func() { + removeCallStatusTTL = old + }() + removeCallStatusTTL = 500 * time.Millisecond + + clearedCause := "cleared-call" + if err := internalClient.SendInternalDialout(&DialoutInternalClientMessage{ + RoomId: roomId, + Type: "status", + Status: &DialoutStatusInternalClientMessage{ + CallId: callId, + Status: "cleared", + Cause: clearedCause, + }, + }); err != nil { + t.Fatal(err) + } + + if msg, err := client.RunUntilMessage(ctx); err != nil { + t.Fatal(err) + } else { + if err := checkMessageTransientSet(msg, key, map[string]interface{}{ + "callid": callId, + "status": "cleared", + "cause": clearedCause, + }, + map[string]interface{}{ + "callid": callId, + "status": "ringing", + }); err != nil { + t.Error(err) + } + } + + ctx2, cancel := context.WithTimeout(ctx, removeCallStatusTTL*2) + defer cancel() + + if msg, err := client.RunUntilMessage(ctx2); err != nil { + t.Fatal(err) + } else { + if err := checkMessageTransientRemove(msg, key, map[string]interface{}{ + "callid": callId, + "status": "cleared", + "cause": clearedCause, + }); err != nil { + t.Error(err) + } + } +} diff --git a/room.go b/room.go index e0a533a..8d983ef 100644 --- a/room.go +++ b/room.go @@ -244,6 +244,15 @@ func (r *Room) processBackendRoomRequestRoom(message *BackendServerRoomRequest) r.publishRoomMessage(message.Message) case "switchto": r.publishSwitchTo(message.SwitchTo) + case "transient": + switch message.Transient.Action { + case TransientActionSet: + r.SetTransientDataTTL(message.Transient.Key, message.Transient.Value, message.Transient.TTL) + case TransientActionDelete: + r.RemoveTransientData(message.Transient.Key) + default: + log.Printf("Unsupported transient action in room %s: %+v", r.Id(), message.Transient) + } default: log.Printf("Unsupported backend room request with type %s in %s: %+v", message.Type, r.Id(), message) } diff --git a/testclient_test.go b/testclient_test.go index acb64e6..e2b1106 100644 --- a/testclient_test.go +++ b/testclient_test.go @@ -34,6 +34,7 @@ import ( "reflect" "strconv" "strings" + "sync" "testing" "time" @@ -211,6 +212,7 @@ type TestClient struct { hub *Hub server *httptest.Server + mu sync.Mutex conn *websocket.Conn localAddr net.Addr @@ -280,6 +282,8 @@ func (c *TestClient) CloseWithBye() { } func (c *TestClient) Close() { + c.mu.Lock() + defer c.mu.Unlock() if err := c.conn.WriteMessage(websocket.CloseMessage, []byte{}); err == websocket.ErrCloseSent { // Already closed return @@ -368,6 +372,8 @@ func (c *TestClient) WriteJSON(data interface{}) error { } } + c.mu.Lock() + defer c.mu.Unlock() return c.conn.WriteJSON(data) } @@ -578,6 +584,18 @@ func (c *TestClient) SendInternalRemoveSession(msg *RemoveSessionInternalClientM return c.WriteJSON(message) } +func (c *TestClient) SendInternalDialout(msg *DialoutInternalClientMessage) error { + message := &ClientMessage{ + Id: "abcd", + Type: "internal", + Internal: &InternalClientMessage{ + Type: "dialout", + Dialout: msg, + }, + } + return c.WriteJSON(message) +} + func (c *TestClient) SetTransientData(key string, value interface{}, ttl time.Duration) error { payload, err := json.Marshal(value) if err != nil {