2小时快速搭建一个高可用的IM系统
笔者 2019 年参加了一次 Gopher 大会,有幸听探探的架构师分享了他们 2019 年微服务化的过程。
图片来自 Pexels
本文的目的是帮助读者较为深入的理解 Socket 协议,并快速搭建一个高可用、可拓展的 IM 系统(文章标题纯属引人眼球,不是真的,请读者不要在意),同时帮助读者了解 IM 系统后续可以做哪些优化和改进。
麻雀虽小,五脏俱全,该 IM 系统包含基本的注册、登录、添加好友基础功能,另外提供单聊、群聊,并且支持发送文字、表情和图片,在搭建的系统上,读者可轻松的拓展语音、视频聊天、发红包等业务。
为了帮助读者更清楚的理解 IM 系统的原理:
第一节深入讲解 WebSocket 协议,WebSocket 是长链接中比较常用的协议。
第二节讲解快速搭建 IM 系统的技巧和主要代码实现。
第三节对 IM 系统的架构升级和优化提出一些建议和思路。
最后章节做本文的回顾总结。
深入理解 WebSocket 协议
Web Sockets 的目标是在一个单独的持久连接上提供全双工、双向通信。在 Javascript 创建了 WebSocket 之后,会有一个 HTTP 请求发送到浏览器以发起连接。
在取得服务器响应后,建立的连接会将 HTTP 升级从 HTTP 协议交换为 WebSocket 协议。
由于 WebSocket 使用自定义的协议,所以 URL 模式也略有不同。未加密的连接不再是 http://,而是 ws://;加密的连接也不是 https://,而是 wss://。
在使用 WebSocket URL 时,必须带着这个模式,因为将来还有可能支持其他的模式。
使用自定义协议而非 HTTP 协议的好处是,能够在客户端和服务器之间发送非常少量的数据,而不必担心 HTTP 那样字节级的开销。由于传递的数据包很小,所以 WebSocket 非常适合移动应用。
上文中只是对 Web Sockets 进行了笼统的描述,接下来的篇幅会对 Web Sockets 的细节实现进行深入的探索。
本文接下来的几个小节不会涉及到大量的代码片段,但是会对相关的 API 和技术原理进行分析,相信大家读完下文之后再来看这段描述,会有一种豁然开朗的感觉。
①WebSocket 复用了 HTTP 的握手通道
“握手通道”是 HTTP 协议中客户端和服务端通过"TCP 三次握手"建立的通信通道。
客户端和服务端使用 HTTP 协议进行的每次交互都需要先建立这样一条“通道”,然后通过这条通道进行通信。
我们熟悉的 Ajax 交互就是在这样一个通道上完成数据传输的,只不过 Ajax 交互是短连接,在一次 Request→Response 之后,“通道”连接就断开了。
上文中我们提到:在 Javascript 创建了 WebSocket 之后,会有一个 HTTP 请求发送到浏览器以发起连接,然后服务端响应,这就是“握手“的过程。
在这个握手的过程当中,客户端和服务端主要做了两件事情:
说完了握手通道,我们再来看 HTTP 协议如何升级到 WebSocket 协议的。
②HTTP 协议升级为 WebSocket 协议
升级协议需要客户端和服务端交流,服务端怎么知道要将 HTTP 协议升级到 WebSocket 协议呢?它一定是接收到了客户端发送过来的某种信号。
首先,客户端发起协议升级请求。采用的是标准的 HTTP 报文格式,且只支持 GET 方法。
下面是重点请求的首部的意义:
将 Sec-WebSocket-Key 跟全局唯一的(GUID,[RFC4122])标识:258EAFA5-E914-47DA-95CA-C5AB0DC85B11 拼接。
通过 SHA1 计算出摘要,并转成 base64 字符串。
我们还是用世界上最好的语言来描述一下这个算法吧:
public function dohandshake($sock, $data, $key) {
if (preg_match("/Sec-WebSocket-Key: (.*)\r\n/", $data, $match)) {
$response = base64_encode(sha1($match[1] . '258EAFA5-E914-47DA-95CA-C5AB0DC85B11', true));
$upgrade = "HTTP/1.1 101 Switching Protocol\r\n" .
"Upgrade: websocket\r\n" .
"Connection: Upgrade\r\n" .
"Sec-WebSocket-Accept: " . $response . "\r\n\r\n";
socket_write($sock, $upgrade, strlen($upgrade));
$this->isHand[$key] = true;
}
}
服务端响应客户端的头部信息和 HTTP 协议的格式是相同的,HTTP1.1 协议是以换行符(\r\n)分割的,我们可以通过正则匹配解析出 Sec-WebSocket-Accept 的值,这和我们使用 curl 工具模拟 get 请求是一个道理。
从图中可以看到,通过算法算出来的 base64 字符串和 Sec-WebSocket-Accept 是一样的。
那么假如服务端在握手的过程中返回一个错误的 Sec-WebSocket-Accept 字符串会怎么样呢?
当然是客户端会报错,连接会建立失败,大家可以尝试一下,例如将全局唯一标识符 258EAFA5-E914-47DA-95CA-C5AB0DC85B11 改为 258EAFA5-E914-47DA-95CA-C5AB0DC85B12。
③WebSocket 的帧和数据分片传输
可以看到一篇足足有将近 15000 字节的数据在客户端和服务端完成通信只用了 150ms 的时间。
我们还可以看到浏览器控制台中 Frame 栏中显示的客户端发送和服务端响应的文本数据,你一定惊讶 WebSocket 通信强大的数据传输能力。
数据是否真的像 Frame 中展示的那样客户端直接将一大篇文本数据发送到服务端,服务端接收到数据之后,再将一大篇文本数据返回给客户端呢?
这当然是不可能的,我们都知道 HTTP 协议是基于 TCP 实现的,HTTP 发送数据也是分包转发的,就是将大数据根据报文形式分割成一小块一小块发送到服务端,服务端接收到客户端发送的报文后,再将小块的数据拼接组装。
关于 HTTP 的分包策略,大家可以查看相关资料进行研究,WebSocket 协议也是通过分片打包数据进行转发的,不过策略上和 HTTP 的分包不一样。
报文内容中规定了数据标示,操作代码、掩码、数据、数据长度等格式。不太理解没关系,下面我通过讲解大家只要理解报文中重要标志的作用就可以了。
首先我们明白了客户端和服务端进行 WebSocket 消息传递是这样的:
客户端:将消息切割成多个帧,并发送给服务端。
服务端:接收消息帧,并将关联的帧重新组装成完整的消息。
服务端在接收到客户端发送的帧消息的时候,将这些帧进行组装,它怎么知道何时数据组装完成的呢?
这就是报文中左上角 FIN(占一个比特)存储的信息,1 表示这是消息的最后一个分片(fragment)如果是 0,表示不是消息的最后一个分片。
WebSocket 通信中,客户端发送数据分片是有序的,这一点和 HTTP 不一样。
HTTP 将消息分包之后,是并发无序的发送给服务端的,包信息在数据中的位置则在 HTTP 报文中存储,而 WebSocket 仅仅需要一个 FIN 比特位就能保证将数据完整的发送到服务端。
接下来的 RSV1,RSV2,RSV3 三个比特位的作用又是什么呢?这三个标志位是留给客户端开发者和服务端开发者开发过程中协商进行拓展的,默认是 0。
拓展如何使用必须在握手的阶段就协商好,其实握手本身也是客户端和服务端的协商。
④WebSocket 连接保持和心跳检测
WebSocket 是长连接,为了保持客户端和服务端的实时双向通信,需要确保客户端和服务端之间的 TCP 通道保持连接没有断开。
但是对于长时间没有数据往来的连接,如果依旧保持着,可能会浪费服务端资源。
不排除有些场景,客户端和服务端虽然长时间没有数据往来,仍然需要保持连接,就比如说你几个月没有和一个 QQ 好友聊天了,突然有一天他发 QQ 消息告诉你他要结婚了,你还是能在第一时间收到。
那是因为,客户端和服务端一直再采用心跳来检查连接。客户端和服务端的心跳连接检测就像打乒乓球一样:
发送方→接收方:ping
接收方→发送方:pong
等什么时候没有 ping、pong 了,那么连接一定是存在问题了。
说了这么多,接下来我使用 Go 语言来实现一个心跳检测,WebSocket 通信实现细节是一件繁琐的事情,直接使用开源的类库是比较不错的选择,我使用的是:gorilla/websocket。
package main
import (
"net/http"
"time"
"github.com/gorilla/websocket"
)
var (
//完成握手操作
upgrade = websocket.Upgrader{
//允许跨域(一般来讲,websocket都是独立部署的)
CheckOrigin:func(r *http.Request) bool {
return true
},
}
)
func wsHandler(w http.ResponseWriter, r *http.Request) {
var (
conn *websocket.Conn
err error
data []byte
)
//服务端对客户端的http请求(升级为websocket协议)进行应答,应答之后,协议升级为websocket,http建立连接时的tcp三次握手将保持。
if conn, err = upgrade.Upgrade(w, r, nil); err != nil {
return
}
//启动一个协程,每隔1s向客户端发送一次心跳消息
go func() {
var (
err error
)
for {
if err = conn.WriteMessage(websocket.TextMessage, []byte("heartbeat")); err != nil {
return
}
time.Sleep(1 * time.Second)
}
}()
//得到websocket的长链接之后,就可以对客户端传递的数据进行操作了
for {
//通过websocket长链接读到的数据可以是text文本数据,也可以是二进制Binary
if _, data, err = conn.ReadMessage(); err != nil {
goto ERR
}
if err = conn.WriteMessage(websocket.TextMessage, data); err != nil {
goto ERR
}
}
ERR:
//出错之后,关闭socket连接
conn.Close()
}
func main() {
http.HandleFunc("/ws", wsHandler)
http.ListenAndServe("0.0.0.0:7777", nil)
}
借助 Go 语言很容易搭建协程的特点,我专门开启了一个协程每秒向客户端发送一条消息。
大家对 WebSocket 协议已经有了了解,接下来就让我们一起快速搭建一个高性能、可拓展的 IM 系统吧。
快速搭建高性能、可拓展的 IM 系统
①系统架构和代码文件目录结构
我们本节 C 端使用的是 Webapp, 通过 Go 语言渲染 Vue 模版快速实现功能,接入层使用的是 WebSocket 协议,前边已经进行了深入的介绍。
S 端是我们实现的重点,其中鉴权、登录、关系管理、单聊和群聊的功能都已经实现,读者可以在这部分功能的基础上再拓展其他的功能,比如:视频语音聊天、发红包、朋友圈等业务模块。
存储层我们做的比较简单,只是使用 MySQL 简单持久化存储了用户关系,然后聊天中的图片资源我们存储到了本地文件中。
虽然我们的 IM 系统实现的比较简化,但是读者可以在次基础上进行改进、完善、拓展,依然能够作出高可用的企业级产品。
我们的系统服务使用 Go 语言构建,代码结构比较简洁,但是性能比较优秀(这是 Java 和其他语言所无法比拟的),单机支持几万人的在线聊天。
app
│ ├── args
│ │ ├── contact.go
│ │ └── pagearg.go
│ ├── controller //控制器层,api入口
│ │ ├── chat.go
│ │ ├── contract.go
│ │ ├── upload.go
│ │ └── user.go
│ ├── main.go //程序入口
│ ├── model //数据定义与存储
│ │ ├── community.go
│ │ ├── contract.go
│ │ ├── init.go
│ │ └── user.go
│ ├── service //逻辑实现
│ │ ├── contract.go
│ │ └── user.go
│ ├── util //帮助函数
│ │ ├── md5.go
│ │ ├── parse.go
│ │ ├── resp.go
│ │ └── string.go
│ └── view //模版资源
│ │ ├── ...
asset //js、css文件
resource //上传资源,上传图片会放到这里
从入口函数 main.go 开始,我们定义了 Controller 层,是客户端 API 的入口。Service 用来处理主要的用户逻辑,消息分发、用户管理都在这里实现。
Model 层定义了一些数据表,主要是用户注册和用户好友关系、群组等信息,存储到 MySQL。
Util 包下是一些帮助函数,比如加密、请求响应等。View 下边存储了模版资源信息,上边所说的这些都在 App 文件夹下存储,外层还有 asset 用来存储 css、js 文件和聊天中会用到的表情图片等。
Resource 下存储用户聊天中的图片或者视频等文件。总体来讲,我们的代码目录机构还是比较简洁清晰的。
了解了我们要搭建的 IM 系统架构,我们再来看一下架构重点实现的功能吧。
②10 行代码万能模版渲染
func registerView() {
tpl, err := template.ParseGlob("./app/view/**/*")
if err != nil {
log.Fatal(err.Error())
}
for _, v := range tpl.Templates() {
tplName := v.Name()
http.HandleFunc(tplName, func(writer http.ResponseWriter, request *http.Request) {
tpl.ExecuteTemplate(writer, tplName, nil)
})
}
}
...
func main() {
......
http.Handle("/asset/", http.FileServer(http.Dir(".")))
http.Handle("/resource/", http.FileServer(http.Dir(".")))
registerView()
log.Fatal(http.ListenAndServe(":8081", nil))
}
Go 实现静态资源服务器也很简单,只需要调用 http.FileServer 就可以了,这样 HTML 文件就可以很轻松的访问依赖的 js、css 和图标文件了。
使用 http/template 包下的 ParseGlob、ExecuteTemplate 又可以很轻松的解析 Web 页面,这些工作完全不依赖与 Nginx。
③注册、登录和鉴权
之前我们提到过,对于注册、登录和好友关系管理,我们需要有一张 user 表来存储用户信息。
我们使用 github.com/go-xorm/xorm 来操作 MySQL,首先看一下 MySQL 表的设计:
package model
import "time"
const (
SexWomen = "W"
SexMan = "M"
SexUnknown = "U"
)
type User struct {
Id int64 `xorm:"pk autoincr bigint(64)" form:"id" json:"id"`
Mobile string `xorm:"varchar(20)" form:"mobile" json:"mobile"`
Passwd string `xorm:"varchar(40)" form:"passwd" json:"-"` // 用户密码 md5(passwd + salt)
Avatar string `xorm:"varchar(150)" form:"avatar" json:"avatar"`
Sex string `xorm:"varchar(2)" form:"sex" json:"sex"`
Nickname string `xorm:"varchar(20)" form:"nickname" json:"nickname"`
Salt string `xorm:"varchar(10)" form:"salt" json:"-"`
Online int `xorm:"int(10)" form:"online" json:"online"` //是否在线
Token string `xorm:"varchar(40)" form:"token" json:"token"` //用户鉴权
Memo string `xorm:"varchar(140)" form:"memo" json:"memo"`
Createat time.Time `xorm:"datetime" form:"createat" json:"createat"` //创建时间, 统计用户增量时使用
}
我们 user 表中存储了用户名、密码、头像、用户性别、手机号等一些重要的信息,比较重要的是我们也存储了 Token 标示用户在用户登录之后,HTTP 协议升级为 WebSocket 协议进行鉴权,这个细节点我们前边提到过,下边会有代码演示。
接下来我们看一下 model 初始化要做的一些事情吧:
package model
import (
"errors"
"fmt"
_ "github.com/go-sql-driver/mysql"
"github.com/go-xorm/xorm"
"log"
)
var DbEngine *xorm.Engine
func init() {
driverName := "mysql"
dsnName := "root:root@(127.0.0.1:3306)/chat?charset=utf8"
err := errors.New("")
DbEngine, err = xorm.NewEngine(driverName, dsnName)
if err != nil && err.Error() != ""{
log.Fatal(err)
}
DbEngine.ShowSQL(true)
//设置数据库连接数
DbEngine.SetMaxOpenConns(10)
//自动创建数据库
DbEngine.Sync(new(User), new(Community), new(Contact))
fmt.Println("init database ok!")
}
我们创建一个 DbEngine 全局 MySQL 连接对象,设置了一个大小为 10 的连接池。
Model 包里的 init 函数在程序加载的时候会先执行,对 Go 语言熟悉的同学应该知道这一点。
我们还设置了一些额外的参数用于调试程序,比如:设置打印运行中的 SQL,自动的同步数据表等,这些功能在生产环境中可以关闭。
我们的 Model 初始化工作就做完了,非常简陋,在实际的项目中,像数据库的用户名、密码、连接数和其他的配置信息,建议设置到配置文件中,然后读取,而不像本文硬编码的程序中。
############################
//app/controller/user.go
############################
......
//用户注册
func UserRegister(writer http.ResponseWriter, request *http.Request) {
var user model.User
util.Bind(request, &user)
user, err := UserService.UserRegister(user.Mobile, user.Passwd, user.Nickname, user.Avatar, user.Sex)
if err != nil {
util.RespFail(writer, err.Error())
} else {
util.RespOk(writer, user, "")
}
}
......
############################
//app/service/user.go
############################
......
type UserService struct{}
//用户注册
func (s *UserService) UserRegister(mobile, plainPwd, nickname, avatar, sex string) (user model.User, err error) {
registerUser := model.User{}
_, err = model.DbEngine.Where("mobile=? ", mobile).Get(®isterUser)
if err != nil {
return registerUser, err
}
//如果用户已经注册,返回错误信息
if registerUser.Id > 0 {
return registerUser, errors.New("该手机号已注册")
}
registerUser.Mobile = mobile
registerUser.Avatar = avatar
registerUser.Nickname = nickname
registerUser.Sex = sex
registerUser.Salt = fmt.Sprintf("%06d", rand.Int31n(10000))
registerUser.Passwd = util.MakePasswd(plainPwd, registerUser.Salt)
registerUser.Createat = time.Now()
//插入用户信息
_, err = model.DbEngine.InsertOne(®isterUser)
return registerUser, err
}
......
############################
//main.go
############################
......
func main() {
http.HandleFunc("/user/register", controller.UserRegister)
}
首先我们使用 util.Bind(request, &user) 将用户参数绑定到 user 对象上,使用的是 util 包中的 Bind 函数,具体实现细节读者可以自行研究,主要模仿了 Gin 框架的参数绑定,可以拿来即用,非常方便。
然后我们根据用户手机号搜索数据库中是否已经存在,如果不存在就插入到数据库中,返回注册成功信息,逻辑非常简单。
############################
//app/controller/user.go
############################
...
//用户登录
func UserLogin(writer http.ResponseWriter, request *http.Request) {
request.ParseForm()
mobile := request.PostForm.Get("mobile")
plainpwd := request.PostForm.Get("passwd")
//校验参数
if len(mobile) == 0 || len(plainpwd) == 0 {
util.RespFail(writer, "用户名或密码不正确")
}
loginUser, err := UserService.Login(mobile, plainpwd)
if err != nil {
util.RespFail(writer, err.Error())
} else {
util.RespOk(writer, loginUser, "")
}
}
...
############################
//app/service/user.go
############################
...
func (s *UserService) Login(mobile, plainpwd string) (user model.User, err error) {
//数据库操作
loginUser := model.User{}
model.DbEngine.Where("mobile = ?", mobile).Get(&loginUser)
if loginUser.Id == 0 {
return loginUser, errors.New("用户不存在")
}
//判断密码是否正确
if !util.ValidatePasswd(plainpwd, loginUser.Salt, loginUser.Passwd) {
return loginUser, errors.New("密码不正确")
}
//刷新用户登录的token值
token := util.GenRandomStr(32)
loginUser.Token = token
model.DbEngine.ID(loginUser.Id).Cols("token").Update(&loginUser)
//返回新用户信息
return loginUser, nil
}
...
############################
//main.go
############################
......
func main() {
http.HandleFunc("/user/login", controller.UserLogin)
}
实现了登录逻辑,接下来我们就到了用户首页,这里列出了用户列表,点击即可进入聊天页面。
用户也可以点击下边的 Tab 栏查看自己所在的群组,可以由此进入群组聊天页面。
具体这些工作还需要读者自己开发用户列表、添加好友、创建群组、添加群组等功能,这些都是一些普通的 API 开发工作,我们的代码程序中也实现了,读者可以拿去修改使用,这里就不再演示了。
我们再重点看一下用户鉴权这一块吧,用户鉴权是指用户点击聊天进入聊天界面时,客户端会发送一个 GET 请求给服务端。
请求建立一条 WebSocket 长连接,服务端收到建立连接的请求之后,会对客户端请求进行校验,以确实是否建立长连接,然后将这条长连接的句柄添加到 Map 当中(因为服务端不仅仅对一个客户端服务,可能存在千千万万个长连接)维护起来。
############################
//app/controller/chat.go
############################
......
//本核心在于形成userid和Node的映射关系
type Node struct {
Conn *websocket.Conn
//并行转串行,
DataQueue chan []byte
GroupSets set.Interface
}
......
//userid和Node映射关系表
var clientMap map[int64]*Node = make(map[int64]*Node, 0)
//读写锁
var rwlocker sync.RWMutex
//实现聊天的功能
func Chat(writer http.ResponseWriter, request *http.Request) {
query := request.URL.Query()
id := query.Get("id")
token := query.Get("token")
userId, _ := strconv.ParseInt(id, 10, 64)
//校验token是否合法
islegal := checkToken(userId, token)
conn, err := (&websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return islegal
},
}).Upgrade(writer, request, nil)
if err != nil {
log.Println(err.Error())
return
}
//获得websocket链接conn
node := &Node{
Conn: conn,
DataQueue: make(chan []byte, 50),
GroupSets: set.New(set.ThreadSafe),
}
//获取用户全部群Id
comIds := concatService.SearchComunityIds(userId)
for _, v := range comIds {
node.GroupSets.Add(v)
}
rwlocker.Lock()
clientMap[userId] = node
rwlocker.Unlock()
//开启协程处理发送逻辑
go sendproc(node)
//开启协程完成接收逻辑
go recvproc(node)
sendMsg(userId, []byte("welcome!"))
}
......
//校验token是否合法
func checkToken(userId int64, token string) bool {
user := UserService.Find(userId)
return user.Token == token
}
......
############################
//main.go
############################
......
func main() {
http.HandleFunc("/chat", controller.Chat)
}
......
进入聊天室,客户端发起 /chat 的 GET 请求,服务端首先创建了一个 Node 结构体,用来存储和客户端建立起来的 WebSocket 长连接句柄。
type Node struct {
Conn *websocket.Conn
//并行转串行,
DataQueue chan []byte
GroupSets set.Interface
}
服务端创建了一个 Map,将客户端用户 ID 和其 Node 关联起来:
//userid和Node映射关系表
var clientMap map[int64]*Node = make(map[int64]*Node, 0)
//校验token是否合法
islegal := checkToken(userId, token)
conn, err := (&websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return islegal
},
}).Upgrade(writer, request, nil)
鉴权成功以后,服务端初始化一个 Node,搜索该客户端用户所在的群组 ID,填充到群组的 GroupSets 属性中。
//获得websocket链接conn
node := &Node{
Conn: conn,
DataQueue: make(chan []byte, 50),
GroupSets: set.New(set.ThreadSafe),
}
//获取用户全部群Id
comIds := concatService.SearchComunityIds(userId)
for _, v := range comIds {
node.GroupSets.Add(v)
}
rwlocker.Lock()
clientMap[userId] = node
rwlocker.Unlock()
服务端和客户端建立了长链接之后,会开启两个协程专门来处理客户端消息的收发工作,对于 Go 语言来说,维护协程的代价是很低的。
......
//开启协程处理发送逻辑
go sendproc(node)
//开启协程完成接收逻辑
go recvproc(node)
sendMsg(userId, []byte("welcome!"))
......
至此,我们的鉴权工作也已经完成了,客户端和服务端的连接已经建立好了,接下来我们就来实现具体的聊天功能吧。
④实现单聊和群聊
实现聊天的过程中,消息体的设计至关重要,消息体设计的合理,功能拓展起来就非常的方便,后期维护、优化起来也比较简单。
############################
//app/controller/chat.go
############################
type Message struct {
Id int64 `json:"id,omitempty" form:"id"` //消息ID
Userid int64 `json:"userid,omitempty" form:"userid"` //谁发的
Cmd int `json:"cmd,omitempty" form:"cmd"` //群聊还是私聊
Dstid int64 `json:"dstid,omitempty" form:"dstid"` //对端用户ID/群ID
Media int `json:"media,omitempty" form:"media"` //消息按照什么样式展示
Content string `json:"content,omitempty" form:"content"` //消息的内容
Pic string `json:"pic,omitempty" form:"pic"` //预览图片
Url string `json:"url,omitempty" form:"url"` //服务的URL
Memo string `json:"memo,omitempty" form:"memo"` //简单描述
Amount int `json:"amount,omitempty" form:"amount"` //其他和数字相关的
}
每一条消息都有一个唯一的 ID,将来我们可以对消息持久化存储,但是我们系统中并没有做这件工作,读者可根据需要自行完成。
群聊和私聊的代码处理逻辑有所区别,我们为此专门定义了一些 cmd 常量:
//定义命令行格式
const (
CmdSingleMsg = 10
CmdRoomMsg = 11
CmdHeart = 0
)
Media 是媒体类型,我们都知道微信支持语音、视频和各种其他的文件传输,我们设置了该参数之后,读者也可以自行拓展这些功能。
Content 是消息文本,是聊天中最常用的一种形式。Pic 和 URL 是为图片和其他链接资源所设置的。
Memo 是简介,Amount 是和数字相关的信息,比如说发红包业务有可能使用到该字段。
消息体的设计就是这样,基于此消息体,我们来看一下,服务端如何收发消息,实现单聊和群聊吧。
############################
//app/controller/chat.go
############################
......
//发送逻辑
func sendproc(node *Node) {
for {
select {
case data := <-node.DataQueue:
err := node.Conn.WriteMessage(websocket.TextMessage, data)
if err != nil {
log.Println(err.Error())
return
}
}
}
}
//接收逻辑
func recvproc(node *Node) {
for {
_, data, err := node.Conn.ReadMessage()
if err != nil {
log.Println(err.Error())
return
}
dispatch(data)
//todo对data进一步处理
fmt.Printf("recv<=%s", data)
}
}
......
//后端调度逻辑处理
func dispatch(data []byte) {
msg := Message{}
err := json.Unmarshal(data, &msg)
if err != nil {
log.Println(err.Error())
return
}
switch msg.Cmd {
case CmdSingleMsg:
sendMsg(msg.Dstid, data)
case CmdRoomMsg:
for _, v := range clientMap {
if v.GroupSets.Has(msg.Dstid) {
v.DataQueue <- data
}
}
case CmdHeart:
//检测客户端的心跳
}
}
//发送消息,发送到消息的管道
func sendMsg(userId int64, msg []byte) {
rwlocker.RLock()
node, ok := clientMap[userId]
rwlocker.RUnlock()
if ok {
node.DataQueue <- msg
}
}
......
服务端向客户端发送消息逻辑比较简单,就是将客户端发送过来的消息,直接添加到目标用户 Node 的 Channel 中去就好了。
func sendproc(node *Node) {
for {
select {
case data := <-node.DataQueue:
err := node.Conn.WriteMessage(websocket.TextMessage, data)
if err != nil {
log.Println(err.Error())
return
}
}
}
}
func recvproc(node *Node) {
for {
_, data, err := node.Conn.ReadMessage()
if err != nil {
log.Println(err.Error())
return
}
dispatch(data)
//todo对data进一步处理
fmt.Printf("recv<=%s", data)
}
}
dispatch 方法所做的工作有两件:
解析消息体到 Message 中。
根据消息类型,将消息体添加到不同用户或者用户组的 Channel 当中。
Go 语言中的 Channel 是协程间通信的强大工具,dispatch 只要将消息添加到 Channel 当中,发送协程就会获取到信息发送给客户端,这样就实现了聊天功能。
单聊和群聊的区别只是服务端将消息发送给群组还是个人,如果发送给群组,程序会遍历整个 clientMap,看看哪个用户在这个群组当中,然后将消息发送。
func dispatch(data []byte) {
msg := Message{}
err := json.Unmarshal(data, &msg)
if err != nil {
log.Println(err.Error())
return
}
switch msg.Cmd {
case CmdSingleMsg:
sendMsg(msg.Dstid, data)
case CmdRoomMsg:
for _, v := range clientMap {
if v.GroupSets.Has(msg.Dstid) {
v.DataQueue <- data
}
}
case CmdHeart:
//检测客户端的心跳
}
}
......
func sendMsg(userId int64, msg []byte) {
rwlocker.RLock()
node, ok := clientMap[userId]
rwlocker.RUnlock()
if ok {
node.DataQueue <- msg
}
}
可以看到,通过 Channel,我们实现用户聊天功能还是非常方便的,代码可读性很强,构建的程序也很健壮。
⑤发送表情和图片
下边我们再来看一下聊天中经常使用到的发送表情和图片功能是如何实现的吧。
其实表情也是小图片,只是和聊天中图片不同的是,表情图片比较小,可以缓存在客户端,或者直接存放到客户端代码的代码文件中(不过现在微信聊天中有的表情包都是通过网络传输的)。
{
"dstid":1,
"cmd":10,
"userid":2,
"media":4,
"url":"/asset/plugins/doutu//emoj/2.gif"
}
客户端拿到 URL 后,就加载本地的小图标。聊天中用户发送图片也是一样的原理,不过聊天中用户的图片需要先上传到服务器,然后服务端返回 URL,客户端再进行加载,我们的 IM 系统也支持此功能。
############################
//app/controller/upload.go
############################
func init() {
os.MkdirAll("./resource", os.ModePerm)
}
func FileUpload(writer http.ResponseWriter, request *http.Request) {
UploadLocal(writer, request)
}
//将文件存储在本地/im_resource目录下
func UploadLocal(writer http.ResponseWriter, request *http.Request) {
//获得上传源文件
srcFile, head, err := request.FormFile("file")
if err != nil {
util.RespFail(writer, err.Error())
}
//创建一个新的文件
suffix := ".png"
srcFilename := head.Filename
splitMsg := strings.Split(srcFilename, ".")
if len(splitMsg) > 1 {
suffix = "." + splitMsg[len(splitMsg)-1]
}
filetype := request.FormValue("filetype")
if len(filetype) > 0 {
suffix = filetype
}
filename := fmt.Sprintf("%d%s%s", time.Now().Unix(), util.GenRandomStr(32), suffix)
//创建文件
filepath := "./resource/" + filename
dstfile, err := os.Create(filepath)
if err != nil {
util.RespFail(writer, err.Error())
return
}
//将源文件拷贝到新文件
_, err = io.Copy(dstfile, srcFile)
if err != nil {
util.RespFail(writer, err.Error())
return
}
util.RespOk(writer, filepath, "")
}
......
############################
//main.go
############################
func main() {
http.HandleFunc("/attach/upload", controller.FileUpload)
}
我们将文件存放到本地的一个磁盘文件夹下,然后发送给客户端路径,客户端通过路径加载相关的图片信息。
关于发送图片,我们虽然实现功能,但是做的太简单了,我们在接下来的章节详细的和大家探讨一下系统优化相关的方案。怎样让我们的系统在生产环境中用的更好。
程序优化和系统架构升级方案
我们上边实现了一个功能健全的 IM 系统,要将该系统应用在企业的生产环境中,需要对代码和系统架构做优化,才能实现真正的高可用。
本节主要从代码优化和架构升级上谈一些个人观点,能力有限不可能面面俱到,希望读者也在评论区给出更多好的建议。
代码优化
我们的代码没有使用框架,函数和 API 都写的比较简陋,虽然进行了简单的结构化,但是很多逻辑并没有解耦,所以建议大家业界比较成熟的框架对代码进行重构,Gin 就是一个不错的选择。
系统程序中使用 clientMap 来存储客户端长链接信息,Go 语言中对于大 Map 的读写要加锁,有一定的性能限制。
在用户量特别大的情况下,读者可以对 clientMap 做拆分,根据用户 ID 做 Hash 或者采用其他的策略,也可以将这些长链接句柄存放到 Redis 中。
上边提到图片上传的过程,有很多可以优化的地方,首先是图片压缩(微信也是这样做的),图片资源的压缩不仅可以加快传输速度,还可以减少服务端存储的空间。
另外对于图片资源来说,实际上服务端只需要存储一份数据就够了,读者可以在图片上传的时候做 Hash 校验。
如果资源文件已经存在了,就不需要再次上传了,而是直接将 URL 返回给客户端(各大网盘厂商的妙传功能就是这样实现的)。
代码还有很多优化的地方,比如我们可以将鉴权做的更好,使用 wss:// 代替 ws://。
在一些安全领域,可以对消息体进行加密,在高并发领域,可以对消息体进行压缩。
对 MySQL 连接池再做优化,将消息持久化存储到 Mongo,避免对数据库频繁的写入,将单条写入改为多条一块写入;为了使程序耗费更少的 CPU,降低对消息体进行 Json 编码的次数,一次编码,多次使用......
系统架构升级
我们的系统太过于简单,所在在架构升级上,有太多的工作可以做,笔者在这里只提几点比较重要的:
我们所说的资源指的是图片、视频等文件,可以选择成熟厂商的 Cos,或者自己搭建文件服务器也是可以的,如果资源量比较大,用户比较广,CDN 是不错的选择。
对于服务器的选择,一般会选择 Linux,Linux 下一切皆文件,长链接也是一样。
单机的系统连接数是有限制的,一般来说能达到 10 万就很不错了,所以在用户量增长到一定程序,需要搭建分布式。
分布式的搭建就要优化程序,因为长链接句柄分散到不同的机器,实现消息广播和分发是首先要解决的问题,笔者这里不深入阐述了,一来是没有足够的经验,二来是解决方案有太多的细节需要探讨。
搭建分布式环境所面临的问题还有:怎样更好的弹性扩容、应对突发事件等。
我们上边将用户注册、添加好友等功能和聊天功能放到了一起,真实的业务场景中可以将它们做分离,将用户注册、添加好友、创建群组放到一台服务器上,将聊天功能放到另外的服务器上。
业务的分离不仅使功能逻辑更加清晰,还能更有效的利用服务器资源。
我们的系统没有将消息持久化,用户信息持久化到 MySQL 中去。
在业务当中,如果要对消息做持久化储存,就要考虑数据库 I/O 的优化,简单讲:合并数据库的写次数、优化数据库的读操作、合理的利用缓存。
上边是就是笔者想到的一些代码优化和架构升级的方案。
结束语
不知道大家有没有发现,使用 Go 搭建一个 IM 系统比使用其他语言要简单很多,而且具备更好的拓展性和性能(并没有吹嘘 Go 的意思)。
在当今这个时代,5G 将要普及,流量不再昂贵,IM 系统已经广泛渗入到了用户日常生活中。
对于程序员来说,搭建一个 IM 系统不再是困难的事情,如果读者根据本文的思路,理解 WebSocket,Copy 代码,运行程序,应该用不了半天的时间就能上手这样一个 IM 系统。
IM 系统是一个时代,从 QQ、微信到现在的人工智能,都广泛应用了即时通信,围绕即时通信,又可以做更多产品布局。
笔者写本文的目的就是想要帮助更多人了解 IM,帮助一些开发者快速的搭建一个应用,燃起大家学习网络编程知识的兴趣,希望的读者能有所收获,能将 IM 系统应用到更多的产品布局中。
https://github.com/GuoZhaoran/fastIM
作者:绘你一世倾城
编辑:陶家龙
出处:https://juejin.im/post/5e1b29366fb9a02fc31dda24
精彩文章推荐: