Browse Source

Simplify loopback NATS client.

Only use one goroutine per client instead of one per subscription.
This ensures that (like with the "real" client), all messages are
processed in order across different subscriptions.
pull/115/head
Joachim Bauch 2 weeks ago
parent
commit
c8886d03c9
No known key found for this signature in database GPG Key ID: 77C1D22D53E15F02
  1. 1
      backend_server_test.go
  2. 1
      hub_test.go
  3. 133
      natsclient_loopback.go
  4. 18
      natsclient_loopback_test.go
  5. 2
      natsclient_test.go

1
backend_server_test.go

@ -106,6 +106,7 @@ func CreateBackendServerForTestFromConfig(t *testing.T, config *goconf.ConfigFil
WaitForHub(ctx, t, hub)
(nats).(*LoopbackNatsClient).waitForSubscriptionsEmpty(ctx, t)
nats.Close()
server.Close()
}

1
hub_test.go

@ -120,6 +120,7 @@ func CreateHubForTestWithConfig(t *testing.T, getConfigFunc func(*httptest.Serve
WaitForHub(ctx, t, h)
(nats).(*LoopbackNatsClient).waitForSubscriptionsEmpty(ctx, t)
nats.Close()
server.Close()
}

133
natsclient_loopback.go

@ -22,10 +22,11 @@
package signaling
import (
"container/list"
"encoding/json"
"log"
"strings"
"sync"
"time"
"github.com/nats-io/nats.go"
)
@ -33,90 +34,87 @@ import (
type LoopbackNatsClient struct {
mu sync.Mutex
subscriptions map[string]map[*loopbackNatsSubscription]bool
stopping bool
wakeup sync.Cond
incoming list.List
}
func NewLoopbackNatsClient() (NatsClient, error) {
return &LoopbackNatsClient{
client := &LoopbackNatsClient{
subscriptions: make(map[string]map[*loopbackNatsSubscription]bool),
}, nil
}
client.wakeup.L = &client.mu
go client.processMessages()
return client, nil
}
func (c *LoopbackNatsClient) Close() {
func (c *LoopbackNatsClient) processMessages() {
c.mu.Lock()
defer c.mu.Unlock()
for _, subs := range c.subscriptions {
for sub := range subs {
sub.Unsubscribe() // nolint
for {
for !c.stopping && c.incoming.Len() == 0 {
c.wakeup.Wait()
}
if c.stopping {
break
}
}
c.subscriptions = nil
}
type loopbackNatsSubscription struct {
subject string
client *LoopbackNatsClient
ch chan *nats.Msg
incoming []*nats.Msg
cond sync.Cond
quit bool
}
func (s *loopbackNatsSubscription) Unsubscribe() error {
s.cond.L.Lock()
if !s.quit {
s.quit = true
s.cond.Signal()
msg := c.incoming.Remove(c.incoming.Front()).(*nats.Msg)
c.processMessage(msg)
}
s.cond.L.Unlock()
s.client.unsubscribe(s)
return nil
}
func (s *loopbackNatsSubscription) queue(msg *nats.Msg) {
s.cond.L.Lock()
s.incoming = append(s.incoming, msg)
if len(s.incoming) == 1 {
s.cond.Signal()
func (c *LoopbackNatsClient) processMessage(msg *nats.Msg) {
subs, found := c.subscriptions[msg.Subject]
if !found {
return
}
s.cond.L.Unlock()
}
func (s *loopbackNatsSubscription) run() {
s.cond.L.Lock()
defer s.cond.L.Unlock()
for !s.quit {
for !s.quit && len(s.incoming) == 0 {
s.cond.Wait()
}
for !s.quit && len(s.incoming) > 0 {
msg := s.incoming[0]
s.incoming = s.incoming[1:]
s.cond.L.Unlock()
// A "real" NATS server would take some time to process the request,
// simulate this by sleeping a tiny bit.
time.Sleep(time.Millisecond)
s.ch <- msg
s.cond.L.Lock()
channels := make([]chan *nats.Msg, 0, len(subs))
for sub := range subs {
channels = append(channels, sub.ch)
}
c.mu.Unlock()
defer c.mu.Lock()
for _, ch := range channels {
select {
case ch <- msg:
default:
log.Printf("Slow consumer %s, dropping message", msg.Subject)
}
}
}
func (c *LoopbackNatsClient) Subscribe(subject string, ch chan *nats.Msg) (NatsSubscription, error) {
func (c *LoopbackNatsClient) Close() {
c.mu.Lock()
defer c.mu.Unlock()
return c.subscribe(subject, ch)
c.subscriptions = nil
c.stopping = true
c.incoming.Init()
c.wakeup.Signal()
}
type loopbackNatsSubscription struct {
subject string
client *LoopbackNatsClient
ch chan *nats.Msg
}
func (s *loopbackNatsSubscription) Unsubscribe() error {
s.client.unsubscribe(s)
return nil
}
func (c *LoopbackNatsClient) subscribe(subject string, ch chan *nats.Msg) (NatsSubscription, error) {
func (c *LoopbackNatsClient) Subscribe(subject string, ch chan *nats.Msg) (NatsSubscription, error) {
if strings.HasSuffix(subject, ".") || strings.Contains(subject, " ") {
return nil, nats.ErrBadSubject
}
c.mu.Lock()
defer c.mu.Unlock()
if c.subscriptions == nil {
return nil, nats.ErrConnectionClosed
}
@ -126,7 +124,6 @@ func (c *LoopbackNatsClient) subscribe(subject string, ch chan *nats.Msg) (NatsS
client: c,
ch: ch,
}
s.cond.L = &sync.Mutex{}
subs, found := c.subscriptions[subject]
if !found {
subs = make(map[*loopbackNatsSubscription]bool)
@ -134,7 +131,6 @@ func (c *LoopbackNatsClient) subscribe(subject string, ch chan *nats.Msg) (NatsS
}
subs[s] = true
go s.run()
return s, nil
}
@ -161,18 +157,15 @@ func (c *LoopbackNatsClient) Publish(subject string, message interface{}) error
return nats.ErrConnectionClosed
}
if subs, found := c.subscriptions[subject]; found {
msg := &nats.Msg{
Subject: subject,
}
var err error
if msg.Data, err = json.Marshal(message); err != nil {
return err
}
for s := range subs {
s.queue(msg)
}
msg := &nats.Msg{
Subject: subject,
}
var err error
if msg.Data, err = json.Marshal(message); err != nil {
return err
}
c.incoming.PushBack(msg)
c.wakeup.Signal()
return nil
}

18
natsclient_loopback_test.go

@ -48,17 +48,20 @@ func (c *LoopbackNatsClient) waitForSubscriptionsEmpty(ctx context.Context, t *t
}
}
func CreateLoopbackNatsClientForTest(t *testing.T) NatsClient {
func CreateLoopbackNatsClientForTest(t *testing.T) (NatsClient, func()) {
result, err := NewLoopbackNatsClient()
if err != nil {
t.Fatal(err)
}
return result
return result, func() {
result.Close()
}
}
func TestLoopbackNatsClient_Subscribe(t *testing.T) {
ensureNoGoroutinesLeak(t, func() {
client := CreateLoopbackNatsClientForTest(t)
client, shutdown := CreateLoopbackNatsClientForTest(t)
defer shutdown()
testNatsClient_Subscribe(t, client)
})
@ -66,7 +69,8 @@ func TestLoopbackNatsClient_Subscribe(t *testing.T) {
func TestLoopbackClient_PublishAfterClose(t *testing.T) {
ensureNoGoroutinesLeak(t, func() {
client := CreateLoopbackNatsClientForTest(t)
client, shutdown := CreateLoopbackNatsClientForTest(t)
defer shutdown()
testNatsClient_PublishAfterClose(t, client)
})
@ -74,7 +78,8 @@ func TestLoopbackClient_PublishAfterClose(t *testing.T) {
func TestLoopbackClient_SubscribeAfterClose(t *testing.T) {
ensureNoGoroutinesLeak(t, func() {
client := CreateLoopbackNatsClientForTest(t)
client, shutdown := CreateLoopbackNatsClientForTest(t)
defer shutdown()
testNatsClient_SubscribeAfterClose(t, client)
})
@ -82,7 +87,8 @@ func TestLoopbackClient_SubscribeAfterClose(t *testing.T) {
func TestLoopbackClient_BadSubjects(t *testing.T) {
ensureNoGoroutinesLeak(t, func() {
client := CreateLoopbackNatsClientForTest(t)
client, shutdown := CreateLoopbackNatsClientForTest(t)
defer shutdown()
testNatsClient_BadSubjects(t, client)
})

2
natsclient_test.go

@ -90,7 +90,7 @@ func testNatsClient_Subscribe(t *testing.T, client NatsClient) {
}
// Allow NATS goroutines to process messages.
time.Sleep(time.Millisecond)
time.Sleep(10 * time.Millisecond)
}
<-ch

Loading…
Cancel
Save