查看原文
其他

大话ion系列(一)

LiveVideoStack 2021-09-19

点击上方“LiveVideoStack”关注我们

作者 | 王朋闯
本文为王朋闯老师创作的系列ion文章,LiveVideoStack已获得授权发布,未来将持续更新。


一、为什么用ion-sfu


1.简


ion-sfu作为ion分布式架构里的核心模块,SFU是选择转发单元的简称,可以分发WebRTC的媒体流。ion-sfu从pion/ion拆分出来,经过社区打磨,是目前GO方案中最成熟且使用最广的SFU。

https://github.com/pion/ion


已经有多家开始商用了,这点国外公司比较快,比如:100ms、Screenleap和Tandem等。


100ms:https://www.100ms.live/
Screenleap:https://www.screenleap.com/
Tandem:https://tandem.chat/


2.ion-sfu优点

  • 纯GO,开发效率高,且能帮你绕过很多坑


  • 单进程多协程模型:
    - 可以利用多核
    大大降低级联/单端口复杂度(其他SFU,可能存在本机不同worker间relay的问题;监听单端口时,存在worker间抢包的问题)

  • 高并发,曾在谷歌云4核压测到单房间50方会议 (大概2500路流-0.5Mbps)


  • 功能全面:

    - 双PeerConnection+多Track设计,有良好的浏览器兼容性,节省系统资
    支持多对多音视频通信
    支持大小流Simulcast
    支持幕分享Screenshare
    支持发言方自动检测Audio-Level-Detect
    支持定制DataChannel
    节点间Relay
    支持单端口,大大降低部署难度
    善的抗弱网机制,抗丢包40%左右,支持TWCC/REMB+PLI/FIR+Nack+SR/RR等


  • 配套SDK完善,JS/Flutter/GO等


3.使用方式


ion-sfu使用方式有两种:


  • 作为服务使用,比如编译带grpc或jsonrpc信令的ion-sfu,然后再做一个自己的信令服务(推荐ion分布式套装),远程调用即可。
  • 作为包使用,import导入,然后做二次开发。此时抛弃了cmd下边的信令层,只需导入pkg/sfu下边的包即可,然后自行定制信令层,可以在sfu、session、peer层面,通过继承接口定制自己的业务,比较复杂。

import ( sfu "github.com/pion/ion-sfu/pkg/sfu")


二、架构与模块



上面给一个简单架构图,很多细节表示不出来,需要看代码。


1.简介


得益于GO,ion-sfu整体代码精简,拥有极高的开发效率。结合现有SDK使用,可以避免很多坑:ion-sdk-js等。


ion-sfu基于pion/webrtc,所以代码风格偏标准webrtc,比如:PeerConnection。因为是使用了标准API,熟悉了之后很容易看懂其他工程,比如:ion-sdk-go/js/flutter。


这样从前到后,整体门槛都降低了。


2.工程组织


这里给出主要模块列表:

├── Makefile //用来编译二进制和grpc文件├── bin //编译好的二进制目录├── cmd│ └── signal //包含三个主文件 grpc、jsonrpc、allrpc├── config.toml //配置文件├── examples //网页示例目录├── pkg ├── buffer //buffer包,用于缓存包 ├── logger //日志 ├── middlewares //中间件,主要是支持自定义datachannel ├── relay //中继 ├── sfu //sfu主模块,包含router、session、peer等 ├── stats //状态统计 └── twcc //transport-cc


3.信令层


信令代码和主程序在一起,在cmd/signal/下边。


  • 支持jsonrpc,主要处理逻辑在:

func (p *JSONSignal) Handle(ctx context.Context, conn *jsonrpc2.Conn, req *jsonrpc2.Request) {

  • 支持grpc,主要处理逻辑在:

func (s *SFUServer) Signal(stream pb.SFU_SignalServer) error {

而allrpc,是jsonrpc和grpc的合体封装,运行时会进入上面两个函数。


信令很简单:

  • join:加入一个session。
  • description:发起offer或回复answer,用于协商和重协商。
  • trickle:发送trickle candidate。


另外,出于简单考虑,一些信令和事件,直接走datachannel了,比如:大小流切换、声音检测、自定义信令等。


4.媒体层


媒体层的主要模块:

├── audioobserver.go //声音检测├── datachannel.go //dc中间件的封装├── downtrack.go //下行track├── helpers.go //工具函数集├── mediaengine.go //SDP相关codec、rtp参数设置├── peer.go //peer封装,一个peer包含一个publisher和一个subscriber,双pc设计├── publisher.go //publisher,封装上行pc├── receiver.go //subscriber,封装下行pc├── router.go //router,包含pc、session、一组receivers├── sequencer.go //记录包的信息:序列号sn、偏移、时间戳ts等├── session.go //会话,包含多个peer、dc├── sfu.go //分发单元,包含多个session、dc├── simulcast.go //大小流配置├── subscriber.go //subscriber,封装下行pc、DownTrack、dc└── turn.go //内置turn server

相比以前版本,增加了一些interface,主要是为了作为包使用时,封装自己的类。


三、主函数与信令流程


1.主函数


这里拿jsonrpc来分析,其他rpc流程上是一样的。


func main() { if !parse() { showHelp() os.Exit(-1) }

//创建SFU和DC,这里的DC用于Simulcast和AudioLevel s := sfu.NewSFU(conf) dc := s.NewDatachannel(sfu.APIChannelLabel) dc.Use(datachannel.SubscriberAPI)

//接下来是标准websocket服务器启动的流程 upgrader := websocket.Upgrader{ CheckOrigin: func(r *http.Request) bool { return true }, ReadBufferSize: 1024, WriteBufferSize: 1024, }

//这里jsonrpc基于websocket,websocket从标准http upgrade过来的 http.Handle("/ws", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { c, err := upgrader.Upgrade(w, r, nil) if err != nil { panic(err) } defer c.Close()

//这里创建了JSONSignal,每次真实请求到来时会新建一个Peer,进入Handle函数处理 p := server.NewJSONSignal(sfu.NewPeer(s), logger) defer p.Close()

jc := jsonrpc2.NewConn(r.Context(), websocketjsonrpc2.NewObjectStream(c), p) <-< span="">jc.DisconnectNotify() }))

go startMetrics(metricsAddr)

var err error if key != "" && cert != "" { logger.Info("Started listening", "addr", "https://"+addr) err = http.ListenAndServeTLS(addr, cert, key, nil) } else { logger.Info("Started listening", "addr", "http://"+addr) err = http.ListenAndServe(addr, nil) } if err != nil { panic(err) }}

2.协商&重协商


协商(negotiate):

WebRTC对外的类是PeerConnection,简称PC,通过信令服务交换SDP给PC进行操作。协商就是指双方通过信令交换SDP,通过PC的一些接口,达到协商双方的媒体格式、传输地址端口等信息,从而实现推流和播放的目的。


一次协商完整流程:

本端CreateOffer-》本端SetLocalDescription(offer)-》本端发送offer-》对端SetRemoteDescription(offer)-》对端CreateAnswer-》SetLocalDescription(answer)-》对端对端返回answer-》本端SetRemoteDescription(answer)


重协商(renegotiate):

就是指再次协商。


为什么要重协商?

因为客户端和服务器的track都是变化的,重协商是通知对端的必要手段,比如:客户端发起屏幕分享,服务器有人进出房间等。


重协商的原则:

谁变化谁发起(offer)。


3.信令流程


首先,客户端ws连接成功:

服务端会建立一个Peer,可以参考上边代码。


然后,客户端发起第一次协商:

客户端pc.CreateOffer(一个只包含dc的offer)-》pc.SetLocalDescription(offer),然后把offer放入Join信令,发送给服务端,然后服务器协商【pc.SetRemoteDescription(offer)-》pc.CreateAnswer-》pc.SetLocalDescription(answer)】,返回answer给客户端,至此完成数据通道(datachannel)建立。首先打通dc,是为了方便audio-level/simulcast通道的建立,此时也可以创建自定dc做定制业务。


接下来,服务端发起第二次协商:

服务端pc.CreateOffer,SetLocalDescription,发送offer,此时offer会携带上此房间内的所有track信息,客户端收到后会CreateAnswer,SetLocalDescription,把answer返回来,然后服务端pc.SetRemoteDescription(answer),此时客户端可以收到服务器此房间内的所有流了。


