mirror of
https://github.com/strukturag/nextcloud-spreed-signaling
synced 2024-05-19 14:06:32 +02:00
Remove unused "Request" method from NATS client.
This commit is contained in:
parent
0d1c546355
commit
762d1512c4
|
@ -56,8 +56,6 @@ type NatsClient interface {
|
|||
|
||||
Subscribe(subject string, ch chan *nats.Msg) (NatsSubscription, error)
|
||||
|
||||
Request(subject string, data []byte, timeout time.Duration) (*nats.Msg, error)
|
||||
|
||||
Publish(subject string, message interface{}) error
|
||||
PublishNats(subject string, message *NatsMessage) error
|
||||
PublishMessage(subject string, message *ServerMessage) error
|
||||
|
@ -142,10 +140,6 @@ func (c *natsClient) Subscribe(subject string, ch chan *nats.Msg) (NatsSubscript
|
|||
return c.nc.ChanSubscribe(subject, ch)
|
||||
}
|
||||
|
||||
func (c *natsClient) Request(subject string, data []byte, timeout time.Duration) (*nats.Msg, error) {
|
||||
return c.nc.Request(subject, data, timeout)
|
||||
}
|
||||
|
||||
func (c *natsClient) Publish(subject string, message interface{}) error {
|
||||
return c.conn.Publish(subject, message)
|
||||
}
|
||||
|
|
|
@ -22,13 +22,9 @@
|
|||
package signaling
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"log"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/nats-io/nats.go"
|
||||
)
|
||||
|
@ -36,7 +32,6 @@ import (
|
|||
type LoopbackNatsClient struct {
|
||||
mu sync.Mutex
|
||||
subscriptions map[string]map[*loopbackNatsSubscription]bool
|
||||
replyId uint64
|
||||
}
|
||||
|
||||
func NewLoopbackNatsClient() (NatsClient, error) {
|
||||
|
@ -151,71 +146,6 @@ func (c *LoopbackNatsClient) unsubscribe(s *loopbackNatsSubscription) {
|
|||
}
|
||||
}
|
||||
|
||||
func (c *LoopbackNatsClient) Request(subject string, data []byte, timeout time.Duration) (*nats.Msg, error) {
|
||||
if strings.HasSuffix(subject, ".") || strings.Contains(subject, " ") {
|
||||
return nil, nats.ErrBadSubject
|
||||
}
|
||||
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
if c.subscriptions == nil {
|
||||
return nil, nats.ErrConnectionClosed
|
||||
}
|
||||
|
||||
var response *nats.Msg
|
||||
var err error
|
||||
subs, found := c.subscriptions[subject]
|
||||
if !found {
|
||||
return nil, nats.ErrNoResponders
|
||||
}
|
||||
|
||||
replyId := c.replyId
|
||||
c.replyId += 1
|
||||
|
||||
reply := "_reply_" + strconv.FormatUint(replyId, 10)
|
||||
responder := make(chan *nats.Msg)
|
||||
var replySubscriber NatsSubscription
|
||||
replySubscriber, err = c.subscribe(reply, responder)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
defer func() {
|
||||
go func() {
|
||||
if err := replySubscriber.Unsubscribe(); err != nil {
|
||||
log.Printf("Error closing reply subscriber %s: %s", reply, err)
|
||||
}
|
||||
}()
|
||||
}()
|
||||
msg := &nats.Msg{
|
||||
Subject: subject,
|
||||
Data: data,
|
||||
Reply: reply,
|
||||
Sub: &nats.Subscription{
|
||||
Subject: subject,
|
||||
},
|
||||
}
|
||||
for s := range subs {
|
||||
s.queue(msg)
|
||||
}
|
||||
c.mu.Unlock()
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
||||
defer cancel()
|
||||
select {
|
||||
case response = <-responder:
|
||||
err = nil
|
||||
case <-ctx.Done():
|
||||
if ctx.Err() == context.DeadlineExceeded {
|
||||
err = nats.ErrTimeout
|
||||
} else {
|
||||
err = ctx.Err()
|
||||
}
|
||||
}
|
||||
c.mu.Lock()
|
||||
return response, err
|
||||
}
|
||||
|
||||
func (c *LoopbackNatsClient) Publish(subject string, message interface{}) error {
|
||||
if strings.HasSuffix(subject, ".") || strings.Contains(subject, " ") {
|
||||
return nats.ErrBadSubject
|
||||
|
|
|
@ -63,27 +63,3 @@ func TestLoopbackNatsClient_Subscribe(t *testing.T) {
|
|||
testNatsClient_Subscribe(t, client)
|
||||
})
|
||||
}
|
||||
|
||||
func TestLoopbackNatsClient_Request(t *testing.T) {
|
||||
ensureNoGoroutinesLeak(t, func() {
|
||||
client := CreateLoopbackNatsClientForTest(t)
|
||||
|
||||
testNatsClient_Request(t, client)
|
||||
})
|
||||
}
|
||||
|
||||
func TestLoopbackNatsClient_RequestTimeout(t *testing.T) {
|
||||
ensureNoGoroutinesLeak(t, func() {
|
||||
client := CreateLoopbackNatsClientForTest(t)
|
||||
|
||||
testNatsClient_RequestTimeout(t, client)
|
||||
})
|
||||
}
|
||||
|
||||
func TestLoopbackNatsClient_RequestNoReply(t *testing.T) {
|
||||
ensureNoGoroutinesLeak(t, func() {
|
||||
client := CreateLoopbackNatsClientForTest(t)
|
||||
|
||||
testNatsClient_RequestNoReply(t, client)
|
||||
})
|
||||
}
|
||||
|
|
|
@ -109,106 +109,3 @@ func TestNatsClient_Subscribe(t *testing.T) {
|
|||
testNatsClient_Subscribe(t, client)
|
||||
})
|
||||
}
|
||||
|
||||
func testNatsClient_Request(t *testing.T, client NatsClient) {
|
||||
dest := make(chan *nats.Msg)
|
||||
sub, err := client.Subscribe("foo", dest)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
go func() {
|
||||
msg := <-dest
|
||||
if err := client.Publish(msg.Reply, "world"); err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
if err := sub.Unsubscribe(); err != nil {
|
||||
t.Error("Unsubscribe failed with err:", err)
|
||||
return
|
||||
}
|
||||
}()
|
||||
reply, err := client.Request("foo", []byte("hello"), 30*time.Second)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
var response string
|
||||
if err := client.Decode(reply, &response); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if response != "world" {
|
||||
t.Fatalf("expected 'world', got '%s'", string(reply.Data))
|
||||
}
|
||||
}
|
||||
|
||||
func TestNatsClient_Request(t *testing.T) {
|
||||
ensureNoGoroutinesLeak(t, func() {
|
||||
client, shutdown := CreateLocalNatsClientForTest(t)
|
||||
defer shutdown()
|
||||
|
||||
testNatsClient_Request(t, client)
|
||||
})
|
||||
}
|
||||
|
||||
func testNatsClient_RequestTimeout(t *testing.T, client NatsClient) {
|
||||
dest := make(chan *nats.Msg)
|
||||
sub, err := client.Subscribe("foo", dest)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
go func() {
|
||||
msg := <-dest
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
if err := client.Publish(msg.Reply, []byte("world")); err != nil {
|
||||
if err != nats.ErrConnectionClosed {
|
||||
t.Error(err)
|
||||
}
|
||||
return
|
||||
}
|
||||
if err := sub.Unsubscribe(); err != nil {
|
||||
t.Error("Unsubscribe failed with err:", err)
|
||||
return
|
||||
}
|
||||
}()
|
||||
reply, err := client.Request("foo", []byte("hello"), 100*time.Millisecond)
|
||||
if err == nil {
|
||||
t.Fatalf("Request should have timed out, reeived %+v", reply)
|
||||
} else if err != nats.ErrTimeout {
|
||||
t.Fatalf("Request should have timed out, received error %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestNatsClient_RequestTimeout(t *testing.T) {
|
||||
ensureNoGoroutinesLeak(t, func() {
|
||||
client, shutdown := CreateLocalNatsClientForTest(t)
|
||||
defer shutdown()
|
||||
|
||||
testNatsClient_RequestTimeout(t, client)
|
||||
})
|
||||
}
|
||||
|
||||
func testNatsClient_RequestNoReply(t *testing.T, client NatsClient) {
|
||||
timeout := 100 * time.Millisecond
|
||||
start := time.Now()
|
||||
reply, err := client.Request("foo", []byte("hello"), timeout)
|
||||
end := time.Now()
|
||||
if err == nil {
|
||||
t.Fatalf("Request should have failed without responsers, reeived %+v", reply)
|
||||
} else if err != nats.ErrNoResponders {
|
||||
t.Fatalf("Request should have failed without responsers, received error %s", err)
|
||||
}
|
||||
if end.Sub(start) >= timeout {
|
||||
t.Errorf("Should have failed immediately but took %s", end.Sub(start))
|
||||
}
|
||||
}
|
||||
|
||||
func TestNatsClient_RequestNoReply(t *testing.T) {
|
||||
ensureNoGoroutinesLeak(t, func() {
|
||||
client, shutdown := CreateLocalNatsClientForTest(t)
|
||||
defer shutdown()
|
||||
|
||||
testNatsClient_RequestNoReply(t, client)
|
||||
})
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue