yangkang2021.github.io

Object model of the worker wrapper layer

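The worker has wrapper layers for Node.js, Rust, and Go. This analysis uses the Go wrapper as the example; the Go and Node.js implementations are essentially the same.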

Object creation and destruction

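  1. Ways a worker exits:
    1. A worker.close command is sent (active exit)
    2. A system signal is received
    3. An internal exception occurs
    4. A signal is sent deliberately
  2. Object exit
    1. gin's run occupies the main thread and returns only on shutdown
  3. Worker exit
    1. If creation fails, call panic(err) directly
    2. worker.On("died", ...) -> os.Exit(1)
  4. Objects are deleted level by level through close events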
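
The level-by-level deletion in item 4 can be sketched as follows. This is a minimal illustration, not the wrapper's actual code: the `emitter` and `node` types are hypothetical stand-ins for the wrapper's event emitter and for objects such as worker, router, and transport, and the sketch assumes all calls happen on one goroutine.

```go
package main

import (
	"fmt"
	"sync"
)

// emitter is a tiny hypothetical stand-in for the wrapper's event emitter.
type emitter struct {
	mu       sync.Mutex
	handlers map[string][]func()
}

// On registers fn to run whenever event is emitted.
func (e *emitter) On(event string, fn func()) {
	e.mu.Lock()
	defer e.mu.Unlock()
	if e.handlers == nil {
		e.handlers = make(map[string][]func())
	}
	e.handlers[event] = append(e.handlers[event], fn)
}

// Emit runs every handler registered for event.
func (e *emitter) Emit(event string) {
	e.mu.Lock()
	fns := append([]func(){}, e.handlers[event]...)
	e.mu.Unlock()
	for _, fn := range fns {
		fn()
	}
}

// node is a hypothetical object in the tree (worker, router, transport, ...).
type node struct {
	emitter
	name     string
	closed   bool
	children map[string]*node
}

func newNode(name string) *node {
	return &node{name: name, children: make(map[string]*node)}
}

// addChild registers child and removes it again once it emits "close".
func (n *node) addChild(child *node) {
	n.children[child.name] = child
	child.On("close", func() { delete(n.children, child.name) })
}

// Close closes the children first, then notifies the parent via "close".
func (n *node) Close() {
	if n.closed {
		return
	}
	n.closed = true
	for _, c := range n.children {
		c.Close()
	}
	n.Emit("close")
}

func main() {
	worker := newNode("worker")
	router := newNode("router")
	transport := newNode("transport")
	worker.addChild(router)
	router.addChild(transport)

	router.Close()
	fmt.Println(len(worker.children), router.closed, transport.closed) // prints: 0 true true
}
```

Closing the router closes its transport first and then removes the router from the worker, so no parent keeps a reference to a closed child.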

Main flow of the demo

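  1. demo: NewServer creates N workers
  2. demo: server.Run() -> gin.RunTLS starts the service and returns when the program shuts down
  3. demo: when a ws connects, a separate goroutine runs: runProtooWebSocketServer creates the room (worker.CreateRouter), the peer (HandleProtooConnection), and the transport. transport.run enters an infinite loop
  4. demo: handleProtooRequest handles the signaling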
    • getRouterRtpCapabilities
    • join
    • createWebRtcTransport/connectWebRtcTransport
    • produce/closeProducer/pauseProducer/resumeProducer
    • pauseConsumer/resumeConsumer/setConsumerPreferredLayers/setConsumerPriority/requestConsumerKeyFrame
    • // there is no consume request here; the server forwards directly
    • produceData
    • changeDisplayName …

      I. Steps for using the protoo project

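  5. Create a transport from a websocket.Conn
  6. Create a peer from the transport and peerData: Room::CreatePeer(peerId, peerData, transport)
  7. Use the Room interface to manage all participants
  8. Use the Peer interface to talk to a single participant
  9. Note: the application layer only needs to manage peerData.
  10. Object model:
    • Room
      • peers
        • transport // websocket transport channel
        • sents // sent messages awaiting a reply
        • data // the peer's data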
          type Room struct {
           IEventEmitter
           locker sync.Mutex
           logger logr.Logger
           closed bool
           peers  map[string]*Peer
          }
          type Peer struct {
           IEventEmitter
           logger    logr.Logger
           id        string
           transport Transport
           sents     sync.Map
           data      interface{}
           closed    uint32
           closeCh   chan struct{}
          }
          type Transport interface {
           eventemitter.IEventEmitter
           fmt.Stringer
           Send(data []byte) error
           Close()
           Closed() bool
           Run() error
          }
          type WebsocketTransport struct {
           eventemitter.IEventEmitter
           logger logr.Logger
           mu     sync.Mutex
           conn   *websocket.Conn
           closed uint32
          }
          

          II. Object model of the mediasoup-go project

  11. Does not depend on the protoo project
  12. Only wraps the 6 core components:
    • worker, router, transport, producer, consumer, RtpObserver.
    • Plus WebRtcServer, dataProducer, dataConsumer.
  13. The Worker stores every object it creates
    • channel
    • payloadChannel
    • webRtcServers
    • routers
      • transports
        • producers // producers (published streams) under a single transport
        • consumers
        • dataProducers
        • dataConsumers
      • producers // producers (published streams) under all transports
      • rtpObservers
      • dataProducers
      • mapRouterPipeTransports
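  14. RtpObserver analyzes RTP packets and is used for audio-level and active-speaker detection. It does not run automatically; you must add the producers you want monitored yourself.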
    type Worker struct {
     IEventEmitter
     // Worker logger.
     logger logr.Logger
     // Worker process PID.
     pid int
     // Channel instance.
     channel *Channel
     // PayloadChannel instance.
     payloadChannel *PayloadChannel
     // Closed flag.
     closed uint32
     // Custom app data.
     appData interface{}
     // WebRtcServers map
     webRtcServers sync.Map
     // Routers map.
     routers sync.Map
     // child is the worker process
     child *exec.Cmd
     // diedErr indicates the worker process stopped unexpectedly
     diedErr error
     // waitCh notifies whether the worker process stopped as expected or not
     waitCh chan error
    
     // Observer instance.
     observer IEventEmitter
    
     cgoWorker *CgoWorker
    }
    type Router struct {
     IEventEmitter
     logger                  logr.Logger
     internal                internalData
     data                    routerData
     channel                 *Channel
     payloadChannel          *PayloadChannel
     closed                  uint32
     appData                 interface{}
     transports              sync.Map
     producers               sync.Map
     rtpObservers            sync.Map
     dataProducers           sync.Map
     mapRouterPipeTransports sync.Map
     observer                IEventEmitter
    }
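
Two patterns recur in these structs: a `closed uint32` flag and `sync.Map` fields holding child objects. A plausible reading is that the flag is flipped atomically so Close runs once, and that children remove themselves from the parent's map on close. The sketch below illustrates that reading with hypothetical `router` and `transport` types; it is not mediasoup-go's code.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// transport is a hypothetical child object with an idempotent Close.
type transport struct {
	id      string
	closed  uint32
	onClose func()
}

// Close runs the cleanup only for the first caller that flips 0 -> 1.
func (t *transport) Close() {
	if atomic.CompareAndSwapUint32(&t.closed, 0, 1) && t.onClose != nil {
		t.onClose()
	}
}

// Closed reports whether Close has been called.
func (t *transport) Closed() bool { return atomic.LoadUint32(&t.closed) == 1 }

// router tracks its transports in a sync.Map keyed by transport id.
type router struct {
	transports sync.Map // string -> *transport
}

// createTransport stores a new transport and wires its removal on close.
func (r *router) createTransport(id string) *transport {
	t := &transport{id: id}
	t.onClose = func() { r.transports.Delete(id) }
	r.transports.Store(id, t)
	return t
}

// count returns the number of transports currently tracked.
func (r *router) count() int {
	n := 0
	r.transports.Range(func(_, _ interface{}) bool { n++; return true })
	return n
}

func main() {
	r := &router{}
	a := r.createTransport("a")
	r.createTransport("b")
	a.Close()
	a.Close() // the second call is a no-op
	fmt.Println(r.count(), a.Closed()) // prints: 1 true
}
```

The compare-and-swap makes Close safe to call from several goroutines without a mutex, which fits objects that can be closed either by the application or by an event from the worker process.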
    

    III. The demo project

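  15. Has its own independent room model, each room corresponding to one protoo room
  16. Has no peer type of its own; all of a peer's objects are attached to the protoo peer through PeerData
  17. Creates workers and operates on the workers' internal objects according to the signaling
  18. Object model
    • Server
      • mediasoupWorkers // child objects are stored here
      • rooms
        • protooRoom // link to the protoo project
        • mediasoupRouter // child objects are stored here as well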
        • audioLevelObserver // ActiveSpeakerObserver is not enabled by default

```
type Server struct {
	logger                 zerolog.Logger
	locker                 sync.Mutex
	config                 Config
	rooms                  sync.Map
	mediasoupWorkers       []*mediasoup.Worker
	nextMediasoupWorkerIdx int
}

type Room struct {
	mediasoup.IEventEmitter
	logger             zerolog.Logger
	peerLockers        sync.Map
	config             Config
	roomId             string
	protooRoom         *protoo.Room
	mediasoupRouter    *mediasoup.Router
	audioLevelObserver mediasoup.IRtpObserver
	bot                *Bot
	networkThrottled   bool
	broadcasters       sync.Map
	closed             uint32
}

type PeerData struct {
	locker sync.Mutex
	// Not joined after a custom protoo "join" request is later received.
	Joined           bool
	DisplayName      string
	Device           DeviceInfo
	RtpCapabilities  *mediasoup.RtpCapabilities
	SctpCapabilities *mediasoup.SctpCapabilities

	// Have mediasoup related maps ready even before the Peer joins since we
	// allow creating Transports before joining.
	transports    map[string]mediasoup.ITransport
	producers     map[string]*mediasoup.Producer
	consumers     map[string]*mediasoup.Consumer
	dataProducers map[string]*mediasoup.DataProducer
	dataConsumers map[string]*mediasoup.DataConsumer
}

type PeerInfo struct {
	Id              string                     `json:"id,omitempty"`
	DisplayName     string                     `json:"displayName,omitempty"`
	Device          DeviceInfo                 `json:"device,omitempty"`
	RtpCapabilities *mediasoup.RtpCapabilities `json:"rtpCapabilities,omitempty"`
	Data            *PeerData                  `json:"-,omitempty"`
}
```
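
The `nextMediasoupWorkerIdx` field suggests that the Server hands out workers in rotation when rooms are created. The sketch below shows one way such a selection could work; it is an assumption based on the field name, and the `server`, `worker`, and `nextWorker` names are hypothetical. It also assumes at least one worker exists.

```go
package main

import (
	"fmt"
	"sync"
)

// worker is a placeholder for *mediasoup.Worker.
type worker struct{ pid int }

// server mirrors only the fields needed for worker selection.
type server struct {
	locker  sync.Mutex
	workers []*worker
	nextIdx int
}

// nextWorker returns workers in rotation so rooms spread across processes.
// It assumes s.workers is not empty.
func (s *server) nextWorker() *worker {
	s.locker.Lock()
	defer s.locker.Unlock()
	w := s.workers[s.nextIdx]
	s.nextIdx = (s.nextIdx + 1) % len(s.workers)
	return w
}

func main() {
	s := &server{workers: []*worker{{pid: 101}, {pid: 102}, {pid: 103}}}
	for i := 0; i < 4; i++ {
		fmt.Println(s.nextWorker().pid)
	}
}
```

With three workers, the fourth call wraps around to the first worker again, so each new room lands on the next process in turn.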

IV. The worker's 57 control commands

  1. worker.close
  2. worker.dump
  3. worker.getResourceUsage
  4. worker.updateSettings
  5. worker.createWebRtcServer
  6. worker.createRouter
  7. webRtcServer.close
  8. webRtcServer.dump
  9. router.close
  10. router.dump
  11. router.createWebRtcTransport
  12. router.createWebRtcTransportWithServer
  13. router.createPlainTransport
  14. router.createPipeTransport
  15. router.createDirectTransport
  16. router.createActiveSpeakerObserver
  17. router.createAudioLevelObserver
  18. transport.close
  19. transport.dump
  20. transport.getStats
  21. transport.connect
  22. transport.setMaxIncomingBitrate
  23. transport.setMaxOutgoingBitrate
  24. transport.restartIce
  25. transport.produce
  26. transport.consume
  27. transport.produceData
  28. transport.consumeData
  29. transport.enableTraceEvent
  30. producer.close
  31. producer.dump
  32. producer.getStats
  33. producer.pause
  34. producer.resume
  35. producer.enableTraceEvent
  36. consumer.close
  37. consumer.dump
  38. consumer.getStats
  39. consumer.pause
  40. consumer.resume
  41. consumer.setPreferredLayers
  42. consumer.setPriority
  43. consumer.requestKeyFrame
  44. consumer.enableTraceEvent
  45. dataProducer.close
  46. dataProducer.dump
  47. dataProducer.getStats
  48. dataConsumer.close
  49. dataConsumer.dump
  50. dataConsumer.getStats
  51. dataConsumer.getBufferedAmount
  52. dataConsumer.setBufferedAmountLowThreshold
  53. rtpObserver.close
  54. rtpObserver.pause
  55. rtpObserver.resume
  56. rtpObserver.addProducer
  57. rtpObserver.removeProducer
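
Every command above follows a `target.action` naming scheme, so the list can be grouped by the object type that receives it. The sketch below does that grouping; `splitMethod` and `countByTarget` are hypothetical helper names, and the code says nothing about how the wrapper actually encodes commands on the channel.

```go
package main

import (
	"fmt"
	"strings"
)

// methods lists the 57 control commands from the list above.
var methods = []string{
	"worker.close", "worker.dump", "worker.getResourceUsage",
	"worker.updateSettings", "worker.createWebRtcServer", "worker.createRouter",
	"webRtcServer.close", "webRtcServer.dump",
	"router.close", "router.dump", "router.createWebRtcTransport",
	"router.createWebRtcTransportWithServer", "router.createPlainTransport",
	"router.createPipeTransport", "router.createDirectTransport",
	"router.createActiveSpeakerObserver", "router.createAudioLevelObserver",
	"transport.close", "transport.dump", "transport.getStats", "transport.connect",
	"transport.setMaxIncomingBitrate", "transport.setMaxOutgoingBitrate",
	"transport.restartIce", "transport.produce", "transport.consume",
	"transport.produceData", "transport.consumeData", "transport.enableTraceEvent",
	"producer.close", "producer.dump", "producer.getStats", "producer.pause",
	"producer.resume", "producer.enableTraceEvent",
	"consumer.close", "consumer.dump", "consumer.getStats", "consumer.pause",
	"consumer.resume", "consumer.setPreferredLayers", "consumer.setPriority",
	"consumer.requestKeyFrame", "consumer.enableTraceEvent",
	"dataProducer.close", "dataProducer.dump", "dataProducer.getStats",
	"dataConsumer.close", "dataConsumer.dump", "dataConsumer.getStats",
	"dataConsumer.getBufferedAmount", "dataConsumer.setBufferedAmountLowThreshold",
	"rtpObserver.close", "rtpObserver.pause", "rtpObserver.resume",
	"rtpObserver.addProducer", "rtpObserver.removeProducer",
}

// splitMethod breaks "target.action" into its two parts.
func splitMethod(m string) (target, action string, ok bool) {
	i := strings.IndexByte(m, '.')
	if i <= 0 || i == len(m)-1 {
		return "", "", false
	}
	return m[:i], m[i+1:], true
}

// countByTarget tallies how many commands each object type accepts.
func countByTarget(ms []string) map[string]int {
	counts := make(map[string]int)
	for _, m := range ms {
		if target, _, ok := splitMethod(m); ok {
			counts[target]++
		}
	}
	return counts
}

func main() {
	counts := countByTarget(methods)
	fmt.Println(len(methods), counts["transport"], counts["consumer"]) // prints: 57 12 9
}
```

Grouped this way, the transport accepts the most commands (12), followed by the router and the consumer with 9 each.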