Merge pull request #563 from strukturag/dialout-support

Implement message handler for dialout support.
This commit is contained in:
Joachim Bauch 2023-10-26 13:30:45 +02:00 committed by GitHub
commit 0936d40f8b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 1064 additions and 5 deletions

View file

@ -21,7 +21,11 @@
*/
package signaling
import "time"
import (
"encoding/json"
"fmt"
"time"
)
type AsyncMessage struct {
SendTime time.Time `json:"sendtime"`
@ -41,6 +45,14 @@ type AsyncMessage struct {
Id string `json:"id"`
}
func (m *AsyncMessage) String() string {
data, err := json.Marshal(m)
if err != nil {
return fmt.Sprintf("Could not serialize %#v: %s", m, err)
}
return string(data)
}
type AsyncRoomMessage struct {
Type string `json:"type"`

View file

@ -31,7 +31,9 @@ import (
"fmt"
"net/http"
"net/url"
"regexp"
"strings"
"time"
)
const (
@ -104,6 +106,10 @@ type BackendServerRoomRequest struct {
SwitchTo *BackendRoomSwitchToMessageRequest `json:"switchto,omitempty"`
Dialout *BackendRoomDialoutRequest `json:"dialout,omitempty"`
Transient *BackendRoomTransientRequest `json:"transient,omitempty"`
// Internal properties
ReceivedTime int64 `json:"received,omitempty"`
}
@ -170,6 +176,64 @@ type BackendRoomSwitchToMessageRequest struct {
SessionsMap BackendRoomSwitchToSessionsMap `json:"sessionsmap,omitempty"`
}
type BackendRoomDialoutRequest struct {
// E.164 number to dial (e.g. "+1234567890")
Number string `json:"number"`
Options json.RawMessage `json:"options,omitempty"`
}
var (
checkE164Number = regexp.MustCompile(`^\+\d{2,}$`)
)
func isValidNumber(s string) bool {
return checkE164Number.MatchString(s)
}
func (r *BackendRoomDialoutRequest) ValidateNumber() *Error {
if r.Number == "" {
return NewError("number_missing", "No number provided")
}
if !isValidNumber(r.Number) {
return NewError("invalid_number", "Expected E.164 number.")
}
return nil
}
type TransientAction string
const (
TransientActionSet TransientAction = "set"
TransientActionDelete TransientAction = "delete"
)
type BackendRoomTransientRequest struct {
Action TransientAction `json:"action"`
Key string `json:"key"`
Value interface{} `json:"value,omitempty"`
TTL time.Duration `json:"ttl,omitempty"`
}
type BackendServerRoomResponse struct {
Type string `json:"type"`
Dialout *BackendRoomDialoutResponse `json:"dialout,omitempty"`
}
type BackendRoomDialoutError struct {
Code string `json:"code"`
Message string `json:"message,omitempty"`
}
type BackendRoomDialoutResponse struct {
CallId string `json:"callid,omitempty"`
Error *Error `json:"error,omitempty"`
}
// Requests from the signaling server to the Nextcloud backend.
type BackendClientAuthRequest struct {

View file

@ -56,3 +56,27 @@ func TestBackendChecksum(t *testing.T) {
t.Errorf("Checksum %s could not be validated from request", check1)
}
}
func TestValidNumbers(t *testing.T) {
valid := []string{
"+12",
"+12345",
}
invalid := []string{
"+1",
"12345",
" +12345",
" +12345 ",
"+123-45",
}
for _, number := range valid {
if !isValidNumber(number) {
t.Errorf("number %s should be valid", number)
}
}
for _, number := range invalid {
if isValidNumber(number) {
t.Errorf("number %s should not be valid", number)
}
}
}

View file

@ -23,6 +23,7 @@ package signaling
import (
"encoding/json"
"errors"
"fmt"
"log"
"net/url"
@ -164,6 +165,10 @@ type ServerMessage struct {
Event *EventServerMessage `json:"event,omitempty"`
TransientData *TransientDataServerMessage `json:"transient,omitempty"`
Internal *InternalServerMessage `json:"internal,omitempty"`
Dialout *DialoutInternalClientMessage `json:"dialout,omitempty"`
}
func (r *ServerMessage) CloseAfterSend(session Session) bool {
@ -444,12 +449,14 @@ const (
ServerFeatureWelcome = "welcome"
ServerFeatureHelloV2 = "hello-v2"
ServerFeatureSwitchTo = "switchto"
ServerFeatureDialout = "dialout"
// Features to send to internal clients only.
ServerFeatureInternalVirtualSessions = "virtual-sessions"
// Possible client features from the "hello" request.
ClientFeatureInternalInCall = "internal-incall"
ClientFeatureStartDialout = "start-dialout"
)
var (
@ -460,6 +467,7 @@ var (
ServerFeatureWelcome,
ServerFeatureHelloV2,
ServerFeatureSwitchTo,
ServerFeatureDialout,
}
DefaultFeaturesInternal = []string{
ServerFeatureInternalVirtualSessions,
@ -468,6 +476,7 @@ var (
ServerFeatureWelcome,
ServerFeatureHelloV2,
ServerFeatureSwitchTo,
ServerFeatureDialout,
}
DefaultWelcomeFeatures = []string{
ServerFeatureAudioVideoPermissions,
@ -477,6 +486,7 @@ var (
ServerFeatureWelcome,
ServerFeatureHelloV2,
ServerFeatureSwitchTo,
ServerFeatureDialout,
}
)
@ -684,6 +694,52 @@ func (m *InCallInternalClientMessage) CheckValid() error {
return nil
}
type DialoutStatus string
var (
DialoutStatusAccepted DialoutStatus = "accepted"
DialoutStatusRinging DialoutStatus = "ringing"
DialoutStatusConnected DialoutStatus = "connected"
DialoutStatusRejected DialoutStatus = "rejected"
DialoutStatusCleared DialoutStatus = "cleared"
)
type DialoutStatusInternalClientMessage struct {
CallId string `json:"callid"`
Status DialoutStatus `json:"status"`
// Cause is set if Status is "cleared" or "rejected".
Cause string `json:"cause,omitempty"`
Code int `json:"code,omitempty"`
Message string `json:"message,omitempty"`
}
type DialoutInternalClientMessage struct {
Type string `json:"type"`
RoomId string `json:"roomid,omitempty"`
Error *Error `json:"error,omitempty"`
Status *DialoutStatusInternalClientMessage `json:"status,omitempty"`
}
func (m *DialoutInternalClientMessage) CheckValid() error {
switch m.Type {
case "":
return errors.New("type missing")
case "error":
if m.Error == nil {
return errors.New("error missing")
}
case "status":
if m.Status == nil {
return errors.New("status missing")
}
}
return nil
}
type InternalClientMessage struct {
Type string `json:"type"`
@ -694,10 +750,14 @@ type InternalClientMessage struct {
RemoveSession *RemoveSessionInternalClientMessage `json:"removesession,omitempty"`
InCall *InCallInternalClientMessage `json:"incall,omitempty"`
Dialout *DialoutInternalClientMessage `json:"dialout,omitempty"`
}
func (m *InternalClientMessage) CheckValid() error {
switch m.Type {
case "":
return errors.New("type missing")
case "addsession":
if m.AddSession == nil {
return fmt.Errorf("addsession missing")
@ -722,10 +782,28 @@ func (m *InternalClientMessage) CheckValid() error {
} else if err := m.InCall.CheckValid(); err != nil {
return err
}
case "dialout":
if m.Dialout == nil {
return fmt.Errorf("dialout missing")
} else if err := m.Dialout.CheckValid(); err != nil {
return err
}
}
return nil
}
type InternalServerDialoutRequest struct {
RoomId string `json:"roomid"`
Request *BackendRoomDialoutRequest `json:"request"`
}
type InternalServerMessage struct {
Type string `json:"type"`
Dialout *InternalServerDialoutRequest `json:"dialout,omitempty"`
}
// Type "event"
type RoomEventServerMessage struct {

View file

@ -28,6 +28,7 @@ import (
"crypto/sha1"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"io"
"log"
@ -35,8 +36,10 @@ import (
"net/http"
"net/url"
"reflect"
"regexp"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/dlintw/goconf"
@ -639,6 +642,121 @@ func (b *BackendServer) sendRoomSwitchTo(roomid string, backend *Backend, reques
return b.events.PublishBackendRoomMessage(roomid, backend, message)
}
type BackendResponseWithStatus interface {
Status() int
}
type DialoutErrorResponse struct {
BackendServerRoomResponse
status int
}
func (r *DialoutErrorResponse) Status() int {
return r.status
}
func returnDialoutError(status int, err *Error) (any, error) {
response := &DialoutErrorResponse{
BackendServerRoomResponse: BackendServerRoomResponse{
Type: "dialout",
Dialout: &BackendRoomDialoutResponse{
Error: err,
},
},
status: status,
}
return response, nil
}
var checkNumeric = regexp.MustCompile(`^[0-9]+$`)
func isNumeric(s string) bool {
return checkNumeric.MatchString(s)
}
func (b *BackendServer) startDialout(roomid string, backend *Backend, request *BackendServerRoomRequest) (any, error) {
if err := request.Dialout.ValidateNumber(); err != nil {
return returnDialoutError(http.StatusBadRequest, err)
}
if !isNumeric(roomid) {
return returnDialoutError(http.StatusBadRequest, NewError("invalid_roomid", "The room id must be numeric."))
}
var session *ClientSession
for s := range b.hub.dialoutSessions {
if s.GetClient() != nil {
session = s
break
}
}
if session == nil {
return returnDialoutError(http.StatusNotFound, NewError("no_client_available", "No available client found to trigger dialout."))
}
id := newRandomString(32)
msg := &ServerMessage{
Id: id,
Type: "internal",
Internal: &InternalServerMessage{
Type: "dialout",
Dialout: &InternalServerDialoutRequest{
RoomId: roomid,
Request: request.Dialout,
},
},
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
var response atomic.Pointer[DialoutInternalClientMessage]
session.HandleResponse(id, func(message *ClientMessage) bool {
response.Store(message.Internal.Dialout)
cancel()
// Don't send error to other sessions in the room.
return message.Internal.Dialout.Error != nil
})
defer session.ClearResponseHandler(id)
if !session.SendMessage(msg) {
return returnDialoutError(http.StatusBadGateway, NewError("error_notify", "Could not notify about new dialout."))
}
<-ctx.Done()
if err := ctx.Err(); err != nil && !errors.Is(err, context.Canceled) {
return returnDialoutError(http.StatusGatewayTimeout, NewError("timeout", "Timeout while waiting for dialout to start."))
}
dialout := response.Load()
if dialout == nil {
return returnDialoutError(http.StatusBadGateway, NewError("error_notify", "No dialout response received."))
}
switch dialout.Type {
case "error":
return returnDialoutError(http.StatusBadGateway, dialout.Error)
case "status":
if dialout.Status.Status != DialoutStatusAccepted {
log.Printf("Received unsupported dialout status when triggering dialout: %+v", dialout)
return returnDialoutError(http.StatusBadGateway, NewError("unsupported_status", "Unsupported dialout status received."))
}
return &BackendServerRoomResponse{
Type: "dialout",
Dialout: &BackendRoomDialoutResponse{
CallId: dialout.Status.CallId,
},
}, nil
}
log.Printf("Received unsupported dialout type when triggering dialout: %+v", dialout)
return returnDialoutError(http.StatusBadGateway, NewError("unsupported_type", "Unsupported dialout type received."))
}
func (b *BackendServer) roomHandler(w http.ResponseWriter, r *http.Request, body []byte) {
v := mux.Vars(r)
roomid := v["roomid"]
@ -692,6 +810,7 @@ func (b *BackendServer) roomHandler(w http.ResponseWriter, r *http.Request, body
request.ReceivedTime = time.Now().UnixNano()
var response any
var err error
switch request.Type {
case "invite":
@ -722,6 +841,8 @@ func (b *BackendServer) roomHandler(w http.ResponseWriter, r *http.Request, body
err = b.sendRoomMessage(roomid, backend, &request)
case "switchto":
err = b.sendRoomSwitchTo(roomid, backend, &request)
case "dialout":
response, err = b.startDialout(roomid, backend, &request)
default:
http.Error(w, "Unsupported request type: "+request.Type, http.StatusBadRequest)
return
@ -733,11 +854,27 @@ func (b *BackendServer) roomHandler(w http.ResponseWriter, r *http.Request, body
return
}
var responseData []byte
responseStatus := http.StatusOK
if response == nil {
// TODO(jojo): Return better response struct.
responseData = []byte("{}")
} else {
if s, ok := response.(BackendResponseWithStatus); ok {
responseStatus = s.Status()
}
responseData, err = json.Marshal(response)
if err != nil {
log.Printf("Could not serialize backend response %+v: %s", response, err)
responseStatus = http.StatusInternalServerError
responseData = []byte("{\"error\":\"could_not_serialize\"}")
}
}
w.Header().Set("Content-Type", "application/json; charset=utf-8")
w.Header().Set("X-Content-Type-Options", "nosniff")
w.WriteHeader(http.StatusOK)
// TODO(jojo): Return better response struct.
w.Write([]byte("{}")) // nolint
w.WriteHeader(responseStatus)
w.Write(responseData) // nolint
}
func (b *BackendServer) allowStatsAccess(r *http.Request) bool {

View file

@ -1760,3 +1760,296 @@ func TestBackendServer_StatsAllowedIps(t *testing.T) {
})
}
}
func Test_IsNumeric(t *testing.T) {
numeric := []string{
"0",
"1",
"12345",
}
nonNumeric := []string{
"",
" ",
" 0",
"0 ",
" 0 ",
"-1",
"1.2",
"1a",
"a1",
}
for _, s := range numeric {
if !isNumeric(s) {
t.Errorf("%s should be numeric", s)
}
}
for _, s := range nonNumeric {
if isNumeric(s) {
t.Errorf("%s should not be numeric", s)
}
}
}
func TestBackendServer_DialoutNoSipBridge(t *testing.T) {
_, _, _, hub, _, server := CreateBackendServerForTest(t)
client := NewTestClient(t, server, hub)
defer client.CloseWithBye()
if err := client.SendHelloInternal(); err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
_, err := client.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
roomId := "12345"
msg := &BackendServerRoomRequest{
Type: "dialout",
Dialout: &BackendRoomDialoutRequest{
Number: "+1234567890",
},
}
data, err := json.Marshal(msg)
if err != nil {
t.Fatal(err)
}
res, err := performBackendRequest(server.URL+"/api/v1/room/"+roomId, data)
if err != nil {
t.Fatal(err)
}
defer res.Body.Close()
body, err := io.ReadAll(res.Body)
if err != nil {
t.Error(err)
}
if res.StatusCode != http.StatusNotFound {
t.Fatalf("Expected error %d, got %s: %s", http.StatusNotFound, res.Status, string(body))
}
var response BackendServerRoomResponse
if err := json.Unmarshal(body, &response); err != nil {
t.Fatal(err)
}
if response.Type != "dialout" || response.Dialout == nil {
t.Fatalf("expected type dialout, got %s", string(body))
}
if response.Dialout.Error == nil {
t.Fatalf("expected dialout error, got %s", string(body))
}
if expected := "no_client_available"; response.Dialout.Error.Code != expected {
t.Errorf("expected error code %s, got %s", expected, string(body))
}
}
func TestBackendServer_DialoutAccepted(t *testing.T) {
_, _, _, hub, _, server := CreateBackendServerForTest(t)
client := NewTestClient(t, server, hub)
defer client.CloseWithBye()
if err := client.SendHelloInternalWithFeatures([]string{"start-dialout"}); err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
_, err := client.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
roomId := "12345"
callId := "call-123"
stopped := make(chan struct{})
go func() {
defer close(stopped)
msg, err := client.RunUntilMessage(ctx)
if err != nil {
t.Error(err)
return
}
if msg.Type != "internal" || msg.Internal.Type != "dialout" {
t.Errorf("expected internal dialout message, got %+v", msg)
return
}
if msg.Internal.Dialout.RoomId != roomId {
t.Errorf("expected room id %s, got %+v", roomId, msg)
}
response := &ClientMessage{
Id: msg.Id,
Type: "internal",
Internal: &InternalClientMessage{
Type: "dialout",
Dialout: &DialoutInternalClientMessage{
Type: "status",
RoomId: msg.Internal.Dialout.RoomId,
Status: &DialoutStatusInternalClientMessage{
Status: "accepted",
CallId: callId,
},
},
},
}
if err := client.WriteJSON(response); err != nil {
t.Error(err)
}
}()
defer func() {
<-stopped
}()
msg := &BackendServerRoomRequest{
Type: "dialout",
Dialout: &BackendRoomDialoutRequest{
Number: "+1234567890",
},
}
data, err := json.Marshal(msg)
if err != nil {
t.Fatal(err)
}
res, err := performBackendRequest(server.URL+"/api/v1/room/"+roomId, data)
if err != nil {
t.Fatal(err)
}
defer res.Body.Close()
body, err := io.ReadAll(res.Body)
if err != nil {
t.Error(err)
}
if res.StatusCode != http.StatusOK {
t.Fatalf("Expected error %d, got %s: %s", http.StatusOK, res.Status, string(body))
}
var response BackendServerRoomResponse
if err := json.Unmarshal(body, &response); err != nil {
t.Fatal(err)
}
if response.Type != "dialout" || response.Dialout == nil {
t.Fatalf("expected type dialout, got %s", string(body))
}
if response.Dialout.Error != nil {
t.Fatalf("expected dialout success, got %s", string(body))
}
if response.Dialout.CallId != callId {
t.Errorf("expected call id %s, got %s", callId, string(body))
}
}
func TestBackendServer_DialoutRejected(t *testing.T) {
_, _, _, hub, _, server := CreateBackendServerForTest(t)
client := NewTestClient(t, server, hub)
defer client.CloseWithBye()
if err := client.SendHelloInternalWithFeatures([]string{"start-dialout"}); err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
_, err := client.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
roomId := "12345"
errorCode := "error-code"
errorMessage := "rejected call"
stopped := make(chan struct{})
go func() {
defer close(stopped)
msg, err := client.RunUntilMessage(ctx)
if err != nil {
t.Error(err)
return
}
if msg.Type != "internal" || msg.Internal.Type != "dialout" {
t.Errorf("expected internal dialout message, got %+v", msg)
return
}
if msg.Internal.Dialout.RoomId != roomId {
t.Errorf("expected room id %s, got %+v", roomId, msg)
}
response := &ClientMessage{
Id: msg.Id,
Type: "internal",
Internal: &InternalClientMessage{
Type: "dialout",
Dialout: &DialoutInternalClientMessage{
Type: "error",
Error: NewError(errorCode, errorMessage),
},
},
}
if err := client.WriteJSON(response); err != nil {
t.Error(err)
}
}()
defer func() {
<-stopped
}()
msg := &BackendServerRoomRequest{
Type: "dialout",
Dialout: &BackendRoomDialoutRequest{
Number: "+1234567890",
},
}
data, err := json.Marshal(msg)
if err != nil {
t.Fatal(err)
}
res, err := performBackendRequest(server.URL+"/api/v1/room/"+roomId, data)
if err != nil {
t.Fatal(err)
}
defer res.Body.Close()
body, err := io.ReadAll(res.Body)
if err != nil {
t.Error(err)
}
if res.StatusCode != http.StatusBadGateway {
t.Fatalf("Expected error %d, got %s: %s", http.StatusBadGateway, res.Status, string(body))
}
var response BackendServerRoomResponse
if err := json.Unmarshal(body, &response); err != nil {
t.Fatal(err)
}
if response.Type != "dialout" || response.Dialout == nil {
t.Fatalf("expected type dialout, got %s", string(body))
}
if response.Dialout.Error == nil {
t.Fatalf("expected dialout error, got %s", string(body))
}
if response.Dialout.Error.Code != errorCode {
t.Errorf("expected error code %s, got %s", errorCode, string(body))
}
if response.Dialout.Error.Message != errorMessage {
t.Errorf("expected error message %s, got %s", errorMessage, string(body))
}
}

View file

@ -46,6 +46,9 @@ var (
PathToOcsSignalingBackend = "ocs/v2.php/apps/spreed/api/v1/signaling/backend"
)
// ResponseHandlerFunc will return "true" has been fully processed.
type ResponseHandlerFunc func(message *ClientMessage) bool
type ClientSession struct {
roomJoinTime int64
inCall uint32
@ -89,6 +92,9 @@ type ClientSession struct {
seenJoinedLock sync.Mutex
seenJoinedEvents map[string]bool
responseHandlersLock sync.Mutex
responseHandlers map[string]ResponseHandlerFunc
}
func NewClientSession(hub *Hub, privateId string, publicId string, data *SessionIdData, backend *Backend, hello *HelloClientMessage, auth *BackendClientAuthResponse) (*ClientSession, error) {
@ -1420,3 +1426,38 @@ func (s *ClientSession) GetVirtualSessions() []*VirtualSession {
}
return result
}
func (s *ClientSession) HandleResponse(id string, handler ResponseHandlerFunc) {
s.responseHandlersLock.Lock()
defer s.responseHandlersLock.Unlock()
if s.responseHandlers == nil {
s.responseHandlers = make(map[string]ResponseHandlerFunc)
}
s.responseHandlers[id] = handler
}
func (s *ClientSession) ClearResponseHandler(id string) {
s.responseHandlersLock.Lock()
defer s.responseHandlersLock.Unlock()
delete(s.responseHandlers, id)
}
func (s *ClientSession) ProcessResponse(message *ClientMessage) bool {
id := message.Id
if id == "" {
return false
}
s.responseHandlersLock.Lock()
cb, found := s.responseHandlers[id]
defer s.responseHandlersLock.Unlock()
if !found {
return false
}
return cb(message)
}

View file

@ -793,6 +793,74 @@ Message format (Server -> Client, receive message)
- The `userid` is omitted if a message was sent by an anonymous user.
## Control messages
Similar to regular messages between clients which can be sent by any session,
messages with type `control` can only be sent if the permission flag `control`
is available.
These messages can be used to perform actions on clients that should only be
possible by some users (e.g. moderators).
Message format (Client -> Server, mute phone):
{
"id": "unique-request-id",
"type": "control",
"control": {
"recipient": {
"type": "session",
"sessionid": "the-session-id-to-send-to"
},
"data": {
"type": "mute",
"audio": "audio-flags"
}
}
}
The bit-field `audio-flags` supports the following bits:
- `1`: mute speaking (i.e. phone can no longer talk)
- `2`: mute listening (i.e. phone is on hold and can no longer hear)
To unmute, a value of `0` must be sent.
Message format (Client -> Server, hangup phone):
{
"id": "unique-request-id",
"type": "control",
"control": {
"recipient": {
"type": "session",
"sessionid": "the-session-id-to-send-to"
},
"data": {
"type": "hangup"
}
}
}
Message format (Client -> Server, send DTMF):
{
"id": "unique-request-id",
"type": "control",
"control": {
"recipient": {
"type": "session",
"sessionid": "the-session-id-to-send-to"
},
"data": {
"type": "dtmf",
"digit": "the-digit"
}
}
}
Supported digits are `0`-`9`, `*` and `#`.
## Transient data
Transient data can be used to share data in a room that is valid while sessions
@ -936,6 +1004,13 @@ Message format (Client -> Server):
}
Phone sessions will have `type` set to `phone` in the additional user data
(which will be included in the `joined` [room event](#room-events)),
`callid` will be the id of the phone call and `number` the target of the call.
The call id will match the one returned for accepted outgoing calls and the
associated session id can be used to hangup a call or send DTMF tones to it.
### Update virtual session
Message format (Client -> Server):
@ -1207,3 +1282,49 @@ Message format (Server -> Client):
Clients are expected to follow the `switchto` message. If clients don't switch
to the target room after some time, they might get disconnected.
### Start dialout from a room
Use this to start a phone dialout to a new user in a given room.
Message format (Backend -> Server)
{
"type": "dialout"
"dialout" {
"number": "e164-target-number",
"options": {
...arbitrary options that will be sent back to validate...
}
}
}
Please note that this requires a connected internal client that supports
dialout (e.g. the SIP bridge).
Message format (Server -> Backend, request was accepted)
{
"type": "dialout"
"dialout" {
"callid": "the-unique-call-id"
}
}
Message format (Server -> Backend, request could not be processed)
{
"type": "dialout"
"dialout" {
"error": {
"code": "the-internal-message-id",
"message": "human-readable-error-message",
"details": {
...optional additional details...
}
}
}
}
A HTTP error status code will be set in this case.

53
hub.go
View file

@ -97,6 +97,9 @@ var (
// Delay after which a screen publisher should be cleaned up.
cleanupScreenPublisherDelay = time.Second
// Delay after which a "cleared" / "rejected" dialout status should be removed.
removeCallStatusTTL = 5 * time.Second
)
const (
@ -150,6 +153,7 @@ type Hub struct {
expiredSessions map[Session]bool
anonymousSessions map[*ClientSession]time.Time
expectHelloClients map[*Client]time.Time
dialoutSessions map[*ClientSession]bool
backendTimeout time.Duration
backend *BackendClient
@ -338,6 +342,7 @@ func NewHub(config *goconf.ConfigFile, events AsyncEvents, rpcServer *GrpcServer
expiredSessions: make(map[Session]bool),
anonymousSessions: make(map[*ClientSession]time.Time),
expectHelloClients: make(map[*Client]time.Time),
dialoutSessions: make(map[*ClientSession]bool),
backendTimeout: backendTimeout,
backend: backend,
@ -641,6 +646,7 @@ func (h *Hub) removeSession(session Session) (removed bool) {
delete(h.expiredSessions, session)
if session, ok := session.(*ClientSession); ok {
delete(h.anonymousSessions, session)
delete(h.dialoutSessions, session)
}
h.mu.Unlock()
return
@ -802,8 +808,12 @@ func (h *Hub) processRegister(client *Client, message *ClientMessage, backend *B
h.sessions[sessionIdData.Sid] = session
h.clients[sessionIdData.Sid] = client
delete(h.expectHelloClients, client)
if userId == "" && auth.Type != HelloClientTypeInternal {
if userId == "" && session.ClientType() != HelloClientTypeInternal {
h.startWaitAnonymousSessionRoomLocked(session)
} else if session.ClientType() == HelloClientTypeInternal && session.HasFeature(ClientFeatureStartDialout) {
// TODO: There is a small race condition for sessions that take some time
// between connecting and joining a room.
h.dialoutSessions[session] = true
}
h.mu.Unlock()
@ -1250,6 +1260,7 @@ func (h *Hub) processRoom(client *Client, message *ClientMessage) {
h.startWaitAnonymousSessionRoom(session)
}
}
return
}
@ -1389,6 +1400,10 @@ func (h *Hub) processJoinRoom(session *ClientSession, message *ClientMessage, ro
h.mu.Lock()
// The session now joined a room, don't expire if it is anonymous.
delete(h.anonymousSessions, session)
if session.ClientType() == HelloClientTypeInternal && session.HasFeature(ClientFeatureStartDialout) {
// An internal session in a room can not be used for dialout.
delete(h.dialoutSessions, session)
}
h.mu.Unlock()
session.SetRoom(r)
if room.Room.Permissions != nil {
@ -1772,6 +1787,10 @@ func (h *Hub) processInternalMsg(client *Client, message *ClientMessage) {
return
}
if session.ProcessResponse(message) {
return
}
switch msg.Type {
case "addsession":
msg := msg.AddSession
@ -1923,6 +1942,38 @@ func (h *Hub) processInternalMsg(client *Client, message *ClientMessage) {
room.NotifySessionChanged(session, SessionChangeInCall)
}
}
case "dialout":
roomId := msg.Dialout.RoomId
msg.Dialout.RoomId = "" // Don't send room id to recipients.
if msg.Dialout.Type == "status" {
asyncMessage := &AsyncMessage{
Type: "room",
Room: &BackendServerRoomRequest{
Type: "transient",
Transient: &BackendRoomTransientRequest{
Action: TransientActionSet,
Key: "callstatus_" + msg.Dialout.Status.CallId,
Value: msg.Dialout.Status,
},
},
}
if msg.Dialout.Status.Status == DialoutStatusCleared || msg.Dialout.Status.Status == DialoutStatusRejected {
asyncMessage.Room.Transient.TTL = removeCallStatusTTL
}
if err := h.events.PublishBackendRoomMessage(roomId, session.Backend(), asyncMessage); err != nil {
log.Printf("Error publishing dialout message %+v to room %s", msg.Dialout, roomId)
}
} else {
if err := h.events.PublishRoomMessage(roomId, session.Backend(), &AsyncMessage{
Type: "message",
Message: &ServerMessage{
Type: "dialout",
Dialout: msg.Dialout,
},
}); err != nil {
log.Printf("Error publishing dialout message %+v to room %s", msg.Dialout, roomId)
}
}
default:
log.Printf("Ignore unsupported internal message %+v from %s", msg, session.PublicId())
return

View file

@ -5349,3 +5349,214 @@ func TestGeoipOverrides(t *testing.T) {
t.Errorf("expected country %s, got %s", strings.ToUpper(country3), country)
}
}
func TestDialoutStatus(t *testing.T) {
_, _, _, hub, _, server := CreateBackendServerForTest(t)
internalClient := NewTestClient(t, server, hub)
defer internalClient.CloseWithBye()
if err := internalClient.SendHelloInternalWithFeatures([]string{"start-dialout"}); err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
_, err := internalClient.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
roomId := "12345"
client := NewTestClient(t, server, hub)
defer client.CloseWithBye()
if err := client.SendHello(testDefaultUserId); err != nil {
t.Fatal(err)
}
hello, err := client.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
if _, err := client.JoinRoom(ctx, roomId); err != nil {
t.Fatal(err)
}
if err := client.RunUntilJoined(ctx, hello.Hello); err != nil {
t.Error(err)
}
callId := "call-123"
stopped := make(chan struct{})
go func(client *TestClient) {
defer close(stopped)
msg, err := client.RunUntilMessage(ctx)
if err != nil {
t.Error(err)
return
}
if msg.Type != "internal" || msg.Internal.Type != "dialout" {
t.Errorf("expected internal dialout message, got %+v", msg)
return
}
if msg.Internal.Dialout.RoomId != roomId {
t.Errorf("expected room id %s, got %+v", roomId, msg)
}
response := &ClientMessage{
Id: msg.Id,
Type: "internal",
Internal: &InternalClientMessage{
Type: "dialout",
Dialout: &DialoutInternalClientMessage{
Type: "status",
RoomId: msg.Internal.Dialout.RoomId,
Status: &DialoutStatusInternalClientMessage{
Status: "accepted",
CallId: callId,
},
},
},
}
if err := client.WriteJSON(response); err != nil {
t.Error(err)
}
}(internalClient)
defer func() {
<-stopped
}()
msg := &BackendServerRoomRequest{
Type: "dialout",
Dialout: &BackendRoomDialoutRequest{
Number: "+1234567890",
},
}
data, err := json.Marshal(msg)
if err != nil {
t.Fatal(err)
}
res, err := performBackendRequest(server.URL+"/api/v1/room/"+roomId, data)
if err != nil {
t.Fatal(err)
}
defer res.Body.Close()
body, err := io.ReadAll(res.Body)
if err != nil {
t.Error(err)
}
if res.StatusCode != http.StatusOK {
t.Fatalf("Expected error %d, got %s: %s", http.StatusOK, res.Status, string(body))
}
var response BackendServerRoomResponse
if err := json.Unmarshal(body, &response); err != nil {
t.Fatal(err)
}
if response.Type != "dialout" || response.Dialout == nil {
t.Fatalf("expected type dialout, got %s", string(body))
}
if response.Dialout.Error != nil {
t.Fatalf("expected dialout success, got %s", string(body))
}
if response.Dialout.CallId != callId {
t.Errorf("expected call id %s, got %s", callId, string(body))
}
key := "callstatus_" + callId
if msg, err := client.RunUntilMessage(ctx); err != nil {
t.Fatal(err)
} else {
if err := checkMessageTransientSet(msg, key, map[string]interface{}{
"callid": callId,
"status": "accepted",
}, nil); err != nil {
t.Error(err)
}
}
if err := internalClient.SendInternalDialout(&DialoutInternalClientMessage{
RoomId: roomId,
Type: "status",
Status: &DialoutStatusInternalClientMessage{
CallId: callId,
Status: "ringing",
},
}); err != nil {
t.Fatal(err)
}
if msg, err := client.RunUntilMessage(ctx); err != nil {
t.Fatal(err)
} else {
if err := checkMessageTransientSet(msg, key, map[string]interface{}{
"callid": callId,
"status": "ringing",
},
map[string]interface{}{
"callid": callId,
"status": "accepted",
}); err != nil {
t.Error(err)
}
}
old := removeCallStatusTTL
defer func() {
removeCallStatusTTL = old
}()
removeCallStatusTTL = 500 * time.Millisecond
clearedCause := "cleared-call"
if err := internalClient.SendInternalDialout(&DialoutInternalClientMessage{
RoomId: roomId,
Type: "status",
Status: &DialoutStatusInternalClientMessage{
CallId: callId,
Status: "cleared",
Cause: clearedCause,
},
}); err != nil {
t.Fatal(err)
}
if msg, err := client.RunUntilMessage(ctx); err != nil {
t.Fatal(err)
} else {
if err := checkMessageTransientSet(msg, key, map[string]interface{}{
"callid": callId,
"status": "cleared",
"cause": clearedCause,
},
map[string]interface{}{
"callid": callId,
"status": "ringing",
}); err != nil {
t.Error(err)
}
}
ctx2, cancel := context.WithTimeout(ctx, removeCallStatusTTL*2)
defer cancel()
if msg, err := client.RunUntilMessage(ctx2); err != nil {
t.Fatal(err)
} else {
if err := checkMessageTransientRemove(msg, key, map[string]interface{}{
"callid": callId,
"status": "cleared",
"cause": clearedCause,
}); err != nil {
t.Error(err)
}
}
}

View file

@ -244,6 +244,15 @@ func (r *Room) processBackendRoomRequestRoom(message *BackendServerRoomRequest)
r.publishRoomMessage(message.Message)
case "switchto":
r.publishSwitchTo(message.SwitchTo)
case "transient":
switch message.Transient.Action {
case TransientActionSet:
r.SetTransientDataTTL(message.Transient.Key, message.Transient.Value, message.Transient.TTL)
case TransientActionDelete:
r.RemoveTransientData(message.Transient.Key)
default:
log.Printf("Unsupported transient action in room %s: %+v", r.Id(), message.Transient)
}
default:
log.Printf("Unsupported backend room request with type %s in %s: %+v", message.Type, r.Id(), message)
}

View file

@ -34,6 +34,7 @@ import (
"reflect"
"strconv"
"strings"
"sync"
"testing"
"time"
@ -211,6 +212,7 @@ type TestClient struct {
hub *Hub
server *httptest.Server
mu sync.Mutex
conn *websocket.Conn
localAddr net.Addr
@ -280,6 +282,8 @@ func (c *TestClient) CloseWithBye() {
}
func (c *TestClient) Close() {
c.mu.Lock()
defer c.mu.Unlock()
if err := c.conn.WriteMessage(websocket.CloseMessage, []byte{}); err == websocket.ErrCloseSent {
// Already closed
return
@ -368,6 +372,8 @@ func (c *TestClient) WriteJSON(data interface{}) error {
}
}
c.mu.Lock()
defer c.mu.Unlock()
return c.conn.WriteJSON(data)
}
@ -578,6 +584,18 @@ func (c *TestClient) SendInternalRemoveSession(msg *RemoveSessionInternalClientM
return c.WriteJSON(message)
}
func (c *TestClient) SendInternalDialout(msg *DialoutInternalClientMessage) error {
message := &ClientMessage{
Id: "abcd",
Type: "internal",
Internal: &InternalClientMessage{
Type: "dialout",
Dialout: msg,
},
}
return c.WriteJSON(message)
}
func (c *TestClient) SetTransientData(key string, value interface{}, ttl time.Duration) error {
payload, err := json.Marshal(value)
if err != nil {