You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

634 lines
19 KiB

  1. /**
  2. * Standalone signaling server for the Nextcloud Spreed app.
  3. * Copyright (C) 2017 struktur AG
  4. *
  5. * @author Joachim Bauch <bauch@struktur.de>
  6. *
  7. * @license GNU AGPL version 3 or any later version
  8. *
  9. * This program is free software: you can redistribute it and/or modify
  10. * it under the terms of the GNU Affero General Public License as published by
  11. * the Free Software Foundation, either version 3 of the License, or
  12. * (at your option) any later version.
  13. *
  14. * This program is distributed in the hope that it will be useful,
  15. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  16. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  17. * GNU Affero General Public License for more details.
  18. *
  19. * You should have received a copy of the GNU Affero General Public License
  20. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  21. */
  22. package signaling
  23. import (
  24. "crypto/hmac"
  25. "crypto/rand"
  26. "crypto/sha1"
  27. "encoding/base64"
  28. "encoding/json"
  29. "fmt"
  30. "io"
  31. "io/ioutil"
  32. "log"
  33. "net"
  34. "net/http"
  35. "net/url"
  36. "reflect"
  37. "strings"
  38. "sync"
  39. "time"
  40. "github.com/dlintw/goconf"
  41. "github.com/gorilla/mux"
  42. )
  43. const (
  44. maxBodySize = 64 * 1024
  45. randomUsernameLength = 32
  46. sessionIdNotInMeeting = "0"
  47. )
  48. type BackendServer struct {
  49. hub *Hub
  50. nats NatsClient
  51. roomSessions RoomSessions
  52. version string
  53. welcomeMessage string
  54. turnapikey string
  55. turnsecret []byte
  56. turnvalid time.Duration
  57. turnservers []string
  58. statsAllowedIps map[string]bool
  59. invalidSecret []byte
  60. }
  61. func NewBackendServer(config *goconf.ConfigFile, hub *Hub, version string) (*BackendServer, error) {
  62. turnapikey, _ := config.GetString("turn", "apikey")
  63. turnsecret, _ := config.GetString("turn", "secret")
  64. turnservers, _ := config.GetString("turn", "servers")
  65. // TODO(jojo): Make the validity for TURN credentials configurable.
  66. turnvalid := 24 * time.Hour
  67. var turnserverslist []string
  68. for _, s := range strings.Split(turnservers, ",") {
  69. s = strings.TrimSpace(s)
  70. if s != "" {
  71. turnserverslist = append(turnserverslist, s)
  72. }
  73. }
  74. if len(turnserverslist) != 0 {
  75. if turnapikey == "" {
  76. return nil, fmt.Errorf("need a TURN API key if TURN servers are configured")
  77. }
  78. if turnsecret == "" {
  79. return nil, fmt.Errorf("need a shared TURN secret if TURN servers are configured")
  80. }
  81. log.Printf("Using configured TURN API key")
  82. log.Printf("Using configured shared TURN secret")
  83. for _, s := range turnserverslist {
  84. log.Printf("Adding \"%s\" as TURN server", s)
  85. }
  86. }
  87. statsAllowed, _ := config.GetString("stats", "allowed_ips")
  88. var statsAllowedIps map[string]bool
  89. if statsAllowed == "" {
  90. log.Printf("No IPs configured for the stats endpoint, only allowing access from 127.0.0.1")
  91. statsAllowedIps = map[string]bool{
  92. "127.0.0.1": true,
  93. }
  94. } else {
  95. log.Printf("Only allowing access to the stats endpoing from %s", statsAllowed)
  96. statsAllowedIps = make(map[string]bool)
  97. for _, ip := range strings.Split(statsAllowed, ",") {
  98. ip = strings.TrimSpace(ip)
  99. if ip != "" {
  100. statsAllowedIps[ip] = true
  101. }
  102. }
  103. }
  104. invalidSecret := make([]byte, 32)
  105. if _, err := rand.Read(invalidSecret); err != nil {
  106. return nil, err
  107. }
  108. return &BackendServer{
  109. hub: hub,
  110. nats: hub.nats,
  111. roomSessions: hub.roomSessions,
  112. version: version,
  113. turnapikey: turnapikey,
  114. turnsecret: []byte(turnsecret),
  115. turnvalid: turnvalid,
  116. turnservers: turnserverslist,
  117. statsAllowedIps: statsAllowedIps,
  118. invalidSecret: invalidSecret,
  119. }, nil
  120. }
  121. func (b *BackendServer) Start(r *mux.Router) error {
  122. welcome := map[string]string{
  123. "nextcloud-spreed-signaling": "Welcome",
  124. "version": b.version,
  125. }
  126. if welcomeMessage, err := json.Marshal(welcome); err != nil {
  127. // Should never happen.
  128. return err
  129. } else {
  130. b.welcomeMessage = string(welcomeMessage) + "\n"
  131. }
  132. s := r.PathPrefix("/api/v1").Subrouter()
  133. s.HandleFunc("/welcome", b.setComonHeaders(b.welcomeFunc)).Methods("GET")
  134. s.HandleFunc("/room/{roomid}", b.setComonHeaders(b.parseRequestBody(b.roomHandler))).Methods("POST")
  135. s.HandleFunc("/stats", b.setComonHeaders(b.validateStatsRequest(b.statsHandler))).Methods("GET")
  136. // Provide a REST service to get TURN credentials.
  137. // See https://tools.ietf.org/html/draft-uberti-behave-turn-rest-00
  138. r.HandleFunc("/turn/credentials", b.setComonHeaders(b.getTurnCredentials)).Methods("GET")
  139. return nil
  140. }
  141. func (b *BackendServer) setComonHeaders(f func(http.ResponseWriter, *http.Request)) func(http.ResponseWriter, *http.Request) {
  142. return func(w http.ResponseWriter, r *http.Request) {
  143. w.Header().Set("Server", "nextcloud-spreed-signaling/"+b.version)
  144. f(w, r)
  145. }
  146. }
  147. func (b *BackendServer) welcomeFunc(w http.ResponseWriter, r *http.Request) {
  148. w.Header().Set("Content-Type", "application/json; charset=utf-8")
  149. w.WriteHeader(http.StatusOK)
  150. io.WriteString(w, b.welcomeMessage) // nolint
  151. }
  152. func calculateTurnSecret(username string, secret []byte, valid time.Duration) (string, string) {
  153. expires := time.Now().Add(valid)
  154. username = fmt.Sprintf("%d:%s", expires.Unix(), username)
  155. m := hmac.New(sha1.New, secret)
  156. m.Write([]byte(username)) // nolint
  157. password := base64.StdEncoding.EncodeToString(m.Sum(nil))
  158. return username, password
  159. }
  160. func (b *BackendServer) getTurnCredentials(w http.ResponseWriter, r *http.Request) {
  161. q := r.URL.Query()
  162. service := q.Get("service")
  163. username := q.Get("username")
  164. key := q.Get("key")
  165. if key == "" {
  166. // The RFC actually defines "key" to be the parameter, but Janus sends it as "api".
  167. key = q.Get("api")
  168. }
  169. if service != "turn" || key == "" {
  170. w.WriteHeader(http.StatusBadRequest)
  171. io.WriteString(w, "Invalid service and/or key sent.\n") // nolint
  172. return
  173. }
  174. if key != b.turnapikey {
  175. w.WriteHeader(http.StatusForbidden)
  176. io.WriteString(w, "Not allowed to access this service.\n") // nolint
  177. return
  178. }
  179. if len(b.turnservers) == 0 {
  180. w.WriteHeader(http.StatusNotFound)
  181. io.WriteString(w, "No TURN servers available.\n") // nolint
  182. return
  183. }
  184. if username == "" {
  185. // Make sure to include an actual username in the credentials.
  186. username = newRandomString(randomUsernameLength)
  187. }
  188. username, password := calculateTurnSecret(username, b.turnsecret, b.turnvalid)
  189. result := TurnCredentials{
  190. Username: username,
  191. Password: password,
  192. TTL: int64(b.turnvalid.Seconds()),
  193. URIs: b.turnservers,
  194. }
  195. data, err := json.Marshal(result)
  196. if err != nil {
  197. log.Printf("Could not serialize TURN credentials: %s", err)
  198. w.WriteHeader(http.StatusInternalServerError)
  199. io.WriteString(w, "Could not serialize credentials.") // nolint
  200. return
  201. }
  202. if data[len(data)-1] != '\n' {
  203. data = append(data, '\n')
  204. }
  205. w.Header().Set("Content-Type", "application/json; charset=utf-8")
  206. w.WriteHeader(http.StatusOK)
  207. w.Write(data) // nolint
  208. }
  209. func (b *BackendServer) parseRequestBody(f func(http.ResponseWriter, *http.Request, []byte)) func(http.ResponseWriter, *http.Request) {
  210. return func(w http.ResponseWriter, r *http.Request) {
  211. // Sanity checks
  212. if r.ContentLength == -1 {
  213. http.Error(w, "Length required", http.StatusLengthRequired)
  214. return
  215. } else if r.ContentLength > maxBodySize {
  216. http.Error(w, "Request entity too large", http.StatusRequestEntityTooLarge)
  217. return
  218. }
  219. ct := r.Header.Get("Content-Type")
  220. if !strings.HasPrefix(ct, "application/json") {
  221. log.Printf("Received unsupported content-type: %s", ct)
  222. http.Error(w, "Unsupported Content-Type", http.StatusBadRequest)
  223. return
  224. }
  225. if r.Header.Get(HeaderBackendSignalingRandom) == "" ||
  226. r.Header.Get(HeaderBackendSignalingChecksum) == "" {
  227. http.Error(w, "Authentication check failed", http.StatusForbidden)
  228. return
  229. }
  230. body, err := ioutil.ReadAll(r.Body)
  231. if err != nil {
  232. log.Println("Error reading body: ", err)
  233. http.Error(w, "Could not read body", http.StatusBadRequest)
  234. return
  235. }
  236. f(w, r, body)
  237. }
  238. }
  239. func (b *BackendServer) sendRoomInvite(roomid string, backend *Backend, userids []string, properties *json.RawMessage) {
  240. msg := &ServerMessage{
  241. Type: "event",
  242. Event: &EventServerMessage{
  243. Target: "roomlist",
  244. Type: "invite",
  245. Invite: &RoomEventServerMessage{
  246. RoomId: roomid,
  247. Properties: properties,
  248. },
  249. },
  250. }
  251. for _, userid := range userids {
  252. if err := b.nats.PublishMessage(GetSubjectForUserId(userid, backend), msg); err != nil {
  253. log.Printf("Could not publish room invite for user %s in backend %s: %s", userid, backend.Id(), err)
  254. }
  255. }
  256. }
  257. func (b *BackendServer) sendRoomDisinvite(roomid string, backend *Backend, reason string, userids []string, sessionids []string) {
  258. msg := &ServerMessage{
  259. Type: "event",
  260. Event: &EventServerMessage{
  261. Target: "roomlist",
  262. Type: "disinvite",
  263. Disinvite: &RoomDisinviteEventServerMessage{
  264. RoomEventServerMessage: RoomEventServerMessage{
  265. RoomId: roomid,
  266. },
  267. Reason: reason,
  268. },
  269. },
  270. }
  271. for _, userid := range userids {
  272. if err := b.nats.PublishMessage(GetSubjectForUserId(userid, backend), msg); err != nil {
  273. log.Printf("Could not publish room disinvite for user %s in backend %s: %s", userid, backend.Id(), err)
  274. }
  275. }
  276. timeout := time.Second
  277. var wg sync.WaitGroup
  278. for _, sessionid := range sessionids {
  279. if sessionid == sessionIdNotInMeeting {
  280. // Ignore entries that are no longer in the meeting.
  281. continue
  282. }
  283. wg.Add(1)
  284. go func(sessionid string) {
  285. defer wg.Done()
  286. if sid, err := b.lookupByRoomSessionId(sessionid, nil, timeout); err != nil {
  287. log.Printf("Could not lookup by room session %s: %s", sessionid, err)
  288. } else if sid != "" {
  289. if err := b.nats.PublishMessage("session."+sid, msg); err != nil {
  290. log.Printf("Could not publish room disinvite for session %s: %s", sid, err)
  291. }
  292. }
  293. }(sessionid)
  294. }
  295. wg.Wait()
  296. }
  297. func (b *BackendServer) sendRoomUpdate(roomid string, backend *Backend, notified_userids []string, all_userids []string, properties *json.RawMessage) {
  298. msg := &ServerMessage{
  299. Type: "event",
  300. Event: &EventServerMessage{
  301. Target: "roomlist",
  302. Type: "update",
  303. Update: &RoomEventServerMessage{
  304. RoomId: roomid,
  305. Properties: properties,
  306. },
  307. },
  308. }
  309. notified := make(map[string]bool)
  310. for _, userid := range notified_userids {
  311. notified[userid] = true
  312. }
  313. // Only send to users not notified otherwise.
  314. for _, userid := range all_userids {
  315. if notified[userid] {
  316. continue
  317. }
  318. if err := b.nats.PublishMessage(GetSubjectForUserId(userid, backend), msg); err != nil {
  319. log.Printf("Could not publish room update for user %s in backend %s: %s", userid, backend.Id(), err)
  320. }
  321. }
  322. }
  323. func (b *BackendServer) lookupByRoomSessionId(roomSessionId string, cache *ConcurrentStringStringMap, timeout time.Duration) (string, error) {
  324. if roomSessionId == sessionIdNotInMeeting {
  325. log.Printf("Trying to lookup empty room session id: %s", roomSessionId)
  326. return "", nil
  327. }
  328. if cache != nil {
  329. if result, found := cache.Get(roomSessionId); found {
  330. return result, nil
  331. }
  332. }
  333. sid, err := b.roomSessions.GetSessionId(roomSessionId)
  334. if err == ErrNoSuchRoomSession {
  335. return "", nil
  336. } else if err != nil {
  337. return "", err
  338. }
  339. if cache != nil {
  340. cache.Set(roomSessionId, sid)
  341. }
  342. return sid, nil
  343. }
  344. func (b *BackendServer) fixupUserSessions(cache *ConcurrentStringStringMap, users []map[string]interface{}, timeout time.Duration) []map[string]interface{} {
  345. if len(users) == 0 {
  346. return users
  347. }
  348. var wg sync.WaitGroup
  349. for _, user := range users {
  350. roomSessionIdOb, found := user["sessionId"]
  351. if !found {
  352. continue
  353. }
  354. roomSessionId, ok := roomSessionIdOb.(string)
  355. if !ok {
  356. log.Printf("User %+v has invalid room session id, ignoring", user)
  357. delete(user, "sessionId")
  358. continue
  359. }
  360. if roomSessionId == sessionIdNotInMeeting {
  361. log.Printf("User %+v is not in the meeting, ignoring", user)
  362. delete(user, "sessionId")
  363. continue
  364. }
  365. wg.Add(1)
  366. go func(roomSessionId string, u map[string]interface{}) {
  367. defer wg.Done()
  368. if sessionId, err := b.lookupByRoomSessionId(roomSessionId, cache, timeout); err != nil {
  369. log.Printf("Could not lookup by room session %s: %s", roomSessionId, err)
  370. delete(u, "sessionId")
  371. } else if sessionId != "" {
  372. u["sessionId"] = sessionId
  373. } else {
  374. // sessionId == ""
  375. delete(u, "sessionId")
  376. }
  377. }(roomSessionId, user)
  378. }
  379. wg.Wait()
  380. result := make([]map[string]interface{}, 0, len(users))
  381. for _, user := range users {
  382. if _, found := user["sessionId"]; found {
  383. result = append(result, user)
  384. }
  385. }
  386. return result
  387. }
  388. func (b *BackendServer) sendRoomIncall(roomid string, backend *Backend, request *BackendServerRoomRequest) error {
  389. timeout := time.Second
  390. var cache ConcurrentStringStringMap
  391. // Convert (Nextcloud) session ids to signaling session ids.
  392. request.InCall.Users = b.fixupUserSessions(&cache, request.InCall.Users, timeout)
  393. // Entries in "Changed" are most likely already fetched through the "Users" list.
  394. request.InCall.Changed = b.fixupUserSessions(&cache, request.InCall.Changed, timeout)
  395. if len(request.InCall.Users) == 0 && len(request.InCall.Changed) == 0 {
  396. return nil
  397. }
  398. return b.nats.PublishBackendServerRoomRequest(GetSubjectForBackendRoomId(roomid, backend), request)
  399. }
  400. func (b *BackendServer) sendRoomParticipantsUpdate(roomid string, backend *Backend, request *BackendServerRoomRequest) error {
  401. timeout := time.Second
  402. // Convert (Nextcloud) session ids to signaling session ids.
  403. var cache ConcurrentStringStringMap
  404. request.Participants.Users = b.fixupUserSessions(&cache, request.Participants.Users, timeout)
  405. request.Participants.Changed = b.fixupUserSessions(&cache, request.Participants.Changed, timeout)
  406. if len(request.Participants.Users) == 0 && len(request.Participants.Changed) == 0 {
  407. return nil
  408. }
  409. var wg sync.WaitGroup
  410. loop:
  411. for _, user := range request.Participants.Changed {
  412. permissionsInterface, found := user["permissions"]
  413. if !found {
  414. continue
  415. }
  416. sessionId := user["sessionId"].(string)
  417. permissionsList, ok := permissionsInterface.([]interface{})
  418. if !ok {
  419. log.Printf("Received invalid permissions %+v (%s) for session %s", permissionsInterface, reflect.TypeOf(permissionsInterface), sessionId)
  420. continue
  421. }
  422. var permissions []Permission
  423. for idx, ob := range permissionsList {
  424. permission, ok := ob.(string)
  425. if !ok {
  426. log.Printf("Received invalid permission at position %d %+v (%s) for session %s", idx, ob, reflect.TypeOf(ob), sessionId)
  427. continue loop
  428. }
  429. permissions = append(permissions, Permission(permission))
  430. }
  431. wg.Add(1)
  432. go func(sessionId string, permissions []Permission) {
  433. defer wg.Done()
  434. message := &NatsMessage{
  435. Type: "permissions",
  436. Permissions: permissions,
  437. }
  438. if err := b.nats.Publish("session."+sessionId, message); err != nil {
  439. log.Printf("Could not send permissions update (%+v) to session %s: %s", permissions, sessionId, err)
  440. }
  441. }(sessionId, permissions)
  442. }
  443. wg.Wait()
  444. return b.nats.PublishBackendServerRoomRequest(GetSubjectForBackendRoomId(roomid, backend), request)
  445. }
  446. func (b *BackendServer) sendRoomMessage(roomid string, backend *Backend, request *BackendServerRoomRequest) error {
  447. return b.nats.PublishBackendServerRoomRequest(GetSubjectForBackendRoomId(roomid, backend), request)
  448. }
  449. func (b *BackendServer) roomHandler(w http.ResponseWriter, r *http.Request, body []byte) {
  450. v := mux.Vars(r)
  451. roomid := v["roomid"]
  452. var backend *Backend
  453. backendUrl := r.Header.Get(HeaderBackendServer)
  454. if backendUrl != "" {
  455. if u, err := url.Parse(backendUrl); err == nil {
  456. backend = b.hub.backend.GetBackend(u)
  457. }
  458. if backend == nil {
  459. // Unknown backend URL passed, return immediately.
  460. http.Error(w, "Authentication check failed", http.StatusForbidden)
  461. return
  462. }
  463. }
  464. if backend == nil {
  465. if compatBackend := b.hub.backend.GetCompatBackend(); compatBackend != nil {
  466. // Old-style configuration using a single secret for all backends.
  467. backend = compatBackend
  468. } else {
  469. // Old-style Talk, find backend that created the checksum.
  470. // TODO(fancycode): Remove once all supported Talk versions send the backend header.
  471. for _, b := range b.hub.backend.GetBackends() {
  472. if ValidateBackendChecksum(r, body, b.Secret()) {
  473. backend = b
  474. break
  475. }
  476. }
  477. }
  478. if backend == nil {
  479. http.Error(w, "Authentication check failed", http.StatusForbidden)
  480. return
  481. }
  482. }
  483. if !ValidateBackendChecksum(r, body, backend.Secret()) {
  484. http.Error(w, "Authentication check failed", http.StatusForbidden)
  485. return
  486. }
  487. var request BackendServerRoomRequest
  488. if err := json.Unmarshal(body, &request); err != nil {
  489. log.Printf("Error decoding body %s: %s", string(body), err)
  490. http.Error(w, "Could not read body", http.StatusBadRequest)
  491. return
  492. }
  493. request.ReceivedTime = time.Now().UnixNano()
  494. var err error
  495. switch request.Type {
  496. case "invite":
  497. b.sendRoomInvite(roomid, backend, request.Invite.UserIds, request.Invite.Properties)
  498. b.sendRoomUpdate(roomid, backend, request.Invite.UserIds, request.Invite.AllUserIds, request.Invite.Properties)
  499. case "disinvite":
  500. b.sendRoomDisinvite(roomid, backend, DisinviteReasonDisinvited, request.Disinvite.UserIds, request.Disinvite.SessionIds)
  501. b.sendRoomUpdate(roomid, backend, request.Disinvite.UserIds, request.Disinvite.AllUserIds, request.Disinvite.Properties)
  502. case "update":
  503. err = b.nats.PublishBackendServerRoomRequest(GetSubjectForBackendRoomId(roomid, backend), &request)
  504. b.sendRoomUpdate(roomid, backend, nil, request.Update.UserIds, request.Update.Properties)
  505. case "delete":
  506. err = b.nats.PublishBackendServerRoomRequest(GetSubjectForBackendRoomId(roomid, backend), &request)
  507. b.sendRoomDisinvite(roomid, backend, DisinviteReasonDeleted, request.Delete.UserIds, nil)
  508. case "incall":
  509. err = b.sendRoomIncall(roomid, backend, &request)
  510. case "participants":
  511. err = b.sendRoomParticipantsUpdate(roomid, backend, &request)
  512. case "message":
  513. err = b.sendRoomMessage(roomid, backend, &request)
  514. default:
  515. http.Error(w, "Unsupported request type: "+request.Type, http.StatusBadRequest)
  516. return
  517. }
  518. if err != nil {
  519. log.Printf("Error processing %s for room %s: %s", string(body), roomid, err)
  520. http.Error(w, "Error while processing", http.StatusInternalServerError)
  521. return
  522. }
  523. w.Header().Set("Content-Type", "application/json; charset=utf-8")
  524. w.Header().Set("X-Content-Type-Options", "nosniff")
  525. w.WriteHeader(http.StatusOK)
  526. // TODO(jojo): Return better response struct.
  527. w.Write([]byte("{}")) // nolint
  528. }
  529. func (b *BackendServer) validateStatsRequest(f func(http.ResponseWriter, *http.Request)) func(http.ResponseWriter, *http.Request) {
  530. return func(w http.ResponseWriter, r *http.Request) {
  531. addr := getRealUserIP(r)
  532. if strings.Contains(addr, ":") {
  533. if host, _, err := net.SplitHostPort(addr); err == nil {
  534. addr = host
  535. }
  536. }
  537. if !b.statsAllowedIps[addr] {
  538. http.Error(w, "Authentication check failed", http.StatusForbidden)
  539. return
  540. }
  541. f(w, r)
  542. }
  543. }
  544. func (b *BackendServer) statsHandler(w http.ResponseWriter, r *http.Request) {
  545. stats := b.hub.GetStats()
  546. statsData, err := json.MarshalIndent(stats, "", " ")
  547. if err != nil {
  548. log.Printf("Could not serialize stats %+v: %s", stats, err)
  549. http.Error(w, "Internal server error", http.StatusInternalServerError)
  550. return
  551. }
  552. w.Header().Set("Content-Type", "application/json; charset=utf-8")
  553. w.Header().Set("X-Content-Type-Options", "nosniff")
  554. w.WriteHeader(http.StatusOK)
  555. w.Write(statsData) // nolint
  556. }