diff --git a/src/signaling/client.go b/src/signaling/client.go index 97171fb..e1535c4 100644 --- a/src/signaling/client.go +++ b/src/signaling/client.go @@ -84,8 +84,7 @@ type Client struct { mu sync.Mutex - natsReceiver chan *NatsMessage - closeChan chan bool + closeChan chan bool } func NewClient(hub *Hub, conn *websocket.Conn, remoteAddress string, agent string) (*Client, error) { @@ -98,12 +97,11 @@ func NewClient(hub *Hub, conn *websocket.Conn, remoteAddress string, agent strin agent = "unknown user agent" } client := &Client{ - hub: hub, - conn: conn, - addr: remoteAddress, - agent: agent, - natsReceiver: make(chan *NatsMessage, 64), - closeChan: make(chan bool, 1), + hub: hub, + conn: conn, + addr: remoteAddress, + agent: agent, + closeChan: make(chan bool, 1), } return client, nil } @@ -438,58 +436,6 @@ func (c *Client) writeMessageLocked(message *ServerMessage) bool { return true } -func (c *Client) ProcessNatsMessage(msg *NatsMessage) bool { - switch msg.Type { - case "message": - if msg.Message == nil { - log.Printf("Received NATS message without payload: %+v\n", msg) - return true - } - - switch msg.Message.Type { - case "message": - session := c.GetSession() - if session != nil && msg.Message.Message != nil && - msg.Message.Message.Sender != nil && - msg.Message.Message.Sender.SessionId == session.PublicId() { - // Don't send message back to sender (can happen if sent to user or room) - return true - } - case "control": - session := c.GetSession() - if session != nil && msg.Message.Control != nil && - msg.Message.Control.Sender != nil && - msg.Message.Control.Sender.SessionId == session.PublicId() { - // Don't send message back to sender (can happen if sent to user or room) - return true - } - case "event": - if msg.Message.Event.Target == "participants" && - msg.Message.Event.Type == "update" { - m := msg.Message.Event.Update - users := make(map[string]bool) - for _, entry := range m.Users { - users[entry["sessionId"].(string)] = true - } - for _, entry := range m.Changed { - if users[entry["sessionId"].(string)] { - continue - } - m.Users = append(m.Users, entry) - } - // TODO(jojo): Only send all users if current session id has - // changed its "inCall" flag to true. - m.Changed = nil - } - } - - return c.writeMessage(msg.Message) - default: - log.Printf("Received NATS message with unsupported type %s: %+v\n", msg.Type, msg) - return true - } -} - func (c *Client) sendPing() bool { c.mu.Lock() defer c.mu.Unlock() @@ -522,16 +468,6 @@ func (c *Client) writePump() { c.sendPing() for { select { - case message := <-c.natsReceiver: - if !c.ProcessNatsMessage(message) { - return - } - n := len(c.natsReceiver) - for i := 0; i < n; i++ { - if !c.ProcessNatsMessage(<-c.natsReceiver) { - return - } - } case <-ticker.C: if !c.sendPing() { return diff --git a/src/signaling/clientsession.go b/src/signaling/clientsession.go index b46d03e..7a9d2fc 100644 --- a/src/signaling/clientsession.go +++ b/src/signaling/clientsession.go @@ -661,7 +661,57 @@ func (s *ClientSession) processClientMessage(msg *nats.Msg) { return } - client.natsReceiver <- &message + s.processNatsMessage(client, &message) +} + +func (s *ClientSession) processNatsMessage(client *Client, msg *NatsMessage) bool { + switch msg.Type { + case "message": + if msg.Message == nil { + log.Printf("Received NATS message without payload: %+v\n", msg) + return true + } + + switch msg.Message.Type { + case "message": + if msg.Message.Message != nil && + msg.Message.Message.Sender != nil && + msg.Message.Message.Sender.SessionId == s.PublicId() { + // Don't send message back to sender (can happen if sent to user or room) + return true + } + case "control": + if msg.Message.Control != nil && + msg.Message.Control.Sender != nil && + msg.Message.Control.Sender.SessionId == s.PublicId() { + // Don't send message back to sender (can happen if sent to user or room) + return true + } + case "event": + if msg.Message.Event.Target == "participants" && + msg.Message.Event.Type == "update" { + m := msg.Message.Event.Update + users := make(map[string]bool) + for _, entry := range m.Users { + users[entry["sessionId"].(string)] = true + } + for _, entry := range m.Changed { + if users[entry["sessionId"].(string)] { + continue + } + m.Users = append(m.Users, entry) + } + // TODO(jojo): Only send all users if current session id has + // changed its "inCall" flag to true. + m.Changed = nil + } + } + + return client.writeMessage(msg.Message) + default: + log.Printf("Received NATS message with unsupported type %s: %+v\n", msg.Type, msg) + return true + } } func (s *ClientSession) combinePendingMessages(messages []*NatsMessage) ([]*NatsMessage, error) { @@ -703,7 +753,7 @@ func (s *ClientSession) NotifySessionResumed(client *Client) { log.Printf("Send %d pending messages to session %s", len(messages), s.PublicId()) had_participants_update := false for _, message := range messages { - if !client.ProcessNatsMessage(message) { + if !s.processNatsMessage(client, message) { break }