最后,客户端publish发流时会发起第三次协商:

同第一次流程一样,不同的是同时携带了音视频的track,本次协商完成后,服务器可以收到客户端的流了,收到之后会对同房间内的其他客户端发起重协商。



往后只要客户端或服务器track有变化,都会再次发起重协商。


4.代码分析


JsonRPC所有的信令都会进入Handle函数。为了简化流程,可以暂时不看Trickle和OnIceCandidate函数,这个是开启trickle-ICE时才会有。


// Handle incoming RPC call events like join, answer, offer and trickle// JSONSignal是继承了LocalPeer,所以会继承一些属性和回调:OnOffer等。// 可以在浏览器端ws网络工具里查看具体信令内容func (p *JSONSignal) Handle(ctx context.Context, conn *jsonrpc2.Conn, req *jsonrpc2.Request) { replyError := func(err error) { _ = conn.ReplyWithError(ctx, req.ID, &jsonrpc2.Error{ Code: 500, Message: fmt.Sprintf("%s", err), }) }

switch req.Method { case "join":// 首先客户端会发join信令过来 var join Join err := json.Unmarshal(*req.Params, &join) if err != nil { p.Logger.Error(err, "connect: error parsing offer") replyError(err) break }

//设置OnOffer,即SFU发起offer时(重协商),会使用这个回调,比如重协商时,因为有很多客户端peer同时连到SFU,每个Peer的Track增删时,SFU需要向其他Peer重协商来告诉Track的变更 p.OnOffer = func(offer *webrtc.SessionDescription) { if err := conn.Notify(ctx, "offer", offer); err != nil { p.Logger.Error(err, "error sending offer") }

}

//设置OnIceCandidate,即SFU在ICE流程获取到新候选时,会回调这个函数,告诉客户端新增了啥候选 p.OnIceCandidate = func(candidate *webrtc.ICECandidateInit, target int) { if err := conn.Notify(ctx, "trickle", Trickle{ Candidate: *candidate, Target: target, }); err != nil { p.Logger.Error(err, "error sending ice candidate") } } //加入某个会话(房间) err = p.Join(join.SID, join.UID, join.Config) if err != nil { replyError(err) break }

//根据offer回复answer answer, err := p.Answer(join.Offer) if err != nil { replyError(err) break }

_ = conn.Reply(ctx, req.ID, answer)

//如果是客户端发offer,回复answer,此时为客户端发起重协商 case "offer": var negotiation Negotiation err := json.Unmarshal(*req.Params, &negotiation) if err != nil { p.Logger.Error(err, "connect: error parsing offer") replyError(err) break }

answer, err := p.Answer(negotiation.Desc) if err != nil { replyError(err) break } _ = conn.Reply(ctx, req.ID, answer)

//如果是客户端发answer,设置SetRemoteDescription即可 case "answer": var negotiation Negotiation err := json.Unmarshal(*req.Params, &negotiation) if err != nil { p.Logger.Error(err, "connect: error parsing offer") replyError(err) break }

err = p.SetRemoteDescription(negotiation.Desc) if err != nil { replyError(err) }

//如果是客户端发送Trickle-ICE的候选过来,设置即可 case "trickle": var trickle Trickle err := json.Unmarshal(*req.Params, &trickle) if err != nil { p.Logger.Error(err, "connect: error parsing candidate") replyError(err) break }

err = p.Trickle(trickle.Candidate, trickle.Target) if err != nil { replyError(err) } }}

注意OnOffer是服务器重协商的回调,即房间内某客户端track有变化,服务器会回调此函数通知其他客户端。


总结一句话,客户端《---》SFU的核心逻辑就是不断重协商,谁变化谁发起offer




作者简介:

王朋闯:前百度RTN资深工程师,前金山云RTC技术专家,前VIPKID流媒体架构师,ION开源项目发起人。


特别说明:

本文发布于知乎,已获得作者授权转载。



扫描图中二维码或点击阅读原文
了解大会更多信息

喜欢我们的内容就点个“在看”吧!

: . Video Mini Program Like ,轻点两下取消赞 Wow ,轻点两下取消在看

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存