mirror of
https://github.com/strukturag/nextcloud-spreed-signaling
synced 2024-05-04 06:43:11 +02:00
Use structured log messages.
This commit is contained in:
parent
cb2a05c270
commit
a1746ff1ab
|
@ -197,24 +197,24 @@ func (b *BackendClient) getCapabilities(ctx context.Context, u *url.URL) (map[st
|
|||
capUrl.Path = capUrl.Path[:pos+11] + "/cloud/capabilities"
|
||||
}
|
||||
|
||||
b.logger.Infof("Capabilities expired for %s, updating", capUrl.String())
|
||||
b.logger.Infow("Capabilities expired", "url", capUrl.String())
|
||||
|
||||
pool, err := b.getPool(&capUrl)
|
||||
if err != nil {
|
||||
b.logger.Errorf("Could not get client pool for host %s: %s", capUrl.Host, err)
|
||||
b.logger.Errorw("Could not get client pool", "host", capUrl.Host, "error", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
c, err := pool.Get(ctx)
|
||||
if err != nil {
|
||||
b.logger.Errorf("Could not get client for host %s: %s", capUrl.Host, err)
|
||||
b.logger.Errorw("Could not get client", "host", capUrl.Host, "error", err)
|
||||
return nil, err
|
||||
}
|
||||
defer pool.Put(c)
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, "GET", capUrl.String(), nil)
|
||||
if err != nil {
|
||||
b.logger.Errorf("Could not create request to %s: %s", &capUrl, err)
|
||||
b.logger.Errorw("Could not create request", "url", capUrl.String(), "error", err)
|
||||
return nil, err
|
||||
}
|
||||
req.Header.Set("Accept", "application/json")
|
||||
|
@ -229,38 +229,38 @@ func (b *BackendClient) getCapabilities(ctx context.Context, u *url.URL) (map[st
|
|||
|
||||
ct := resp.Header.Get("Content-Type")
|
||||
if !strings.HasPrefix(ct, "application/json") {
|
||||
b.logger.Errorf("Received unsupported content-type from %s: %s (%s)", capUrl.String(), ct, resp.Status)
|
||||
b.logger.Errorw("Received unsupported content-type", "url", capUrl.String(), "contenttype", ct, "status", resp.Status)
|
||||
return nil, ErrUnsupportedContentType
|
||||
}
|
||||
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
b.logger.Errorf("Could not read response body from %s: %s", capUrl.String(), err)
|
||||
b.logger.Errorw("Could not read response body", "url", capUrl.String(), "error", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var ocs OcsResponse
|
||||
if err := json.Unmarshal(body, &ocs); err != nil {
|
||||
b.logger.Errorf("Could not decode OCS response %s from %s: %s", string(body), capUrl.String(), err)
|
||||
b.logger.Errorw("Could not decode OCS response", "response", string(body), "url", capUrl.String(), "error", err)
|
||||
return nil, err
|
||||
} else if ocs.Ocs == nil || ocs.Ocs.Data == nil {
|
||||
b.logger.Errorf("Incomplete OCS response %s from %s", string(body), u)
|
||||
b.logger.Errorw("Incomplete OCS response", "response", string(body), "url", capUrl.String())
|
||||
return nil, fmt.Errorf("incomplete OCS response")
|
||||
}
|
||||
|
||||
var response CapabilitiesResponse
|
||||
if err := json.Unmarshal(*ocs.Ocs.Data, &response); err != nil {
|
||||
b.logger.Errorf("Could not decode OCS response body %s from %s: %s", string(*ocs.Ocs.Data), capUrl.String(), err)
|
||||
b.logger.Errorw("Could not decode OCS response body", "response", string(*ocs.Ocs.Data), "url", capUrl.String(), "error", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
capa, found := response.Capabilities[AppNameSpreed]
|
||||
if !found {
|
||||
b.logger.Errorf("No capabilities received for app %s from %s: %+v", AppNameSpreed, capUrl.String(), response)
|
||||
b.logger.Errorw("No capabilities received for app", "app", AppNameSpreed, "url", capUrl.String(), "response", response)
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
b.logger.Infof("Received capabilities %+v from %s", capa, capUrl.String())
|
||||
b.logger.Infow("Received capabilities", "capabilities", capa, "url", capUrl.String())
|
||||
b.capabilitiesLock.Lock()
|
||||
b.capabilities[key] = capa
|
||||
b.nextCapabilities[key] = now.Add(CapabilitiesCacheDuration)
|
||||
|
@ -271,7 +271,7 @@ func (b *BackendClient) getCapabilities(ctx context.Context, u *url.URL) (map[st
|
|||
func (b *BackendClient) HasCapabilityFeature(ctx context.Context, u *url.URL, feature string) bool {
|
||||
caps, err := b.getCapabilities(ctx, u)
|
||||
if err != nil {
|
||||
b.logger.Errorf("Could not get capabilities for %s: %s", u, err)
|
||||
b.logger.Errorw("Could not get capabilities", "url", u.String(), "error", err)
|
||||
return false
|
||||
}
|
||||
|
||||
|
@ -282,7 +282,7 @@ func (b *BackendClient) HasCapabilityFeature(ctx context.Context, u *url.URL, fe
|
|||
|
||||
features, ok := featuresInterface.([]interface{})
|
||||
if !ok {
|
||||
b.logger.Errorf("Invalid features list received for %s: %+v", u, featuresInterface)
|
||||
b.logger.Errorw(fmt.Sprintf("Invalid features list received: %+v", featuresInterface), "url", u.String())
|
||||
return false
|
||||
}
|
||||
|
||||
|
@ -318,26 +318,26 @@ func (b *BackendClient) PerformJSONRequest(ctx context.Context, u *url.URL, requ
|
|||
|
||||
pool, err := b.getPool(u)
|
||||
if err != nil {
|
||||
b.logger.Errorf("Could not get client pool for host %s: %s", u.Host, err)
|
||||
b.logger.Errorw("Could not get client pool", "host", u.Host, "error", err)
|
||||
return err
|
||||
}
|
||||
|
||||
c, err := pool.Get(ctx)
|
||||
if err != nil {
|
||||
b.logger.Errorf("Could not get client for host %s: %s", u.Host, err)
|
||||
b.logger.Errorw("Could not get client", "host", u.Host, "error", err)
|
||||
return err
|
||||
}
|
||||
defer pool.Put(c)
|
||||
|
||||
data, err := json.Marshal(request)
|
||||
if err != nil {
|
||||
b.logger.Errorf("Could not marshal request %+v: %s", request, err)
|
||||
b.logger.Errorw(fmt.Sprintf("Could not marshal request %+v", request), "error", err)
|
||||
return err
|
||||
}
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, "POST", requestUrl.String(), bytes.NewReader(data))
|
||||
if err != nil {
|
||||
b.logger.Errorf("Could not create request to %s: %s", requestUrl, err)
|
||||
b.logger.Errorw("Could not create request", "url", requestUrl.String(), "error", err)
|
||||
return err
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
|
@ -353,20 +353,20 @@ func (b *BackendClient) PerformJSONRequest(ctx context.Context, u *url.URL, requ
|
|||
|
||||
resp, err := c.Do(req)
|
||||
if err != nil {
|
||||
b.logger.Errorf("Could not send request %s to %s: %s", string(data), req.URL, err)
|
||||
b.logger.Errorw("Could not send request", "request", string(data), "url", req.URL.String(), "error", err)
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
ct := resp.Header.Get("Content-Type")
|
||||
if !strings.HasPrefix(ct, "application/json") {
|
||||
b.logger.Errorf("Received unsupported content-type from %s: %s (%s)", req.URL, ct, resp.Status)
|
||||
b.logger.Errorw("Received unsupported content-type", "url", req.URL.String(), "contenttype", ct, "status", resp.Status)
|
||||
return ErrUnsupportedContentType
|
||||
}
|
||||
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
b.logger.Errorf("Could not read response body from %s: %s", req.URL, err)
|
||||
b.logger.Errorw("Could not read response body", "url", req.URL.String(), "error", err)
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -381,17 +381,17 @@ func (b *BackendClient) PerformJSONRequest(ctx context.Context, u *url.URL, requ
|
|||
// }
|
||||
var ocs OcsResponse
|
||||
if err := json.Unmarshal(body, &ocs); err != nil {
|
||||
b.logger.Errorf("Could not decode OCS response %s from %s: %s", string(body), req.URL, err)
|
||||
b.logger.Errorw("Could not decode OCS response", "response", string(body), "url", req.URL.String(), "error", err)
|
||||
return err
|
||||
} else if ocs.Ocs == nil || ocs.Ocs.Data == nil {
|
||||
b.logger.Errorf("Incomplete OCS response %s from %s", string(body), req.URL)
|
||||
b.logger.Errorw("Incomplete OCS response", "response", string(body), "url", req.URL.String())
|
||||
return fmt.Errorf("incomplete OCS response")
|
||||
} else if err := json.Unmarshal(*ocs.Ocs.Data, response); err != nil {
|
||||
b.logger.Errorf("Could not decode OCS response body %s from %s: %s", string(*ocs.Ocs.Data), req.URL, err)
|
||||
b.logger.Errorw("Could not decode OCS response body", "response", string(*ocs.Ocs.Data), "url", req.URL.String(), "error", err)
|
||||
return err
|
||||
}
|
||||
} else if err := json.Unmarshal(body, response); err != nil {
|
||||
b.logger.Errorf("Could not decode response body %s from %s: %s", string(body), req.URL, err)
|
||||
b.logger.Errorw("Could not decode response body", "response", string(body), "url", req.URL.String(), "error", err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
|
|
|
@ -137,14 +137,14 @@ func NewBackendConfiguration(logger Logger, config *goconf.ConfigFile) (*Backend
|
|||
sessionLimit: uint64(sessionLimit),
|
||||
}
|
||||
if sessionLimit > 0 {
|
||||
logger.Infof("Allow a maximum of %d sessions", sessionLimit)
|
||||
logger.Infow("Backend limits maximum number of sessions", "backend", compatBackend.id, "limit", sessionLimit)
|
||||
}
|
||||
numBackends++
|
||||
} else if backendIds, _ := config.GetString("backend", "backends"); backendIds != "" {
|
||||
for host, configuredBackends := range getConfiguredHosts(logger, backendIds, config) {
|
||||
backends[host] = append(backends[host], configuredBackends...)
|
||||
for _, be := range configuredBackends {
|
||||
logger.Infof("Backend %s added for %s", be.id, be.url)
|
||||
logger.Infow("Backend added", "backend", be.id, "url", be.url)
|
||||
}
|
||||
numBackends += len(configuredBackends)
|
||||
}
|
||||
|
@ -182,9 +182,9 @@ func NewBackendConfiguration(logger Logger, config *goconf.ConfigFile) (*Backend
|
|||
if len(hosts) > 1 {
|
||||
logger.Warn("Using deprecated backend configuration. Please migrate the \"allowed\" setting to the new \"backends\" configuration.")
|
||||
}
|
||||
logger.Infof("Allowed backend hostnames: %s", hosts)
|
||||
logger.Infow("Allowed backend hostnames", "hosts", hosts)
|
||||
if sessionLimit > 0 {
|
||||
logger.Infof("Allow a maximum of %d sessions", sessionLimit)
|
||||
logger.Infow("Backend limits maximum number of sessions", "backend", compatBackend.id, "limit", sessionLimit)
|
||||
}
|
||||
numBackends++
|
||||
}
|
||||
|
@ -206,7 +206,7 @@ func NewBackendConfiguration(logger Logger, config *goconf.ConfigFile) (*Backend
|
|||
func (b *BackendConfiguration) RemoveBackendsForHost(host string) {
|
||||
if oldBackends := b.backends[host]; len(oldBackends) > 0 {
|
||||
for _, backend := range oldBackends {
|
||||
b.logger.Infof("Backend %s removed for %s", backend.id, backend.url)
|
||||
b.logger.Infow("Backend removed", "backend", backend.id, "url", backend.url)
|
||||
}
|
||||
statsBackendsCurrent.Sub(float64(len(oldBackends)))
|
||||
}
|
||||
|
@ -226,14 +226,14 @@ func (b *BackendConfiguration) UpsertHost(host string, backends []*Backend) {
|
|||
found = true
|
||||
b.backends[host][existingIndex] = newBackend
|
||||
backends = append(backends[:index], backends[index+1:]...)
|
||||
b.logger.Infof("Backend %s updated for %s", newBackend.id, newBackend.url)
|
||||
b.logger.Infow("Backend updated", "backend", newBackend.id, "url", newBackend.url)
|
||||
break
|
||||
}
|
||||
index++
|
||||
}
|
||||
if !found {
|
||||
removed := b.backends[host][existingIndex]
|
||||
b.logger.Infof("Backend %s removed for %s", removed.id, removed.url)
|
||||
b.logger.Infow("Backend removed", "backend", removed.id, "url", removed.url)
|
||||
b.backends[host] = append(b.backends[host][:existingIndex], b.backends[host][existingIndex+1:]...)
|
||||
statsBackendsCurrent.Dec()
|
||||
}
|
||||
|
@ -241,7 +241,7 @@ func (b *BackendConfiguration) UpsertHost(host string, backends []*Backend) {
|
|||
|
||||
b.backends[host] = append(b.backends[host], backends...)
|
||||
for _, added := range backends {
|
||||
b.logger.Infof("Backend %s added for %s", added.id, added.url)
|
||||
b.logger.Infow("Backend added", "backend", added.id, "url", added.url)
|
||||
}
|
||||
statsBackendsCurrent.Add(float64(len(backends)))
|
||||
}
|
||||
|
@ -270,7 +270,7 @@ func getConfiguredHosts(logger Logger, backendIds string, config *goconf.ConfigF
|
|||
for _, id := range getConfiguredBackendIDs(backendIds) {
|
||||
u, _ := config.GetString(id, "url")
|
||||
if u == "" {
|
||||
logger.Warnf("Backend %s is missing or incomplete, skipping", id)
|
||||
logger.Warnw("Backend is missing or incomplete, skipping", "backend", id)
|
||||
continue
|
||||
}
|
||||
|
||||
|
@ -279,7 +279,7 @@ func getConfiguredHosts(logger Logger, backendIds string, config *goconf.ConfigF
|
|||
}
|
||||
parsed, err := url.Parse(u)
|
||||
if err != nil {
|
||||
logger.Warnf("Backend %s has an invalid url %s configured (%s), skipping", id, u, err)
|
||||
logger.Warnw("Backend has an invalid url configured, skipping", "backend", id, "url", u, "error", err)
|
||||
continue
|
||||
}
|
||||
|
||||
|
@ -290,7 +290,7 @@ func getConfiguredHosts(logger Logger, backendIds string, config *goconf.ConfigF
|
|||
|
||||
secret, _ := config.GetString(id, "secret")
|
||||
if u == "" || secret == "" {
|
||||
logger.Warnf("Backend %s is missing or incomplete, skipping", id)
|
||||
logger.Warnw("Backend is missing or incomplete, skipping", "backend", id)
|
||||
continue
|
||||
}
|
||||
|
||||
|
@ -299,7 +299,7 @@ func getConfiguredHosts(logger Logger, backendIds string, config *goconf.ConfigF
|
|||
sessionLimit = 0
|
||||
}
|
||||
if sessionLimit > 0 {
|
||||
logger.Infof("Backend %s allows a maximum of %d sessions", id, sessionLimit)
|
||||
logger.Infow("Backend limits maximum number of sessions", "backend", id, "limit", sessionLimit)
|
||||
}
|
||||
|
||||
maxStreamBitrate, err := config.GetInt(id, "maxstreambitrate")
|
||||
|
|
|
@ -257,7 +257,7 @@ func (b *BackendServer) parseRequestBody(f func(http.ResponseWriter, *http.Reque
|
|||
}
|
||||
ct := r.Header.Get("Content-Type")
|
||||
if !strings.HasPrefix(ct, "application/json") {
|
||||
b.logger.Errorf("Received unsupported content-type: %s", ct)
|
||||
b.logger.Errorw("Received unsupported content-type", "contenttype", ct)
|
||||
http.Error(w, "Unsupported Content-Type", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
@ -270,7 +270,7 @@ func (b *BackendServer) parseRequestBody(f func(http.ResponseWriter, *http.Reque
|
|||
|
||||
body, err := io.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
b.logger.Errorf("Error reading body: %s", err)
|
||||
b.logger.Errorw("Error reading body", "error", err)
|
||||
http.Error(w, "Could not read body", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
@ -293,7 +293,7 @@ func (b *BackendServer) sendRoomInvite(roomid string, backend *Backend, userids
|
|||
}
|
||||
for _, userid := range userids {
|
||||
if err := b.nats.PublishMessage(GetSubjectForUserId(userid, backend), msg); err != nil {
|
||||
b.logger.Errorf("Could not publish room invite for user %s in backend %s: %s", userid, backend.Id(), err)
|
||||
b.logger.Errorw("Could not publish room invite", "userid", userid, "backend", backend.Id(), "error", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -314,7 +314,7 @@ func (b *BackendServer) sendRoomDisinvite(roomid string, backend *Backend, reaso
|
|||
}
|
||||
for _, userid := range userids {
|
||||
if err := b.nats.PublishMessage(GetSubjectForUserId(userid, backend), msg); err != nil {
|
||||
b.logger.Errorf("Could not publish room disinvite for user %s in backend %s: %s", userid, backend.Id(), err)
|
||||
b.logger.Errorw("Could not publish room disinvite", "userid", userid, "backend", backend.Id(), "error", err)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -330,10 +330,10 @@ func (b *BackendServer) sendRoomDisinvite(roomid string, backend *Backend, reaso
|
|||
go func(sessionid string) {
|
||||
defer wg.Done()
|
||||
if sid, err := b.lookupByRoomSessionId(sessionid, nil, timeout); err != nil {
|
||||
b.logger.Errorf("Could not lookup by room session %s: %s", sessionid, err)
|
||||
b.logger.Errorw("Could not lookup by room session", "roomsessionid", sessionid, "error", err)
|
||||
} else if sid != "" {
|
||||
if err := b.nats.PublishMessage("session."+sid, msg); err != nil {
|
||||
b.logger.Errorf("Could not publish room disinvite for session %s: %s", sid, err)
|
||||
b.logger.Errorw("Could not publish room disinvite", "sessionid", sid, "error", err)
|
||||
}
|
||||
}
|
||||
}(sessionid)
|
||||
|
@ -364,14 +364,14 @@ func (b *BackendServer) sendRoomUpdate(roomid string, backend *Backend, notified
|
|||
}
|
||||
|
||||
if err := b.nats.PublishMessage(GetSubjectForUserId(userid, backend), msg); err != nil {
|
||||
b.logger.Errorf("Could not publish room update for user %s in backend %s: %s", userid, backend.Id(), err)
|
||||
b.logger.Errorw("Could not publish room update", "userid", userid, "backend", backend.Id(), "error", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (b *BackendServer) lookupByRoomSessionId(roomSessionId string, cache *ConcurrentStringStringMap, timeout time.Duration) (string, error) {
|
||||
if roomSessionId == sessionIdNotInMeeting {
|
||||
b.logger.Infof("Trying to lookup empty room session id: %s", roomSessionId)
|
||||
b.logger.Infow("Trying to lookup empty room session id", "roomsessionid", roomSessionId)
|
||||
return "", nil
|
||||
}
|
||||
|
||||
|
@ -408,13 +408,13 @@ func (b *BackendServer) fixupUserSessions(cache *ConcurrentStringStringMap, user
|
|||
|
||||
roomSessionId, ok := roomSessionIdOb.(string)
|
||||
if !ok {
|
||||
b.logger.Infof("User %+v has invalid room session id, ignoring", user)
|
||||
b.logger.Infow("User has invalid room session id, ignoring", "user", user)
|
||||
delete(user, "sessionId")
|
||||
continue
|
||||
}
|
||||
|
||||
if roomSessionId == sessionIdNotInMeeting {
|
||||
b.logger.Infof("User %+v is not in the meeting, ignoring", user)
|
||||
b.logger.Infow("User is not in the meeting, ignoring", "user", user)
|
||||
delete(user, "sessionId")
|
||||
continue
|
||||
}
|
||||
|
@ -423,7 +423,7 @@ func (b *BackendServer) fixupUserSessions(cache *ConcurrentStringStringMap, user
|
|||
go func(roomSessionId string, u map[string]interface{}) {
|
||||
defer wg.Done()
|
||||
if sessionId, err := b.lookupByRoomSessionId(roomSessionId, cache, timeout); err != nil {
|
||||
b.logger.Errorf("Could not lookup by room session %s: %s", roomSessionId, err)
|
||||
b.logger.Errorw("Could not lookup by room session", "roomsessionid", roomSessionId, "error", err)
|
||||
delete(u, "sessionId")
|
||||
} else if sessionId != "" {
|
||||
u["sessionId"] = sessionId
|
||||
|
@ -485,14 +485,14 @@ loop:
|
|||
sessionId := user["sessionId"].(string)
|
||||
permissionsList, ok := permissionsInterface.([]interface{})
|
||||
if !ok {
|
||||
b.logger.Errorf("Received invalid permissions %+v (%s) for session %s", permissionsInterface, reflect.TypeOf(permissionsInterface), sessionId)
|
||||
b.logger.Errorw(fmt.Sprintf("Received invalid permissions %+v (%s)", permissionsInterface, reflect.TypeOf(permissionsInterface)), "sessionid", sessionId)
|
||||
continue
|
||||
}
|
||||
var permissions []Permission
|
||||
for idx, ob := range permissionsList {
|
||||
permission, ok := ob.(string)
|
||||
if !ok {
|
||||
b.logger.Errorf("Received invalid permission at position %d %+v (%s) for session %s", idx, ob, reflect.TypeOf(ob), sessionId)
|
||||
b.logger.Errorw(fmt.Sprintf("Received invalid permission at position %d %+v (%s)", idx, ob, reflect.TypeOf(ob)), "sessionid", sessionId)
|
||||
continue loop
|
||||
}
|
||||
permissions = append(permissions, Permission(permission))
|
||||
|
@ -506,7 +506,7 @@ loop:
|
|||
Permissions: permissions,
|
||||
}
|
||||
if err := b.nats.Publish("session."+sessionId, message); err != nil {
|
||||
b.logger.Errorf("Could not send permissions update (%+v) to session %s: %s", permissions, sessionId, err)
|
||||
b.logger.Errorw("Could not send permissions update", "message", permissions, "sessionid", sessionId, "error", err)
|
||||
}
|
||||
}(sessionId, permissions)
|
||||
}
|
||||
|
@ -565,7 +565,7 @@ func (b *BackendServer) roomHandler(w http.ResponseWriter, r *http.Request, body
|
|||
|
||||
var request BackendServerRoomRequest
|
||||
if err := json.Unmarshal(body, &request); err != nil {
|
||||
b.logger.Errorf("Error decoding body %s: %s", string(body), err)
|
||||
b.logger.Errorw("Error decoding body", "request", string(body), "error", err)
|
||||
http.Error(w, "Could not read body", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
@ -598,7 +598,7 @@ func (b *BackendServer) roomHandler(w http.ResponseWriter, r *http.Request, body
|
|||
}
|
||||
|
||||
if err != nil {
|
||||
b.logger.Errorf("Error processing %s for room %s: %s", string(body), roomid, err)
|
||||
b.logger.Errorw("Error processing room request", "request", string(body), "roomid", roomid, "error", err)
|
||||
http.Error(w, "Error while processing", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
@ -631,7 +631,7 @@ func (b *BackendServer) statsHandler(w http.ResponseWriter, r *http.Request) {
|
|||
stats := b.hub.GetStats()
|
||||
statsData, err := json.MarshalIndent(stats, "", " ")
|
||||
if err != nil {
|
||||
b.logger.Errorf("Could not serialize stats %+v: %s", stats, err)
|
||||
b.logger.Errorw(fmt.Sprintf("Could not serialize stats %+v", stats), "error", err)
|
||||
http.Error(w, "Internal server error", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
|
34
client.go
34
client.go
|
@ -262,7 +262,7 @@ func (c *Client) ReadPump() {
|
|||
conn := c.conn
|
||||
c.mu.Unlock()
|
||||
if conn == nil {
|
||||
c.logger.Warnf("Connection from %s closed while starting readPump", addr)
|
||||
c.logger.Warnw("Connection closed while starting readPump", "addr", addr)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -278,9 +278,9 @@ func (c *Client) ReadPump() {
|
|||
if c.logRTT {
|
||||
rtt_ms := rtt.Nanoseconds() / time.Millisecond.Nanoseconds()
|
||||
if session := c.GetSession(); session != nil {
|
||||
c.logger.Infof("Client %s has RTT of %d ms (%s)", session.PublicId(), rtt_ms, rtt)
|
||||
c.logger.Infow("Client RTT", "sessionid", session.PublicId(), "rtt_ms", rtt_ms, "rtt", rtt)
|
||||
} else {
|
||||
c.logger.Infof("Client from %s has RTT of %d ms (%s)", addr, rtt_ms, rtt)
|
||||
c.logger.Infow("Client RTT", "addr", addr, "rtt_ms", rtt_ms, "rtt", rtt)
|
||||
}
|
||||
}
|
||||
c.OnRTTReceived(c, rtt)
|
||||
|
@ -299,9 +299,9 @@ func (c *Client) ReadPump() {
|
|||
websocket.CloseGoingAway,
|
||||
websocket.CloseNoStatusReceived) {
|
||||
if session := c.GetSession(); session != nil {
|
||||
c.logger.Errorf("Error reading from client %s: %v", session.PublicId(), err)
|
||||
c.logger.Errorw("Error reading from client", "sessionid", session.PublicId(), "error", err)
|
||||
} else {
|
||||
c.logger.Errorf("Error reading from %s: %v", addr, err)
|
||||
c.logger.Errorw("Error reading from client", "addr", addr, "error", err)
|
||||
}
|
||||
}
|
||||
break
|
||||
|
@ -309,9 +309,9 @@ func (c *Client) ReadPump() {
|
|||
|
||||
if messageType != websocket.TextMessage {
|
||||
if session := c.GetSession(); session != nil {
|
||||
c.logger.Errorf("Unsupported message type %v from client %s", messageType, session.PublicId())
|
||||
c.logger.Errorw("Unsupported message type", "sessionid", session.PublicId(), "type", messageType)
|
||||
} else {
|
||||
c.logger.Errorf("Unsupported message type %v from %s", messageType, addr)
|
||||
c.logger.Errorw("Unsupported message type", "addr", addr, "type", messageType)
|
||||
}
|
||||
c.SendError(InvalidFormat)
|
||||
continue
|
||||
|
@ -322,9 +322,9 @@ func (c *Client) ReadPump() {
|
|||
if _, err := decodeBuffer.ReadFrom(reader); err != nil {
|
||||
bufferPool.Put(decodeBuffer)
|
||||
if session := c.GetSession(); session != nil {
|
||||
c.logger.Errorf("Error reading message from client %s: %v", session.PublicId(), err)
|
||||
c.logger.Errorw("Error reading message", "sessionid", session.PublicId(), "error", err)
|
||||
} else {
|
||||
c.logger.Errorf("Error reading message from %s: %v", addr, err)
|
||||
c.logger.Errorw("Error reading message", "addr", addr, "error", err)
|
||||
}
|
||||
break
|
||||
}
|
||||
|
@ -381,9 +381,9 @@ func (c *Client) writeInternal(message json.Marshaler) bool {
|
|||
}
|
||||
|
||||
if session := c.GetSession(); session != nil {
|
||||
c.logger.Errorf("Could not send message %+v to client %s: %v", message, session.PublicId(), err)
|
||||
c.logger.Errorw("Could not send message", "message", message, "sessionid", session.PublicId(), "error", err)
|
||||
} else {
|
||||
c.logger.Errorf("Could not send message %+v to %s: %v", message, c.RemoteAddr(), err)
|
||||
c.logger.Errorw("Could not send message", "message", message, "addr", c.RemoteAddr(), "error", err)
|
||||
}
|
||||
closeData = websocket.FormatCloseMessage(websocket.CloseInternalServerErr, "")
|
||||
goto close
|
||||
|
@ -394,9 +394,9 @@ close:
|
|||
c.conn.SetWriteDeadline(time.Now().Add(writeWait)) // nolint
|
||||
if err := c.conn.WriteMessage(websocket.CloseMessage, closeData); err != nil {
|
||||
if session := c.GetSession(); session != nil {
|
||||
c.logger.Errorf("Could not send close message to client %s: %v", session.PublicId(), err)
|
||||
c.logger.Errorw("Could not send close message", "sessionid", session.PublicId(), "error", err)
|
||||
} else {
|
||||
c.logger.Errorf("Could not send close message to %s: %v", c.RemoteAddr(), err)
|
||||
c.logger.Errorw("Could not send close message", "addr", c.RemoteAddr(), "error", err)
|
||||
}
|
||||
}
|
||||
return false
|
||||
|
@ -421,9 +421,9 @@ func (c *Client) writeError(e error) bool { // nolint
|
|||
c.conn.SetWriteDeadline(time.Now().Add(writeWait)) // nolint
|
||||
if err := c.conn.WriteMessage(websocket.CloseMessage, closeData); err != nil {
|
||||
if session := c.GetSession(); session != nil {
|
||||
c.logger.Errorf("Could not send close message to client %s: %v", session.PublicId(), err)
|
||||
c.logger.Errorw("Could not send close message", "sessionid", session.PublicId(), "error", err)
|
||||
} else {
|
||||
c.logger.Errorf("Could not send close message to %s: %v", c.RemoteAddr(), err)
|
||||
c.logger.Errorw("Could not send close message", "addr", c.RemoteAddr(), "error", err)
|
||||
}
|
||||
}
|
||||
return false
|
||||
|
@ -470,9 +470,9 @@ func (c *Client) sendPing() bool {
|
|||
c.conn.SetWriteDeadline(time.Now().Add(writeWait)) // nolint
|
||||
if err := c.conn.WriteMessage(websocket.PingMessage, []byte(msg)); err != nil {
|
||||
if session := c.GetSession(); session != nil {
|
||||
c.logger.Errorf("Could not send ping to client %s: %v", session.PublicId(), err)
|
||||
c.logger.Errorw("Could not send ping", "sessionid", session.PublicId(), "error", err)
|
||||
} else {
|
||||
c.logger.Errorf("Could not send ping to %s: %v", c.RemoteAddr(), err)
|
||||
c.logger.Errorw("Could not send ping", "addr", c.RemoteAddr(), "error", err)
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
|
|
@ -95,7 +95,7 @@ type ClientSession struct {
|
|||
|
||||
func NewClientSession(logger Logger, hub *Hub, privateId string, publicId string, data *SessionIdData, backend *Backend, hello *HelloClientMessage, auth *BackendClientAuthResponse) (*ClientSession, error) {
|
||||
s := &ClientSession{
|
||||
logger: logger,
|
||||
logger: logger.With("sessionid", publicId),
|
||||
hub: hub,
|
||||
privateId: privateId,
|
||||
publicId: publicId,
|
||||
|
@ -261,7 +261,7 @@ func (s *ClientSession) SetPermissions(permissions []Permission) {
|
|||
|
||||
s.permissions = p
|
||||
s.supportsPermissions = true
|
||||
s.logger.Infof("Permissions of session %s changed: %s", s.PublicId(), permissions)
|
||||
s.logger.Infow("Permissions changed", "permissions", permissions)
|
||||
}
|
||||
|
||||
func (s *ClientSession) Backend() *Backend {
|
||||
|
@ -373,13 +373,13 @@ func (s *ClientSession) closeAndWait(wait bool) {
|
|||
defer s.mu.Unlock()
|
||||
if s.userSubscription != nil {
|
||||
if err := s.userSubscription.Unsubscribe(); err != nil {
|
||||
s.logger.Errorf("Error closing user subscription in session %s: %s", s.PublicId(), err)
|
||||
s.logger.Errorf("Error closing user subscription: %s", err)
|
||||
}
|
||||
s.userSubscription = nil
|
||||
}
|
||||
if s.sessionSubscription != nil {
|
||||
if err := s.sessionSubscription.Unsubscribe(); err != nil {
|
||||
s.logger.Errorf("Error closing session subscription in session %s: %s", s.PublicId(), err)
|
||||
s.logger.Errorf("Error closing session subscription: %s", err)
|
||||
}
|
||||
s.sessionSubscription = nil
|
||||
}
|
||||
|
@ -445,7 +445,7 @@ func (s *ClientSession) SubscribeRoomNats(n NatsClient, roomid string, roomSessi
|
|||
return err
|
||||
}
|
||||
}
|
||||
s.logger.Infof("Session %s joined room %s with room session id %s", s.PublicId(), roomid, roomSessionId)
|
||||
s.logger.Infow("Joined room", "roomid", roomid, "roomsessionid", roomSessionId)
|
||||
s.roomSessionId = roomSessionId
|
||||
return nil
|
||||
}
|
||||
|
@ -459,7 +459,7 @@ func (s *ClientSession) LeaveCall() {
|
|||
return
|
||||
}
|
||||
|
||||
s.logger.Infof("Session %s left call %s", s.PublicId(), room.Id())
|
||||
s.logger.Infow("Left call", "roomid", room.Id())
|
||||
s.releaseMcuObjects()
|
||||
}
|
||||
|
||||
|
@ -489,7 +489,7 @@ func (s *ClientSession) UnsubscribeRoomNats() {
|
|||
func (s *ClientSession) doUnsubscribeRoomNats(notify bool) {
|
||||
if s.roomSubscription != nil {
|
||||
if err := s.roomSubscription.Unsubscribe(); err != nil {
|
||||
s.logger.Errorf("Error closing room subscription in session %s: %s", s.PublicId(), err)
|
||||
s.logger.Errorf("Error closing room subscription: %s", err)
|
||||
}
|
||||
s.roomSubscription = nil
|
||||
}
|
||||
|
@ -503,9 +503,9 @@ func (s *ClientSession) doUnsubscribeRoomNats(notify bool) {
|
|||
request.Room.Action = "leave"
|
||||
var response map[string]interface{}
|
||||
if err := s.hub.backend.PerformJSONRequest(ctx, s.ParsedBackendUrl(), request, &response); err != nil {
|
||||
s.logger.Errorf("Could not notify about room session %s left room %s: %s", sid, room.Id(), err)
|
||||
s.logger.Errorf(fmt.Sprintf("Could not notify about left room: %s", err), "roomid", room.Id(), "roomsessionid", sid)
|
||||
} else {
|
||||
s.logger.Infof("Removed room session %s: %+v", sid, response)
|
||||
s.logger.Infow(fmt.Sprintf("Removed room session: %+v", response), "roomid", room.Id(), "roomsessionid", sid)
|
||||
}
|
||||
}(s.roomSessionId)
|
||||
}
|
||||
|
@ -523,7 +523,7 @@ func (s *ClientSession) clearClientLocked(client *Client) {
|
|||
if s.client == nil {
|
||||
return
|
||||
} else if client != nil && s.client != client {
|
||||
s.logger.Warnf("Trying to clear other client in session %s", s.PublicId())
|
||||
s.logger.Warn("Trying to clear other client")
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -688,7 +688,7 @@ func (s *ClientSession) OnIceCandidate(client McuClient, candidate interface{})
|
|||
}
|
||||
}
|
||||
|
||||
s.logger.Errorf("Session %s received candidate %+v for unknown client %s", s.PublicId(), candidate, client.Id())
|
||||
s.logger.Errorf("Received candidate %+v for unknown client %s", candidate, client.Id())
|
||||
}
|
||||
|
||||
func (s *ClientSession) OnIceCompleted(client McuClient) {
|
||||
|
@ -895,7 +895,7 @@ func (s *ClientSession) GetOrCreatePublisher(ctx context.Context, mcu Mcu, strea
|
|||
} else {
|
||||
s.publishers[streamType] = publisher
|
||||
}
|
||||
s.logger.Infof("Publishing %s as %s for session %s", streamType, publisher.Id(), s.PublicId())
|
||||
s.logger.Infof("Publishing %s as %s", streamType, publisher.Id())
|
||||
}
|
||||
|
||||
return publisher, nil
|
||||
|
@ -936,7 +936,7 @@ func (s *ClientSession) GetOrCreateSubscriber(ctx context.Context, mcu Mcu, id s
|
|||
} else {
|
||||
s.subscribers[id+"|"+streamType] = subscriber
|
||||
}
|
||||
s.logger.Infof("Subscribing %s from %s as %s in session %s", streamType, id, subscriber.Id(), s.PublicId())
|
||||
s.logger.Infof("Subscribing %s from %s as %s", streamType, id, subscriber.Id())
|
||||
}
|
||||
|
||||
return subscriber, nil
|
||||
|
@ -952,7 +952,7 @@ func (s *ClientSession) GetSubscriber(id string, streamType string) McuSubscribe
|
|||
func (s *ClientSession) processClientMessage(msg *nats.Msg) {
|
||||
var message NatsMessage
|
||||
if err := s.hub.nats.Decode(msg, &message); err != nil {
|
||||
s.logger.Errorf("Could not decode NATS message %+v for session %s: %s", *msg, s.PublicId(), err)
|
||||
s.logger.Errorf("Could not decode NATS message %+v: %s", *msg, err)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -968,7 +968,7 @@ func (s *ClientSession) processClientMessage(msg *nats.Msg) {
|
|||
if (publisher.HasMedia(MediaTypeAudio) && !s.hasPermissionLocked(PERMISSION_MAY_PUBLISH_AUDIO)) ||
|
||||
(publisher.HasMedia(MediaTypeVideo) && !s.hasPermissionLocked(PERMISSION_MAY_PUBLISH_VIDEO)) {
|
||||
delete(s.publishers, streamTypeVideo)
|
||||
s.logger.Infof("Session %s is no longer allowed to publish media, closing publisher %s", s.PublicId(), publisher.Id())
|
||||
s.logger.Infof("No longer allowed to publish media, closing publisher %s", publisher.Id())
|
||||
go func() {
|
||||
publisher.Close(context.Background())
|
||||
}()
|
||||
|
@ -979,7 +979,7 @@ func (s *ClientSession) processClientMessage(msg *nats.Msg) {
|
|||
if !s.hasPermissionLocked(PERMISSION_MAY_PUBLISH_SCREEN) {
|
||||
if publisher, found := s.publishers[streamTypeScreen]; found {
|
||||
delete(s.publishers, streamTypeScreen)
|
||||
s.logger.Infof("Session %s is no longer allowed to publish screen, closing publisher %s", s.PublicId(), publisher.Id())
|
||||
s.logger.Infof("No longer allowed to publish screen, closing publisher %s", publisher.Id())
|
||||
go func() {
|
||||
publisher.Close(context.Background())
|
||||
}()
|
||||
|
@ -993,7 +993,7 @@ func (s *ClientSession) processClientMessage(msg *nats.Msg) {
|
|||
s.mu.Lock()
|
||||
roomSessionId := s.RoomSessionId()
|
||||
s.mu.Unlock()
|
||||
s.logger.Warnf("Closing session %s because same room session %s connected", s.PublicId(), roomSessionId)
|
||||
s.logger.Warnw("Closing because same room session connected", "roomsessionid", roomSessionId)
|
||||
s.LeaveRoom(false)
|
||||
defer s.closeAndWait(false)
|
||||
}
|
||||
|
@ -1021,7 +1021,7 @@ func (s *ClientSession) storePendingMessage(message *ServerMessage) {
|
|||
}
|
||||
s.pendingClientMessages = append(s.pendingClientMessages, message)
|
||||
if len(s.pendingClientMessages) >= warnPendingMessagesCount {
|
||||
s.logger.Infof("Session %s has %d pending messages", s.PublicId(), len(s.pendingClientMessages))
|
||||
s.logger.Infof("%d pending messages", len(s.pendingClientMessages))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1099,7 +1099,7 @@ func (s *ClientSession) NotifySessionResumed(client *Client) {
|
|||
s.hasPendingParticipantsUpdate = false
|
||||
s.mu.Unlock()
|
||||
|
||||
s.logger.Infof("Send %d pending messages to session %s", len(messages), s.PublicId())
|
||||
s.logger.Infof("Send %d pending messages", len(messages))
|
||||
// Send through session to handle connection interruptions.
|
||||
s.SendMessages(messages)
|
||||
|
||||
|
|
90
hub.go
90
hub.go
|
@ -229,7 +229,7 @@ func NewHub(logger Logger, config *goconf.ConfigFile, nats NatsClient, r *mux.Ro
|
|||
if geoipUrl != "" {
|
||||
if strings.HasPrefix(geoipUrl, "file://") {
|
||||
geoipUrl = geoipUrl[7:]
|
||||
logger.Infof("Using GeoIP database from %s", geoipUrl)
|
||||
logger.Infow("Using GeoIP database from %s", geoipUrl)
|
||||
geoip, err = NewGeoLookupFromFile(logger, geoipUrl)
|
||||
} else {
|
||||
logger.Infof("Downloading GeoIP database from %s", geoipUrl)
|
||||
|
@ -574,7 +574,7 @@ func (h *Hub) checkExpiredSessions(now time.Time) {
|
|||
for s := range h.expiredSessions {
|
||||
if s.IsExpired(now) {
|
||||
h.mu.Unlock()
|
||||
h.logger.Infof("Closing expired session %s (private=%s)", s.PublicId(), s.PrivateId())
|
||||
h.logger.Infow("Closing expired session", "sessionid", s.PublicId())
|
||||
s.Close()
|
||||
h.mu.Lock()
|
||||
// Should already be deleted by the close code, but better be sure.
|
||||
|
@ -723,12 +723,13 @@ func (h *Hub) processRegister(client *Client, message *ClientMessage, backend *B
|
|||
}
|
||||
|
||||
userId := auth.Auth.UserId
|
||||
logger := h.logger.With("backend", backend.Id(), "addr", client.RemoteAddr(), "country", client.Country(), "agent", client.UserAgent(), "sessionid", publicSessionId, "private", privateSessionId)
|
||||
if userId != "" {
|
||||
h.logger.Infof("Register user %s@%s from %s in %s (%s) %s (private=%s)", userId, backend.Id(), client.RemoteAddr(), client.Country(), client.UserAgent(), publicSessionId, privateSessionId)
|
||||
logger.Infow("Register client", "type", "user", "userid", userId)
|
||||
} else if message.Hello.Auth.Type != HelloClientTypeClient {
|
||||
h.logger.Infof("Register %s@%s from %s in %s (%s) %s (private=%s)", message.Hello.Auth.Type, backend.Id(), client.RemoteAddr(), client.Country(), client.UserAgent(), publicSessionId, privateSessionId)
|
||||
logger.Infow("Register client", "type", message.Hello.Auth.Type)
|
||||
} else {
|
||||
h.logger.Infof("Register anonymous@%s from %s in %s (%s) %s (private=%s)", backend.Id(), client.RemoteAddr(), client.Country(), client.UserAgent(), publicSessionId, privateSessionId)
|
||||
logger.Infow("Register client", "type", "anonymous")
|
||||
}
|
||||
|
||||
session, err := NewClientSession(h.logger, h, privateSessionId, publicSessionId, sessionIdData, backend, message.Hello, auth.Auth)
|
||||
|
@ -738,7 +739,7 @@ func (h *Hub) processRegister(client *Client, message *ClientMessage, backend *B
|
|||
}
|
||||
|
||||
if err := backend.AddSession(session); err != nil {
|
||||
h.logger.Errorf("Error adding session %s to backend %s: %s", session.PublicId(), backend.Id(), err)
|
||||
h.logger.Errorw("Error adding session to backend", "session", session.PublicId(), "backend", backend.Id(), "error", err)
|
||||
session.Close()
|
||||
client.SendMessage(message.NewWrappedErrorServerMessage(err))
|
||||
return
|
||||
|
@ -785,7 +786,7 @@ func (h *Hub) processUnregister(client *Client) *ClientSession {
|
|||
}
|
||||
h.mu.Unlock()
|
||||
if session != nil {
|
||||
h.logger.Infof("Unregister %s (private=%s)", session.PublicId(), session.PrivateId())
|
||||
h.logger.Infow("Unregister client", "session", session.PublicId())
|
||||
session.ClearClient(client)
|
||||
}
|
||||
|
||||
|
@ -797,10 +798,10 @@ func (h *Hub) processMessage(client *Client, data []byte) {
|
|||
var message ClientMessage
|
||||
if err := message.UnmarshalJSON(data); err != nil {
|
||||
if session := client.GetSession(); session != nil {
|
||||
h.logger.Errorf("Error decoding message from client %s: %v", session.PublicId(), err)
|
||||
h.logger.Errorw("Error decoding message", "sessionid", session.PublicId(), "error", err)
|
||||
session.SendError(InvalidFormat)
|
||||
} else {
|
||||
h.logger.Errorf("Error decoding message from %s: %v", client.RemoteAddr(), err)
|
||||
h.logger.Errorw("Error decoding message", "addr", client.RemoteAddr(), "error", err)
|
||||
client.SendError(InvalidFormat)
|
||||
}
|
||||
return
|
||||
|
@ -808,10 +809,10 @@ func (h *Hub) processMessage(client *Client, data []byte) {
|
|||
|
||||
if err := message.CheckValid(); err != nil {
|
||||
if session := client.GetSession(); session != nil {
|
||||
h.logger.Errorf("Invalid message %+v from client %s: %v", message, session.PublicId(), err)
|
||||
h.logger.Errorw("Invalid message received", "message", message, "sessionid", session.PublicId(), "error", err)
|
||||
session.SendMessage(message.NewErrorServerMessage(InvalidFormat))
|
||||
} else {
|
||||
h.logger.Errorf("Invalid message %+v from %s: %v", message, client.RemoteAddr(), err)
|
||||
h.logger.Errorw("Invalid message received", "message", message, "addr", client.RemoteAddr(), "error", err)
|
||||
client.SendMessage(message.NewErrorServerMessage(InvalidFormat))
|
||||
}
|
||||
return
|
||||
|
@ -842,9 +843,9 @@ func (h *Hub) processMessage(client *Client, data []byte) {
|
|||
case "bye":
|
||||
h.processByeMsg(client, &message)
|
||||
case "hello":
|
||||
h.logger.Warnf("Ignore hello %+v for already authenticated connection %s", message.Hello, session.PublicId())
|
||||
h.logger.Warnw("Ignore hello for already authenticated connection", "message", message.Hello, "sessionid", session.PublicId())
|
||||
default:
|
||||
h.logger.Errorf("Ignore unknown message %+v from %s", message, session.PublicId())
|
||||
h.logger.Errorw("Ignore unknown message", "message", message, "sessionid", session.PublicId())
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -886,7 +887,7 @@ func (h *Hub) processHello(client *Client, message *ClientMessage) {
|
|||
if !ok {
|
||||
// Should never happen as clients only can resume their own sessions.
|
||||
h.mu.Unlock()
|
||||
h.logger.Infof("Client resumed non-client session %s (private=%s)", session.PublicId(), session.PrivateId())
|
||||
h.logger.Infow("Client tried to resume non-client session", "sessionid", session.PublicId(), "private", session.PrivateId())
|
||||
statsHubSessionResumeFailed.Inc()
|
||||
client.SendMessage(message.NewErrorServerMessage(NoSuchSession))
|
||||
return
|
||||
|
@ -899,7 +900,7 @@ func (h *Hub) processHello(client *Client, message *ClientMessage) {
|
|||
}
|
||||
|
||||
if prev := clientSession.SetClient(client); prev != nil {
|
||||
h.logger.Infof("Closing previous client from %s for session %s", prev.RemoteAddr(), session.PublicId())
|
||||
h.logger.Infow("Closing previous client", "addr", prev.RemoteAddr(), "sessionid", session.PublicId())
|
||||
prev.SendByeResponseWithReason(nil, "session_resumed")
|
||||
}
|
||||
|
||||
|
@ -908,7 +909,7 @@ func (h *Hub) processHello(client *Client, message *ClientMessage) {
|
|||
delete(h.expectHelloClients, client)
|
||||
h.mu.Unlock()
|
||||
|
||||
h.logger.Infof("Resume session from %s in %s (%s) %s (private=%s)", client.RemoteAddr(), client.Country(), client.UserAgent(), session.PublicId(), session.PrivateId())
|
||||
h.logger.Infow("Resume session", "addr", client.RemoteAddr(), "country", client.Country(), "agent", client.UserAgent(), "sessionid", session.PublicId(), "private", session.PrivateId())
|
||||
|
||||
statsHubSessionsResumedTotal.WithLabelValues(clientSession.Backend().Id(), clientSession.ClientType()).Inc()
|
||||
h.sendHelloResponse(clientSession, message)
|
||||
|
@ -994,7 +995,7 @@ func (h *Hub) disconnectByRoomSessionId(roomSessionId string) {
|
|||
if err == ErrNoSuchRoomSession {
|
||||
return
|
||||
} else if err != nil {
|
||||
h.logger.Errorf("Could not get session id for room session %s: %s", roomSessionId, err)
|
||||
h.logger.Errorw("Could not get session id for room session", "roomsessionid", roomSessionId, "error", err)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -1008,12 +1009,12 @@ func (h *Hub) disconnectByRoomSessionId(roomSessionId string) {
|
|||
},
|
||||
}
|
||||
if err := h.nats.PublishMessage("session."+sessionId, msg); err != nil {
|
||||
h.logger.Errorf("Could not send reconnect bye to session %s: %s", sessionId, err)
|
||||
h.logger.Errorw("Could not send reconnect bye", "sessionid", sessionId, "error", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
h.logger.Infof("Closing session %s because same room session %s connected", session.PublicId(), roomSessionId)
|
||||
h.logger.Infow("Closing session because same room session connected", "sessionid", session.PublicId(), "roomsessionid", roomSessionId)
|
||||
session.LeaveRoom(false)
|
||||
switch sess := session.(type) {
|
||||
case *ClientSession:
|
||||
|
@ -1087,7 +1088,7 @@ func (h *Hub) processRoom(client *Client, message *ClientMessage) {
|
|||
sessionId := message.Room.SessionId
|
||||
if sessionId == "" {
|
||||
// TODO(jojo): Better make the session id required in the request.
|
||||
h.logger.Warnf("User did not send a room session id, assuming session %s", session.PublicId())
|
||||
h.logger.Warnw("User did not send a room session id, assuming session", "sessionid", session.PublicId())
|
||||
sessionId = session.PublicId()
|
||||
}
|
||||
request := NewBackendClientRoomRequest(roomId, session.UserId(), sessionId)
|
||||
|
@ -1344,7 +1345,7 @@ func (h *Hub) processMessageMsg(client *Client, message *ClientMessage) {
|
|||
}
|
||||
}
|
||||
if subject == "" {
|
||||
h.logger.Errorf("Unknown recipient in message %+v from %s", msg, session.PublicId())
|
||||
h.logger.Errorw("Unknown recipient in message", "message", msg, "sessionid", session.PublicId())
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -1364,7 +1365,7 @@ func (h *Hub) processMessageMsg(client *Client, message *ClientMessage) {
|
|||
return
|
||||
}
|
||||
|
||||
h.logger.Infof("Closing screen publisher for %s", session.PublicId())
|
||||
h.logger.Infow("Closing publisher", "sessionid", session.PublicId(), "type", streamTypeScreen)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), h.mcuTimeout)
|
||||
defer cancel()
|
||||
publisher.Close(ctx)
|
||||
|
@ -1387,7 +1388,7 @@ func (h *Hub) processMessageMsg(client *Client, message *ClientMessage) {
|
|||
// The recipient is connected to this instance, no need to go through NATS.
|
||||
if clientData != nil && clientData.Type == "sendoffer" {
|
||||
if err := session.IsAllowedToSend(clientData); err != nil {
|
||||
h.logger.Infof("Session %s is not allowed to send offer for %s, ignoring (%s)", session.PublicId(), clientData.RoomType, err)
|
||||
h.logger.Infow("Session is not allowed to send offer, ignoring", "sessionid", session.PublicId(), "type", clientData.RoomType, "error", err)
|
||||
sendNotAllowed(session, message, "Not allowed to send offer")
|
||||
return
|
||||
}
|
||||
|
@ -1407,11 +1408,11 @@ func (h *Hub) processMessageMsg(client *Client, message *ClientMessage) {
|
|||
} else {
|
||||
if clientData != nil && clientData.Type == "sendoffer" {
|
||||
// TODO(jojo): Implement this.
|
||||
h.logger.Errorf("Sending offers to remote clients is not supported yet (client %s)", session.PublicId())
|
||||
h.logger.Errorw("Sending offers to remote clients is not supported yet", "sessionid", session.PublicId())
|
||||
return
|
||||
}
|
||||
if err := h.nats.PublishMessage(subject, response); err != nil {
|
||||
h.logger.Errorf("Error publishing message to remote session: %s", err)
|
||||
h.logger.Errorw("Error publishing message to remote session", "sessionid", session.PublicId(), "subject", subject, "error", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1437,7 +1438,7 @@ func (h *Hub) processControlMsg(client *Client, message *ClientMessage) {
|
|||
// Client is not connected yet.
|
||||
return
|
||||
} else if !isAllowedToControl(session) {
|
||||
h.logger.Infof("Ignore control message %+v from %s", msg, session.PublicId())
|
||||
h.logger.Infow("Ignore control message", "message", msg, "sessionid", session.PublicId())
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -1492,7 +1493,7 @@ func (h *Hub) processControlMsg(client *Client, message *ClientMessage) {
|
|||
}
|
||||
}
|
||||
if subject == "" {
|
||||
h.logger.Errorf("Unknown recipient in message %+v from %s", msg, session.PublicId())
|
||||
h.logger.Errorw("Unknown recipient", "message", msg, "sessionid", session.PublicId())
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -1512,7 +1513,7 @@ func (h *Hub) processControlMsg(client *Client, message *ClientMessage) {
|
|||
recipient.SendMessage(response)
|
||||
} else {
|
||||
if err := h.nats.PublishMessage(subject, response); err != nil {
|
||||
h.logger.Errorf("Error publishing message to remote session: %s", err)
|
||||
h.logger.Errorw("Error publishing message to remote session", "sessionid", session.PublicId(), "subject", subject, "error", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1524,7 +1525,7 @@ func (h *Hub) processInternalMsg(client *Client, message *ClientMessage) {
|
|||
// Client is not connected yet.
|
||||
return
|
||||
} else if session.ClientType() != HelloClientTypeInternal {
|
||||
h.logger.Infof("Ignore internal message %+v from %s", msg, session.PublicId())
|
||||
h.logger.Infow("Ignore internal message", "message", msg, "sessionid", session.PublicId())
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -1766,17 +1767,18 @@ func (h *Hub) processMcuMessage(senderSession *ClientSession, session *ClientSes
|
|||
var mc McuClient
|
||||
var err error
|
||||
var clientType string
|
||||
loggerWithSessionId := h.logger.With("sessionid", session.PublicId())
|
||||
switch data.Type {
|
||||
case "requestoffer":
|
||||
if session.PublicId() == message.Recipient.SessionId {
|
||||
h.logger.Infof("Not requesting offer from itself for session %s", session.PublicId())
|
||||
loggerWithSessionId.Info("Not requesting offer from itself")
|
||||
return
|
||||
}
|
||||
|
||||
// A user is only allowed to subscribe a stream if she is in the same room
|
||||
// as the other user and both have their "inCall" flag set.
|
||||
if !h.allowSubscribeAnyStream && !h.isInSameCall(senderSession, message.Recipient.SessionId) {
|
||||
h.logger.Infof("Session %s is not in the same call as session %s, not requesting offer", session.PublicId(), message.Recipient.SessionId)
|
||||
loggerWithSessionId.Infow("Session is not in the same call as recipient, not requesting offer", "recipient", message.Recipient.SessionId)
|
||||
sendNotAllowed(senderSession, client_message, "Not allowed to request offer.")
|
||||
return
|
||||
}
|
||||
|
@ -1791,18 +1793,18 @@ func (h *Hub) processMcuMessage(senderSession *ClientSession, session *ClientSes
|
|||
clientType = "publisher"
|
||||
mc, err = session.GetOrCreatePublisher(ctx, h.mcu, data.RoomType, data)
|
||||
if err, ok := err.(*PermissionError); ok {
|
||||
h.logger.Infof("Session %s is not allowed to offer %s, ignoring (%s)", session.PublicId(), data.RoomType, err)
|
||||
loggerWithSessionId.Infow("Session is not allowed to offer, ignoring", "type", data.RoomType, "error", err)
|
||||
sendNotAllowed(senderSession, client_message, "Not allowed to publish.")
|
||||
return
|
||||
}
|
||||
if err, ok := err.(*SdpError); ok {
|
||||
h.logger.Infof("Session %s sent unsupported offer %s, ignoring (%s)", session.PublicId(), data.RoomType, err)
|
||||
loggerWithSessionId.Infow("Session sent unsupported offer, ignoring", "type", data.RoomType, "error", err)
|
||||
sendNotAllowed(senderSession, client_message, "Not allowed to publish.")
|
||||
return
|
||||
}
|
||||
case "selectStream":
|
||||
if session.PublicId() == message.Recipient.SessionId {
|
||||
h.logger.Infof("Not selecting substream for own %s stream in session %s", data.RoomType, session.PublicId())
|
||||
loggerWithSessionId.Infow("Not selecting substream for own stream", "type", data.RoomType)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -1811,7 +1813,7 @@ func (h *Hub) processMcuMessage(senderSession *ClientSession, session *ClientSes
|
|||
default:
|
||||
if session.PublicId() == message.Recipient.SessionId {
|
||||
if err := session.IsAllowedToSend(data); err != nil {
|
||||
h.logger.Infof("Session %s is not allowed to send candidate for %s, ignoring (%s)", session.PublicId(), data.RoomType, err)
|
||||
loggerWithSessionId.Infow("Session is not allowed to send candidate, ignoring", "type", data.RoomType, "error", err)
|
||||
sendNotAllowed(senderSession, client_message, "Not allowed to send candidate.")
|
||||
return
|
||||
}
|
||||
|
@ -1824,18 +1826,18 @@ func (h *Hub) processMcuMessage(senderSession *ClientSession, session *ClientSes
|
|||
}
|
||||
}
|
||||
if err != nil {
|
||||
h.logger.Errorf("Could not create MCU %s for session %s to send %+v to %s: %s", clientType, session.PublicId(), data, message.Recipient.SessionId, err)
|
||||
loggerWithSessionId.Errorw(fmt.Sprintf("Could not create MCU %s", clientType), "data", data, "recipient", message.Recipient.SessionId, "error", err)
|
||||
sendMcuClientNotFound(senderSession, client_message)
|
||||
return
|
||||
} else if mc == nil {
|
||||
h.logger.Errorf("No MCU %s found for session %s to send %+v to %s", clientType, session.PublicId(), data, message.Recipient.SessionId)
|
||||
loggerWithSessionId.Errorw(fmt.Sprintf("No MCU %s found", clientType), "data", data, "recipient", message.Recipient.SessionId)
|
||||
sendMcuClientNotFound(senderSession, client_message)
|
||||
return
|
||||
}
|
||||
|
||||
mc.SendMessage(context.TODO(), message, data, func(err error, response map[string]interface{}) {
|
||||
if err != nil {
|
||||
h.logger.Errorf("Could not send MCU message %+v for session %s to %s: %s", data, session.PublicId(), message.Recipient.SessionId, err)
|
||||
loggerWithSessionId.Errorw("Could not send MCU message", "data", data, "recipient", message.Recipient.SessionId, "error", err)
|
||||
sendMcuProcessingFailed(senderSession, client_message)
|
||||
return
|
||||
} else if response == nil {
|
||||
|
@ -1860,7 +1862,7 @@ func (h *Hub) sendMcuMessageResponse(session *ClientSession, message *MessageCli
|
|||
}
|
||||
answer_data, err := json.Marshal(answer_message)
|
||||
if err != nil {
|
||||
h.logger.Errorf("Could not serialize answer %+v to %s: %s", answer_message, session.PublicId(), err)
|
||||
h.logger.Errorw(fmt.Sprintf("Could not serialize answer %+v", answer_message), "sessionid", session.PublicId(), "error", err)
|
||||
return
|
||||
}
|
||||
response_message = &ServerMessage{
|
||||
|
@ -1884,7 +1886,7 @@ func (h *Hub) sendMcuMessageResponse(session *ClientSession, message *MessageCli
|
|||
}
|
||||
offer_data, err := json.Marshal(offer_message)
|
||||
if err != nil {
|
||||
h.logger.Errorf("Could not serialize offer %+v to %s: %s", offer_message, session.PublicId(), err)
|
||||
h.logger.Errorw(fmt.Sprintf("Could not serialize offer %+v", offer_message), "sessionid", session.PublicId(), "error", err)
|
||||
return
|
||||
}
|
||||
response_message = &ServerMessage{
|
||||
|
@ -1899,7 +1901,7 @@ func (h *Hub) sendMcuMessageResponse(session *ClientSession, message *MessageCli
|
|||
},
|
||||
}
|
||||
default:
|
||||
h.logger.Errorf("Unsupported response %+v received to send to %s", response, session.PublicId())
|
||||
h.logger.Errorw(fmt.Sprintf("Unsupported response %+v received", response), "sessionid", session.PublicId())
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -2014,7 +2016,7 @@ func (h *Hub) lookupClientCountry(client *Client) string {
|
|||
|
||||
country, err := h.geoip.LookupCountry(ip)
|
||||
if err != nil {
|
||||
h.logger.Errorf("Could not lookup country for %s: %s", ip, err)
|
||||
h.logger.Errorw("Could not lookup country", "ip", ip, "error", err)
|
||||
return unknownCountry
|
||||
}
|
||||
|
||||
|
@ -2030,13 +2032,13 @@ func (h *Hub) serveWs(w http.ResponseWriter, r *http.Request) {
|
|||
|
||||
conn, err := h.upgrader.Upgrade(w, r, nil)
|
||||
if err != nil {
|
||||
h.logger.Errorf("Could not upgrade request from %s: %s", addr, err)
|
||||
h.logger.Errorw("Could not upgrade request", "addr", addr, "error", err)
|
||||
return
|
||||
}
|
||||
|
||||
client, err := NewClient(h.logger, conn, addr, agent)
|
||||
if err != nil {
|
||||
h.logger.Errorf("Could not create client for %s: %s", addr, err)
|
||||
h.logger.Errorw("Could not create client", "addr", addr, "error", err)
|
||||
return
|
||||
}
|
||||
|
||||
|
|
|
@ -29,18 +29,23 @@ import (
|
|||
type Logger interface {
|
||||
Debug(args ...interface{})
|
||||
Debugf(template string, args ...interface{})
|
||||
Debugw(msg string, args ...interface{})
|
||||
|
||||
Error(args ...interface{})
|
||||
Errorf(template string, args ...interface{})
|
||||
Errorw(msg string, args ...interface{})
|
||||
|
||||
Fatal(args ...interface{})
|
||||
Fatalf(template string, args ...interface{})
|
||||
Fatalw(msg string, args ...interface{})
|
||||
|
||||
Info(args ...interface{})
|
||||
Infof(template string, args ...interface{})
|
||||
Infow(msg string, args ...interface{})
|
||||
|
||||
Warn(args ...interface{})
|
||||
Warnf(template string, args ...interface{})
|
||||
Warnw(msg string, args ...interface{})
|
||||
|
||||
With(args ...interface{}) Logger
|
||||
|
||||
|
|
Loading…
Reference in a new issue