From 762d1512c447ce53868f07ab35d14ce71b6d3b9a Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Fri, 4 Jun 2021 15:13:29 +0200 Subject: [PATCH] Remove unused "Request" method from NATS client. --- natsclient.go | 6 --- natsclient_loopback.go | 70 ------------------------ natsclient_loopback_test.go | 24 --------- natsclient_test.go | 103 ------------------------------------ 4 files changed, 203 deletions(-) diff --git a/natsclient.go b/natsclient.go index 162e2e0..a989915 100644 --- a/natsclient.go +++ b/natsclient.go @@ -56,8 +56,6 @@ type NatsClient interface { Subscribe(subject string, ch chan *nats.Msg) (NatsSubscription, error) - Request(subject string, data []byte, timeout time.Duration) (*nats.Msg, error) - Publish(subject string, message interface{}) error PublishNats(subject string, message *NatsMessage) error PublishMessage(subject string, message *ServerMessage) error @@ -142,10 +140,6 @@ func (c *natsClient) Subscribe(subject string, ch chan *nats.Msg) (NatsSubscript return c.nc.ChanSubscribe(subject, ch) } -func (c *natsClient) Request(subject string, data []byte, timeout time.Duration) (*nats.Msg, error) { - return c.nc.Request(subject, data, timeout) -} - func (c *natsClient) Publish(subject string, message interface{}) error { return c.conn.Publish(subject, message) } diff --git a/natsclient_loopback.go b/natsclient_loopback.go index 5e4e08b..12fb8db 100644 --- a/natsclient_loopback.go +++ b/natsclient_loopback.go @@ -22,13 +22,9 @@ package signaling import ( - "context" "encoding/json" - "log" - "strconv" "strings" "sync" - "time" "github.com/nats-io/nats.go" ) @@ -36,7 +32,6 @@ import ( type LoopbackNatsClient struct { mu sync.Mutex subscriptions map[string]map[*loopbackNatsSubscription]bool - replyId uint64 } func NewLoopbackNatsClient() (NatsClient, error) { @@ -151,71 +146,6 @@ func (c *LoopbackNatsClient) unsubscribe(s *loopbackNatsSubscription) { } } -func (c *LoopbackNatsClient) Request(subject string, data []byte, timeout time.Duration) (*nats.Msg, error) { - if strings.HasSuffix(subject, ".") || strings.Contains(subject, " ") { - return nil, nats.ErrBadSubject - } - - c.mu.Lock() - defer c.mu.Unlock() - if c.subscriptions == nil { - return nil, nats.ErrConnectionClosed - } - - var response *nats.Msg - var err error - subs, found := c.subscriptions[subject] - if !found { - return nil, nats.ErrNoResponders - } - - replyId := c.replyId - c.replyId += 1 - - reply := "_reply_" + strconv.FormatUint(replyId, 10) - responder := make(chan *nats.Msg) - var replySubscriber NatsSubscription - replySubscriber, err = c.subscribe(reply, responder) - if err != nil { - return nil, err - } - - defer func() { - go func() { - if err := replySubscriber.Unsubscribe(); err != nil { - log.Printf("Error closing reply subscriber %s: %s", reply, err) - } - }() - }() - msg := &nats.Msg{ - Subject: subject, - Data: data, - Reply: reply, - Sub: &nats.Subscription{ - Subject: subject, - }, - } - for s := range subs { - s.queue(msg) - } - c.mu.Unlock() - - ctx, cancel := context.WithTimeout(context.Background(), timeout) - defer cancel() - select { - case response = <-responder: - err = nil - case <-ctx.Done(): - if ctx.Err() == context.DeadlineExceeded { - err = nats.ErrTimeout - } else { - err = ctx.Err() - } - } - c.mu.Lock() - return response, err -} - func (c *LoopbackNatsClient) Publish(subject string, message interface{}) error { if strings.HasSuffix(subject, ".") || strings.Contains(subject, " ") { return nats.ErrBadSubject diff --git a/natsclient_loopback_test.go b/natsclient_loopback_test.go index 7fec2d1..5f5911f 100644 --- a/natsclient_loopback_test.go +++ b/natsclient_loopback_test.go @@ -63,27 +63,3 @@ func TestLoopbackNatsClient_Subscribe(t *testing.T) { testNatsClient_Subscribe(t, client) }) } - -func TestLoopbackNatsClient_Request(t *testing.T) { - ensureNoGoroutinesLeak(t, func() { - client := CreateLoopbackNatsClientForTest(t) - - testNatsClient_Request(t, client) - }) -} - -func TestLoopbackNatsClient_RequestTimeout(t *testing.T) { - ensureNoGoroutinesLeak(t, func() { - client := CreateLoopbackNatsClientForTest(t) - - testNatsClient_RequestTimeout(t, client) - }) -} - -func TestLoopbackNatsClient_RequestNoReply(t *testing.T) { - ensureNoGoroutinesLeak(t, func() { - client := CreateLoopbackNatsClientForTest(t) - - testNatsClient_RequestNoReply(t, client) - }) -} diff --git a/natsclient_test.go b/natsclient_test.go index 79cfbd8..7879048 100644 --- a/natsclient_test.go +++ b/natsclient_test.go @@ -109,106 +109,3 @@ func TestNatsClient_Subscribe(t *testing.T) { testNatsClient_Subscribe(t, client) }) } - -func testNatsClient_Request(t *testing.T, client NatsClient) { - dest := make(chan *nats.Msg) - sub, err := client.Subscribe("foo", dest) - if err != nil { - t.Fatal(err) - } - - go func() { - msg := <-dest - if err := client.Publish(msg.Reply, "world"); err != nil { - t.Error(err) - return - } - if err := sub.Unsubscribe(); err != nil { - t.Error("Unsubscribe failed with err:", err) - return - } - }() - reply, err := client.Request("foo", []byte("hello"), 30*time.Second) - if err != nil { - t.Fatal(err) - } - - var response string - if err := client.Decode(reply, &response); err != nil { - t.Fatal(err) - } - if response != "world" { - t.Fatalf("expected 'world', got '%s'", string(reply.Data)) - } -} - -func TestNatsClient_Request(t *testing.T) { - ensureNoGoroutinesLeak(t, func() { - client, shutdown := CreateLocalNatsClientForTest(t) - defer shutdown() - - testNatsClient_Request(t, client) - }) -} - -func testNatsClient_RequestTimeout(t *testing.T, client NatsClient) { - dest := make(chan *nats.Msg) - sub, err := client.Subscribe("foo", dest) - if err != nil { - t.Fatal(err) - } - - go func() { - msg := <-dest - time.Sleep(200 * time.Millisecond) - if err := client.Publish(msg.Reply, []byte("world")); err != nil { - if err != nats.ErrConnectionClosed { - t.Error(err) - } - return - } - if err := sub.Unsubscribe(); err != nil { - t.Error("Unsubscribe failed with err:", err) - return - } - }() - reply, err := client.Request("foo", []byte("hello"), 100*time.Millisecond) - if err == nil { - t.Fatalf("Request should have timed out, reeived %+v", reply) - } else if err != nats.ErrTimeout { - t.Fatalf("Request should have timed out, received error %s", err) - } -} - -func TestNatsClient_RequestTimeout(t *testing.T) { - ensureNoGoroutinesLeak(t, func() { - client, shutdown := CreateLocalNatsClientForTest(t) - defer shutdown() - - testNatsClient_RequestTimeout(t, client) - }) -} - -func testNatsClient_RequestNoReply(t *testing.T, client NatsClient) { - timeout := 100 * time.Millisecond - start := time.Now() - reply, err := client.Request("foo", []byte("hello"), timeout) - end := time.Now() - if err == nil { - t.Fatalf("Request should have failed without responsers, reeived %+v", reply) - } else if err != nats.ErrNoResponders { - t.Fatalf("Request should have failed without responsers, received error %s", err) - } - if end.Sub(start) >= timeout { - t.Errorf("Should have failed immediately but took %s", end.Sub(start)) - } -} - -func TestNatsClient_RequestNoReply(t *testing.T) { - ensureNoGoroutinesLeak(t, func() { - client, shutdown := CreateLocalNatsClientForTest(t) - defer shutdown() - - testNatsClient_RequestNoReply(t, client) - }) -}