Move closer helper to internal package.

This commit is contained in:
Joachim Bauch 2025-12-10 15:21:15 +01:00
commit 22f45ac482
No known key found for this signature in database
GPG key ID: 77C1D22D53E15F02
12 changed files with 33 additions and 27 deletions

View file

@ -37,6 +37,7 @@ import (
"github.com/gorilla/websocket"
"github.com/mailru/easyjson"
"github.com/strukturag/nextcloud-spreed-signaling/internal"
"github.com/strukturag/nextcloud-spreed-signaling/log"
)
@ -141,7 +142,7 @@ type Client struct {
mu sync.Mutex
closer *Closer
closer *internal.Closer
closeOnce sync.Once
messagesDone chan struct{}
messageChan chan *bytes.Buffer
@ -171,7 +172,7 @@ func (c *Client) SetConn(ctx context.Context, conn *websocket.Conn, remoteAddres
c.conn = conn
c.addr = remoteAddress
c.SetHandler(handler)
c.closer = NewCloser()
c.closer = internal.NewCloser()
c.messageChan = make(chan *bytes.Buffer, 16)
c.messagesDone = make(chan struct{})
}

View file

@ -93,7 +93,7 @@ type FederationClient struct {
url string
// +checklocks:mu
conn *websocket.Conn
closer *Closer
closer *internal.Closer
// +checklocks:mu
reconnectDelay time.Duration
reconnecting atomic.Bool
@ -153,7 +153,7 @@ func NewFederationClient(ctx context.Context, hub *Hub, session *ClientSession,
dialer: dialer,
url: url,
closer: NewCloser(),
closer: internal.NewCloser(),
}
result.roomId.Store(room.RoomId)
result.remoteRoomId.Store(remoteRoomId)

9
hub.go
View file

@ -53,6 +53,7 @@ import (
"github.com/strukturag/nextcloud-spreed-signaling/api"
"github.com/strukturag/nextcloud-spreed-signaling/async"
"github.com/strukturag/nextcloud-spreed-signaling/internal"
"github.com/strukturag/nextcloud-spreed-signaling/log"
)
@ -149,11 +150,11 @@ type Hub struct {
infoInternal *WelcomeServerMessage
welcome atomic.Value // *ServerMessage
closer *Closer
closer *internal.Closer
readPumpActive atomic.Int32
writePumpActive atomic.Int32
shutdown *Closer
shutdown *internal.Closer
shutdownScheduled atomic.Bool
roomUpdated chan *BackendServerRoomRequest
@ -372,8 +373,8 @@ func NewHub(ctx context.Context, config *goconf.ConfigFile, events AsyncEvents,
info: NewWelcomeServerMessage(version, DefaultFeatures...),
infoInternal: NewWelcomeServerMessage(version, DefaultFeaturesInternal...),
closer: NewCloser(),
shutdown: NewCloser(),
closer: internal.NewCloser(),
shutdown: internal.NewCloser(),
roomUpdated: make(chan *BackendServerRoomRequest),
roomDeleted: make(chan *BackendServerRoomRequest),

View file

@ -19,7 +19,7 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package signaling
package internal
import (
"sync/atomic"

View file

@ -19,7 +19,7 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package signaling
package internal
import (
"sync"

View file

@ -45,6 +45,7 @@ import (
"github.com/notedit/janus-go"
"github.com/strukturag/nextcloud-spreed-signaling/api"
"github.com/strukturag/nextcloud-spreed-signaling/internal"
)
const (
@ -188,7 +189,7 @@ func unexpected(request string) error {
type transaction struct {
ch chan any
incoming chan any
closer *Closer
closer *internal.Closer
}
func (t *transaction) run() {
@ -214,7 +215,7 @@ func newTransaction() *transaction {
t := &transaction{
ch: make(chan any, 1),
incoming: make(chan any, 8),
closer: NewCloser(),
closer: internal.NewCloser(),
}
return t
}
@ -264,7 +265,7 @@ type JanusGateway struct {
// +checklocks:Mutex
transactions map[uint64]*transaction
closer *Closer
closer *internal.Closer
writeMu sync.Mutex
}
@ -302,7 +303,7 @@ func NewJanusGateway(ctx context.Context, wsURL string, listener GatewayListener
listener: listener,
transactions: make(map[uint64]*transaction),
Sessions: make(map[uint64]*JanusSession),
closer: NewCloser(),
closer: internal.NewCloser(),
}
go gateway.ping()

View file

@ -816,7 +816,7 @@ func (m *mcuJanus) NewPublisher(ctx context.Context, listener McuListener, id Pu
closeChan: make(chan struct{}, 1),
deferred: make(chan func(), 64),
},
sdpReady: NewCloser(),
sdpReady: internal.NewCloser(),
id: id,
settings: settings,
}
@ -1000,7 +1000,7 @@ func (m *mcuJanus) getOrCreateRemotePublisher(ctx context.Context, controller Re
deferred: make(chan func(), 64),
},
sdpReady: NewCloser(),
sdpReady: internal.NewCloser(),
id: controller.PublisherId(),
settings: settings,
},

View file

@ -33,6 +33,7 @@ import (
"github.com/pion/sdp/v3"
"github.com/strukturag/nextcloud-spreed-signaling/api"
"github.com/strukturag/nextcloud-spreed-signaling/internal"
)
const (
@ -52,7 +53,7 @@ type mcuJanusPublisher struct {
settings NewPublisherSettings
stats publisherStatsCounter
sdpFlags Flags
sdpReady *Closer
sdpReady *internal.Closer
offerSdp atomic.Pointer[sdp.SessionDescription]
answerSdp atomic.Pointer[sdp.SessionDescription]
}

View file

@ -356,8 +356,8 @@ type mcuProxyConnection struct {
load atomic.Uint64
bandwidth atomic.Pointer[EventProxyServerBandwidth]
mu sync.Mutex
closer *Closer
closedDone *Closer
closer *internal.Closer
closedDone *internal.Closer
closed atomic.Bool
// +checklocks:mu
conn *websocket.Conn
@ -409,8 +409,8 @@ func newMcuProxyConnection(proxy *mcuProxy, baseUrl string, ip net.IP, token str
url: parsed,
ip: ip,
connectToken: token,
closer: NewCloser(),
closedDone: NewCloser(),
closer: internal.NewCloser(),
closedDone: internal.NewCloser(),
callbacks: make(map[string]mcuProxyCallback),
publishers: make(map[string]*mcuProxyPublisher),
publisherIds: make(map[StreamId]PublicSessionId),

View file

@ -75,7 +75,7 @@ type Room struct {
// +checklocks:mu
properties json.RawMessage
closer *Closer
closer *internal.Closer
mu *sync.RWMutex
asyncCh AsyncChannel
// +checklocks:mu
@ -119,7 +119,7 @@ func NewRoom(roomId string, properties json.RawMessage, hub *Hub, events AsyncEv
properties: properties,
closer: NewCloser(),
closer: internal.NewCloser(),
mu: &sync.RWMutex{},
asyncCh: make(AsyncChannel, DefaultAsyncChannelSize),
sessions: make(map[PublicSessionId]Session),

View file

@ -28,6 +28,7 @@ import (
"sync"
"time"
"github.com/strukturag/nextcloud-spreed-signaling/internal"
"github.com/strukturag/nextcloud-spreed-signaling/log"
)
@ -66,7 +67,7 @@ func (e *pingEntries) RemoveRoom(roomId string) {
// and sent out batched every "updateActiveSessionsInterval" seconds.
type RoomPing struct {
mu sync.Mutex
closer *Closer
closer *internal.Closer
backend *BackendClient
capabilities *Capabilities
@ -77,7 +78,7 @@ type RoomPing struct {
func NewRoomPing(backend *BackendClient, capabilities *Capabilities) (*RoomPing, error) {
result := &RoomPing{
closer: NewCloser(),
closer: internal.NewCloser(),
backend: backend,
capabilities: capabilities,
}

View file

@ -29,6 +29,7 @@ import (
"sync"
"time"
"github.com/strukturag/nextcloud-spreed-signaling/internal"
"github.com/strukturag/nextcloud-spreed-signaling/log"
)
@ -99,7 +100,7 @@ type memoryThrottler struct {
// +checklocks:mu
clients map[string]map[string][]throttleEntry
closer *Closer
closer *internal.Closer
}
func NewMemoryThrottler() (Throttler, error) {
@ -108,7 +109,7 @@ func NewMemoryThrottler() (Throttler, error) {
clients: make(map[string]map[string][]throttleEntry),
closer: NewCloser(),
closer: internal.NewCloser(),
}
result.doDelay = result.delay
go result.housekeeping()