来源:绘你一世倾城
链接:juejin.cn/post/6844904050580783112
版权申明:内容来源网络,版权归原创者所有。除非无法确认,我们都会标明作者及出处,如有侵权烦请告知,我们会立即删除并表示歉意。谢谢!
往期惊喜:
SpringBoot + Filter 实现 Gzip 压缩超大 json 对象,传输耗时大大减少!
扫码关注我们的Java架构师技术
带你全面深入Java
大家好,我是Java架构师
1.时代的里程碑——即时通信
钉钉于2015年1月正式上线,2016年4月腾讯正式发布企业微信1.0版本,也只有简单的考勤、请假、报销等功能,在产品功能上略显平淡。彼时再看钉钉,凭借先发优势,初期就确定的产品线“讨好”老板,2016年企业数100万,2018年这个数量上升到700万,可见钉钉发展速度之快,稳固了钉钉在B端市场的地位。企业微信早期举棋不定的打法,也让它在企业OA办公上玩不过钉钉。但企业微信在发布3.0版本后,局面开始扭转,钉钉在用户数量上似乎已经饱和,难以有新的突破,而企业微信才真正开始逐渐占据市场。
2.章节概述
3.深入理解websocket协议
public function dohandshake($sock, $data, $key) {
if (preg_match("/Sec-WebSocket-Key: (.*)\r\n/", $data, $match)) {
$response = base64_encode(sha1($match[1] . '258EAFA5-E914-47DA-95CA-C5AB0DC85B11', true));
$upgrade = "HTTP/1.1 101 Switching Protocol\r\n" .
"Upgrade: websocket\r\n" .
"Connection: Upgrade\r\n" .
"Sec-WebSocket-Accept: " . $response . "\r\n\r\n";
socket_write($sock, $upgrade, strlen($upgrade));
$this->isHand[$key] = true;
}
}
package main
import (
"net/http"
"time"
"github.com/gorilla/websocket"
)
var (
//完成握手操作
upgrade = websocket.Upgrader{
//允许跨域(一般来讲,websocket都是独立部署的)
CheckOrigin:func(r *http.Request) bool {
return true
},
}
)
func wsHandler(w http.ResponseWriter, r *http.Request) {
var (
conn *websocket.Conn
err error
data []byte
)
//服务端对客户端的http请求(升级为websocket协议)进行应答,应答之后,协议升级为websocket,http建立连接时的tcp三次握手将保持。
if conn, err = upgrade.Upgrade(w, r, nil); err != nil {
return
}
//启动一个协程,每隔1s向客户端发送一次心跳消息
go func() {
var (
err error
)
for {
if err = conn.WriteMessage(websocket.TextMessage, []byte("heartbeat")); err != nil {
return
}
time.Sleep(1 * time.Second)
}
}()
//得到websocket的长链接之后,就可以对客户端传递的数据进行操作了
for {
//通过websocket长链接读到的数据可以是text文本数据,也可以是二进制Binary
if _, data, err = conn.ReadMessage(); err != nil {
goto ERR
}
if err = conn.WriteMessage(websocket.TextMessage, data); err != nil {
goto ERR
}
}
ERR:
//出错之后,关闭socket连接
conn.Close()
}
func main() {
http.HandleFunc("/ws", wsHandler)
http.ListenAndServe("0.0.0.0:7777", nil)
}
app
│ ├── args
│ │ ├── contact.go
│ │ └── pagearg.go
│ ├── controller //控制器层,api入口
│ │ ├── chat.go
│ │ ├── contract.go
│ │ ├── upload.go
│ │ └── user.go
│ ├── main.go //程序入口
│ ├── model //数据定义与存储
│ │ ├── community.go
│ │ ├── contract.go
│ │ ├── init.go
│ │ └── user.go
│ ├── service //逻辑实现
│ │ ├── contract.go
│ │ └── user.go
│ ├── util //帮助函数
│ │ ├── md5.go
│ │ ├── parse.go
│ │ ├── resp.go
│ │ └── string.go
│ └── view //模版资源
│ │ ├── ...
asset //js、css文件
resource //上传资源,上传图片会放到这里
func registerView() {
tpl, err := template.ParseGlob("./app/view/**/*")
if err != nil {
log.Fatal(err.Error())
}
for _, v := range tpl.Templates() {
tplName := v.Name()
http.HandleFunc(tplName, func(writer http.ResponseWriter, request *http.Request) {
tpl.ExecuteTemplate(writer, tplName, nil)
})
}
}
...
func main() {
......
http.Handle("/asset/", http.FileServer(http.Dir(".")))
http.Handle("/resource/", http.FileServer(http.Dir(".")))
registerView()
log.Fatal(http.ListenAndServe(":8081", nil))
}
user
表来存储用户信息。我们使用github.com/go-xorm/xorm
来操作mysql
,首先看一下mysql
表的设计:package model
import "time"
const (
SexWomen = "W"
SexMan = "M"
SexUnknown = "U"
)
type User struct {
Id int64 `xorm:"pk autoincr bigint(64)" form:"id" json:"id"`
Mobile string `xorm:"varchar(20)" form:"mobile" json:"mobile"`
Passwd string `xorm:"varchar(40)" form:"passwd" json:"-"` // 用户密码 md5(passwd + salt)
Avatar string `xorm:"varchar(150)" form:"avatar" json:"avatar"`
Sex string `xorm:"varchar(2)" form:"sex" json:"sex"`
Nickname string `xorm:"varchar(20)" form:"nickname" json:"nickname"`
Salt string `xorm:"varchar(10)" form:"salt" json:"-"`
Online int `xorm:"int(10)" form:"online" json:"online"` //是否在线
Token string `xorm:"varchar(40)" form:"token" json:"token"` //用户鉴权
Memo string `xorm:"varchar(140)" form:"memo" json:"memo"`
Createat time.Time `xorm:"datetime" form:"createat" json:"createat"` //创建时间, 统计用户增量时使用
}
user
表中存储了用户名、密码、头像、用户性别、手机号等一些重要的信息,比较重要的是我们也存储了token标示用户在用户登录之后,http协议升级为websocket协议进行鉴权,这个细节点我们前边提到过,下边会有代码演示。接下来我们看一下model初始化要做的一些事情吧:package model
import (
"errors"
"fmt"
_ "github.com/go-sql-driver/mysql"
"github.com/go-xorm/xorm"
"log"
)
var DbEngine *xorm.Engine
func init() {
driverName := "mysql"
dsnName := "root:root@(127.0.0.1:3306)/chat?charset=utf8"
err := errors.New("")
DbEngine, err = xorm.NewEngine(driverName, dsnName)
if err != nil && err.Error() != ""{
log.Fatal(err)
}
DbEngine.ShowSQL(true)
//设置数据库连接数
DbEngine.SetMaxOpenConns(10)
//自动创建数据库
DbEngine.Sync(new(User), new(Community), new(Contact))
fmt.Println("init database ok!")
}
############################
//app/controller/user.go
############################
......
//用户注册
func UserRegister(writer http.ResponseWriter, request *http.Request) {
var user model.User
util.Bind(request, &user)
user, err := UserService.UserRegister(user.Mobile, user.Passwd, user.Nickname, user.Avatar, user.Sex)
if err != nil {
util.RespFail(writer, err.Error())
} else {
util.RespOk(writer, user, "")
}
}
......
############################
//app/service/user.go
############################
......
type UserService struct{}
//用户注册
func (s *UserService) UserRegister(mobile, plainPwd, nickname, avatar, sex string) (user model.User, err error) {
registerUser := model.User{}
_, err = model.DbEngine.Where("mobile=? ", mobile).Get(®isterUser)
if err != nil {
return registerUser, err
}
//如果用户已经注册,返回错误信息
if registerUser.Id > 0 {
return registerUser, errors.New("该手机号已注册")
}
registerUser.Mobile = mobile
registerUser.Avatar = avatar
registerUser.Nickname = nickname
registerUser.Sex = sex
registerUser.Salt = fmt.Sprintf("%06d", rand.Int31n(10000))
registerUser.Passwd = util.MakePasswd(plainPwd, registerUser.Salt)
registerUser.Createat = time.Now()
//插入用户信息
_, err = model.DbEngine.InsertOne(®isterUser)
return registerUser, err
}
......
############################
//main.go
############################
......
func main() {
http.HandleFunc("/user/register", controller.UserRegister)
}
util.Bind(request, &user)
将用户参数绑定到user对象上,使用的是util包中的Bind函数,具体实现细节读者可以自行研究,主要模仿了Gin框架的参数绑定,可以拿来即用,非常方便。然后我们根据用户手机号搜索数据库中是否已经存在,如果不存在就插入到数据库中,返回注册成功信息,逻辑非常简单。登录逻辑更简单:############################
//app/controller/user.go
############################
...
//用户登录
func UserLogin(writer http.ResponseWriter, request *http.Request) {
request.ParseForm()
mobile := request.PostForm.Get("mobile")
plainpwd := request.PostForm.Get("passwd")
//校验参数
if len(mobile) == 0 || len(plainpwd) == 0 {
util.RespFail(writer, "用户名或密码不正确")
}
loginUser, err := UserService.Login(mobile, plainpwd)
if err != nil {
util.RespFail(writer, err.Error())
} else {
util.RespOk(writer, loginUser, "")
}
}
...
############################
//app/service/user.go
############################
...
func (s *UserService) Login(mobile, plainpwd string) (user model.User, err error) {
//数据库操作
loginUser := model.User{}
model.DbEngine.Where("mobile = ?", mobile).Get(&loginUser)
if loginUser.Id == 0 {
return loginUser, errors.New("用户不存在")
}
//判断密码是否正确
if !util.ValidatePasswd(plainpwd, loginUser.Salt, loginUser.Passwd) {
return loginUser, errors.New("密码不正确")
}
//刷新用户登录的token值
token := util.GenRandomStr(32)
loginUser.Token = token
model.DbEngine.ID(loginUser.Id).Cols("token").Update(&loginUser)
//返回新用户信息
return loginUser, nil
}
...
############################
//main.go
############################
......
func main() {
http.HandleFunc("/user/login", controller.UserLogin)
}
############################
//app/controller/chat.go
############################
......
//本核心在于形成userid和Node的映射关系
type Node struct {
Conn *websocket.Conn
//并行转串行,
DataQueue chan []byte
GroupSets set.Interface
}
......
//userid和Node映射关系表
var clientMap map[int64]*Node = make(map[int64]*Node, 0)
//读写锁
var rwlocker sync.RWMutex
//实现聊天的功能
func Chat(writer http.ResponseWriter, request *http.Request) {
query := request.URL.Query()
id := query.Get("id")
token := query.Get("token")
userId, _ := strconv.ParseInt(id, 10, 64)
//校验token是否合法
islegal := checkToken(userId, token)
conn, err := (&websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return islegal
},
}).Upgrade(writer, request, nil)
if err != nil {
log.Println(err.Error())
return
}
//获得websocket链接conn
node := &Node{
Conn: conn,
DataQueue: make(chan []byte, 50),
GroupSets: set.New(set.ThreadSafe),
}
//获取用户全部群Id
comIds := concatService.SearchComunityIds(userId)
for _, v := range comIds {
node.GroupSets.Add(v)
}
rwlocker.Lock()
clientMap[userId] = node
rwlocker.Unlock()
//开启协程处理发送逻辑
go sendproc(node)
//开启协程完成接收逻辑
go recvproc(node)
sendMsg(userId, []byte("welcome!"))
}
......
//校验token是否合法
func checkToken(userId int64, token string) bool {
user := UserService.Find(userId)
return user.Token == token
}
......
############################
//main.go
############################
......
func main() {
http.HandleFunc("/chat", controller.Chat)
}
......
/chat
的GET请求,服务端首先创建了一个Node
结构体,用来存储和客户端建立起来的websocket长连接句柄,每一个句柄都有一个管道DataQueue
,用来收发信息,GroupSets
是客户端对应的群组信息,后边我们会提到。type Node struct {
Conn *websocket.Conn
//并行转串行,
DataQueue chan []byte
GroupSets set.Interface
}
Node
关联起来://userid和Node映射关系表
var clientMap map[int64]*Node = make(map[int64]*Node, 0)
//校验token是否合法
islegal := checkToken(userId, token)
conn, err := (&websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return islegal
},
}).Upgrade(writer, request, nil)
Node
,搜索该客户端用户所在的群组id
,填充到群组的GroupSets
属性中。然后将Node
节点添加到ClientMap
中维护起来,我们对ClientMap
的操作一定要加锁,因为Go语言在并发情况下,对map的操作并不保证原子安全://获得websocket链接conn
node := &Node{
Conn: conn,
DataQueue: make(chan []byte, 50),
GroupSets: set.New(set.ThreadSafe),
}
//获取用户全部群Id
comIds := concatService.SearchComunityIds(userId)
for _, v := range comIds {
node.GroupSets.Add(v)
}
rwlocker.Lock()
clientMap[userId] = node
rwlocker.Unlock()
......
//开启协程处理发送逻辑
go sendproc(node)
//开启协程完成接收逻辑
go recvproc(node)
sendMsg(userId, []byte("welcome!"))
......
############################
//app/controller/chat.go
############################
type Message struct {
Id int64 `json:"id,omitempty" form:"id"` //消息ID
Userid int64 `json:"userid,omitempty" form:"userid"` //谁发的
Cmd int `json:"cmd,omitempty" form:"cmd"` //群聊还是私聊
Dstid int64 `json:"dstid,omitempty" form:"dstid"` //对端用户ID/群ID
Media int `json:"media,omitempty" form:"media"` //消息按照什么样式展示
Content string `json:"content,omitempty" form:"content"` //消息的内容
Pic string `json:"pic,omitempty" form:"pic"` //预览图片
Url string `json:"url,omitempty" form:"url"` //服务的URL
Memo string `json:"memo,omitempty" form:"memo"` //简单描述
Amount int `json:"amount,omitempty" form:"amount"` //其他和数字相关的
}
id
,将来我们可以对消息持久化存储,但是我们系统中并没有做这件工作,读者可根据需要自行完成。然后是userid
,发起消息的用户,对应的是dstid
,要将消息发送给谁。还有一个参数非常重要,就是cmd
,它表示是群聊还是私聊,群聊和私聊的代码处理逻辑有所区别,我们为此专门定义了一些cmd
常量://定义命令行格式
const (
CmdSingleMsg = 10
CmdRoomMsg = 11
CmdHeart = 0
)
media
是媒体类型,我们都知道微信支持语音、视频和各种其他的文件传输,我们设置了该参数之后,读者也可以自行拓展这些功能。content
是消息文本,是聊天中最常用的一种形式。pic
和url
是为图片和其他链接资源所设置的。memo
是简介,amount
是和数字相关的信息,比如说发红包业务有可能使用到该字段。############################
//app/controller/chat.go
############################
......
//发送逻辑
func sendproc(node *Node) {
for {
select {
case data := <-node.DataQueue:
err := node.Conn.WriteMessage(websocket.TextMessage, data)
if err != nil {
log.Println(err.Error())
return
}
}
}
}
//接收逻辑
func recvproc(node *Node) {
for {
_, data, err := node.Conn.ReadMessage()
if err != nil {
log.Println(err.Error())
return
}
dispatch(data)
//todo对data进一步处理
fmt.Printf("recv<=%s", data)
}
}
......
//后端调度逻辑处理
func dispatch(data []byte) {
msg := Message{}
err := json.Unmarshal(data, &msg)
if err != nil {
log.Println(err.Error())
return
}
switch msg.Cmd {
case CmdSingleMsg:
sendMsg(msg.Dstid, data)
case CmdRoomMsg:
for _, v := range clientMap {
if v.GroupSets.Has(msg.Dstid) {
v.DataQueue <- data
}
}
case CmdHeart:
//检测客户端的心跳
}
}
//发送消息,发送到消息的管道
func sendMsg(userId int64, msg []byte) {
rwlocker.RLock()
node, ok := clientMap[userId]
rwlocker.RUnlock()
if ok {
node.DataQueue <- msg
}
}
......
Node
的channel中去就好了。通过websocket的WriteMessage
就可以实现此功能:func sendproc(node *Node) {
for {
select {
case data := <-node.DataQueue:
err := node.Conn.WriteMessage(websocket.TextMessage, data)
if err != nil {
log.Println(err.Error())
return
}
}
}
}
ReadMessage
方法接收到用户信息,然后通过dispatch
方法进行调度:func recvproc(node *Node) {
for {
_, data, err := node.Conn.ReadMessage()
if err != nil {
log.Println(err.Error())
return
}
dispatch(data)
//todo对data进一步处理
fmt.Printf("recv<=%s", data)
}
}
dispatch
方法所做的工作有两件:dispatch
只要将消息添加到channel当中,发送协程就会获取到信息发送给客户端,这样就实现了聊天功能,单聊和群聊的区别只是服务端将消息发送给群组还是个人,如果发送给群组,程序会遍历整个clientMap
, 看看哪个用户在这个群组当中,然后将消息发送。其实更好的实践是我们再维护一个群组和用户关系的Map,这样在发送群组消息的时候,取得用户信息就比遍历整个clientMap
代价要小很多了。func dispatch(data []byte) {
msg := Message{}
err := json.Unmarshal(data, &msg)
if err != nil {
log.Println(err.Error())
return
}
switch msg.Cmd {
case CmdSingleMsg:
sendMsg(msg.Dstid, data)
case CmdRoomMsg:
for _, v := range clientMap {
if v.GroupSets.Has(msg.Dstid) {
v.DataQueue <- data
}
}
case CmdHeart:
//检测客户端的心跳
}
}
......
func sendMsg(userId int64, msg []byte) {
rwlocker.RLock()
node, ok := clientMap[userId]
rwlocker.RUnlock()
if ok {
node.DataQueue <- msg
}
}
{
"dstid":1,
"cmd":10,
"userid":2,
"media":4,
"url":"/asset/plugins/doutu//emoj/2.gif"
}
############################
//app/controller/upload.go
############################
func init() {
os.MkdirAll("./resource", os.ModePerm)
}
func FileUpload(writer http.ResponseWriter, request *http.Request) {
UploadLocal(writer, request)
}
//将文件存储在本地/im_resource目录下
func UploadLocal(writer http.ResponseWriter, request *http.Request) {
//获得上传源文件
srcFile, head, err := request.FormFile("file")
if err != nil {
util.RespFail(writer, err.Error())
}
//创建一个新的文件
suffix := ".png"
srcFilename := head.Filename
splitMsg := strings.Split(srcFilename, ".")
if len(splitMsg) > 1 {
suffix = "." + splitMsg[len(splitMsg)-1]
}
filetype := request.FormValue("filetype")
if len(filetype) > 0 {
suffix = filetype
}
filename := fmt.Sprintf("%d%s%s", time.Now().Unix(), util.GenRandomStr(32), suffix)
//创建文件
filepath := "./resource/" + filename
dstfile, err := os.Create(filepath)
if err != nil {
util.RespFail(writer, err.Error())
return
}
//将源文件拷贝到新文件
_, err = io.Copy(dstfile, srcFile)
if err != nil {
util.RespFail(writer, err.Error())
return
}
util.RespOk(writer, filepath, "")
}
......
############################
//main.go
############################
func main() {
http.HandleFunc("/attach/upload", controller.FileUpload)
}
5. 程序优化和系统架构升级方案
clientMap
来存储客户端长链接信息,Go语言中对于大Map的读写要加锁,有一定的性能限制,在用户量特别大的情况下,读者可以对clientMap
做拆分,根据用户id做hash或者采用其他的策略,也可以将这些长链接句柄存放到redis中。6.结束语
在 GitHub猿 还有更多优质项目系统学习资源,欢迎分享给其他同学吧!
最后,整理了400多套项目,赠送读者。扫码下方二维码,后台回复【赚钱】即可获取。
--END--
来源:绘你一世倾城
链接:juejin.cn/post/6844904050580783112
版权申明:内容来源网络,版权归原创者所有。除非无法确认,我们都会标明作者及出处,如有侵权烦请告知,我们会立即删除并表示歉意。谢谢!
往期惊喜:
SpringBoot + Filter 实现 Gzip 压缩超大 json 对象,传输耗时大大减少!
扫码关注我们的Java架构师技术
带你全面深入Java