Process NATS messages directly without passing to channel in client.

Already in per-client/-session goroutines, so won't block others.
This commit is contained in:
Joachim Bauch 2020-08-03 14:28:00 +02:00
parent 2278a5ffba
commit d23a392a1b
Failed to extract signature
2 changed files with 58 additions and 72 deletions

View File

@ -84,8 +84,7 @@ type Client struct {
mu sync.Mutex
natsReceiver chan *NatsMessage
closeChan chan bool
closeChan chan bool
}
func NewClient(hub *Hub, conn *websocket.Conn, remoteAddress string, agent string) (*Client, error) {
@ -98,12 +97,11 @@ func NewClient(hub *Hub, conn *websocket.Conn, remoteAddress string, agent strin
agent = "unknown user agent"
}
client := &Client{
hub: hub,
conn: conn,
addr: remoteAddress,
agent: agent,
natsReceiver: make(chan *NatsMessage, 64),
closeChan: make(chan bool, 1),
hub: hub,
conn: conn,
addr: remoteAddress,
agent: agent,
closeChan: make(chan bool, 1),
}
return client, nil
}
@ -438,58 +436,6 @@ func (c *Client) writeMessageLocked(message *ServerMessage) bool {
return true
}
func (c *Client) ProcessNatsMessage(msg *NatsMessage) bool {
switch msg.Type {
case "message":
if msg.Message == nil {
log.Printf("Received NATS message without payload: %+v\n", msg)
return true
}
switch msg.Message.Type {
case "message":
session := c.GetSession()
if session != nil && msg.Message.Message != nil &&
msg.Message.Message.Sender != nil &&
msg.Message.Message.Sender.SessionId == session.PublicId() {
// Don't send message back to sender (can happen if sent to user or room)
return true
}
case "control":
session := c.GetSession()
if session != nil && msg.Message.Control != nil &&
msg.Message.Control.Sender != nil &&
msg.Message.Control.Sender.SessionId == session.PublicId() {
// Don't send message back to sender (can happen if sent to user or room)
return true
}
case "event":
if msg.Message.Event.Target == "participants" &&
msg.Message.Event.Type == "update" {
m := msg.Message.Event.Update
users := make(map[string]bool)
for _, entry := range m.Users {
users[entry["sessionId"].(string)] = true
}
for _, entry := range m.Changed {
if users[entry["sessionId"].(string)] {
continue
}
m.Users = append(m.Users, entry)
}
// TODO(jojo): Only send all users if current session id has
// changed its "inCall" flag to true.
m.Changed = nil
}
}
return c.writeMessage(msg.Message)
default:
log.Printf("Received NATS message with unsupported type %s: %+v\n", msg.Type, msg)
return true
}
}
func (c *Client) sendPing() bool {
c.mu.Lock()
defer c.mu.Unlock()
@ -522,16 +468,6 @@ func (c *Client) writePump() {
c.sendPing()
for {
select {
case message := <-c.natsReceiver:
if !c.ProcessNatsMessage(message) {
return
}
n := len(c.natsReceiver)
for i := 0; i < n; i++ {
if !c.ProcessNatsMessage(<-c.natsReceiver) {
return
}
}
case <-ticker.C:
if !c.sendPing() {
return

View File

@ -661,7 +661,57 @@ func (s *ClientSession) processClientMessage(msg *nats.Msg) {
return
}
client.natsReceiver <- &message
s.processNatsMessage(client, &message)
}
func (s *ClientSession) processNatsMessage(client *Client, msg *NatsMessage) bool {
switch msg.Type {
case "message":
if msg.Message == nil {
log.Printf("Received NATS message without payload: %+v\n", msg)
return true
}
switch msg.Message.Type {
case "message":
if msg.Message.Message != nil &&
msg.Message.Message.Sender != nil &&
msg.Message.Message.Sender.SessionId == s.PublicId() {
// Don't send message back to sender (can happen if sent to user or room)
return true
}
case "control":
if msg.Message.Control != nil &&
msg.Message.Control.Sender != nil &&
msg.Message.Control.Sender.SessionId == s.PublicId() {
// Don't send message back to sender (can happen if sent to user or room)
return true
}
case "event":
if msg.Message.Event.Target == "participants" &&
msg.Message.Event.Type == "update" {
m := msg.Message.Event.Update
users := make(map[string]bool)
for _, entry := range m.Users {
users[entry["sessionId"].(string)] = true
}
for _, entry := range m.Changed {
if users[entry["sessionId"].(string)] {
continue
}
m.Users = append(m.Users, entry)
}
// TODO(jojo): Only send all users if current session id has
// changed its "inCall" flag to true.
m.Changed = nil
}
}
return client.writeMessage(msg.Message)
default:
log.Printf("Received NATS message with unsupported type %s: %+v\n", msg.Type, msg)
return true
}
}
func (s *ClientSession) combinePendingMessages(messages []*NatsMessage) ([]*NatsMessage, error) {
@ -703,7 +753,7 @@ func (s *ClientSession) NotifySessionResumed(client *Client) {
log.Printf("Send %d pending messages to session %s", len(messages), s.PublicId())
had_participants_update := false
for _, message := range messages {
if !client.ProcessNatsMessage(message) {
if !s.processNatsMessage(client, message) {
break
}