直播间的弹幕、礼物、关注、分享、点赞等信息都由该信息流通道传输, 本文将对该信息流进行解析, 以便于开发者更好的使用该信息流.
0. 引言
信息流使用websocket
的二进制编码进行传输, 本文使用golang
作为代码实例, 其他语言的实现请自行实现
大体结构分为:
- 获取
websocket
的url
和key
- 连接
websocket
- 发送认证消息
- 接收信息流
- 每
30s
发送心跳包
其中, 每个上下行的包都由头部 + 数据
组成, 其中头部
的结构如下:
type WsHeader struct {
PackageLen uint32 // 包长度
HeaderLen uint16 // 头部长度
ProtoVer uint16 // 协议版本
OpCode uint32 // 操作码
Sequence uint32 // 序列号 (没什么用)
}
二进制包, 需要将uint
转为[]byte
进行传输
其中, 协议版本如下: | 协议版本 | 说明 |
---|---|---|
0 | 普通包正文不使用压缩 | |
1 | 心跳及认证包正文不使用压缩 | |
2 | 普通包正文使用zlib压缩 | |
3 | 普通包正文使用brotli压缩 |
其中, 操作码如下: | 代码 | 含义 |
---|---|---|
2 | 心跳包 | |
3 | 心跳包回复(人气值) | |
5 | 普通包(命令) | |
7 | 认证包 | |
8 | 认证包回复 |
准备工作
先写一个编码解码的函数:
func uint32ToByte4(num uint32) []byte {
if num > 4294967295 {
return nil
}
var bytesNum = make([]byte, 4)
for i := 3; i >= 0; i-- {
bytesNum[i] = byte(num % 256)
num /= 256
}
return bytesNum
}
func uint16ToByte2(num uint16) []byte {
if num > 65535 {
return nil
}
var bytesNum = make([]byte, 2)
for i := 1; i >= 0; i-- {
bytesNum[i] = byte(num % 256)
num /= 256
}
return bytesNum
}
func byte2ToUint16(bytesNum []byte) uint16 {
var num uint16
for i := 0; i < 2; i++ {
num *= 256
num += uint16(bytesNum[i])
}
return num
}
func byte4ToUint32(bytesNum []byte) uint32 {
var num uint32
for i := 0; i < 4; i++ {
num *= 256
num += uint32(bytesNum[i])
}
return num
}
定义几个操作码
const (
OpError = 1
OpHeartBeat = 2
OpHeartBeatReply = 3
OpCmd = 5
OpAuth = 7
OpAuthReply = 8
)
const (
CmdProto = 0
AuthProto = 1
HeartBeatProto = 1
CmdZlibProto = 2
CmdBrotliProto = 3
)
再基于上文的WsHeader
结构体, 写一个编码头部的方法:
func (wsHeaer *WsHeader) HeaderEncoder(bodyLen uint32) []byte {
var buffer bytes.Buffer
buffer.Write(uint32ToByte4(bodyLen + 16))
buffer.Write(uint16ToByte2(16))
buffer.Write(uint16ToByte2(wsHeaer.ProtoVer))
buffer.Write(uint32ToByte4(wsHeaer.OpCode))
buffer.Write(uint32ToByte4(wsHeaer.Sequence))
return buffer.Bytes()
}
编码的写出来了, 再写一个解码的吧
func WsHeaderDecoder(headerBytes []byte) WsHeader {
if len(headerBytes) < 16 {
return WsHeader{OpCode: OpError}
}
var wsHeader WsHeader
wsHeader.PackageLen = byte4ToUint32(headerBytes[0:4])
wsHeader.HeaderLen = byte2ToUint16(headerBytes[4:6])
wsHeader.ProtoVer = byte2ToUint16(headerBytes[6:8])
wsHeader.OpCode = byte4ToUint32(headerBytes[8:12])
wsHeader.Sequence = byte4ToUint32(headerBytes[12:16])
return wsHeader
}
准备工作完成, 开始正式的websocket
连接
1. 获取websocket
的url
和key
https://api.live.bilibili.com/xlive/web-room/v1/index/getDanmuInfo
方法: GET
参数: | 参数名 | 类型 | 说明 |
---|---|---|---|
id | int | room id |
返回值如下:
type ApiLiveAuth struct {
Code int `json:"code"`
Message string `json:"message"`
TTL int `json:"ttl"`
Data struct {
Group string `json:"group"`
BusinessID int `json:"business_id"`
RefreshRowFactor float64 `json:"refresh_row_factor"`
RefreshRate int `json:"refresh_rate"`
MaxDelay int `json:"max_delay"`
Token string `json:"token"`
HostList []struct {
Host string `json:"host"`
Port int `json:"port"`
WssPort int `json:"wss_port"`
WsPort int `json:"ws_port"`
} `json:"host_list"`
} `json:"data"`
}
每个字段的解释如下:
字段 | 类型 | 内容 | 备注 |
---|---|---|---|
code | num | 返回值 | 0:成功 |
message | str | 错误信息 | 默认为空 |
ttl | num | 1 | 未知 |
data | obj | 信息本体 |
其中, data
的内容如下:
字段 | 类型 | 内容 | 备注 |
---|---|---|---|
group | str | live | |
business_id | num | 0 | |
refresh_row_factor | num | 0.125 | |
refresh_rate | num | 100 | |
max_delay | num | 5000 | |
token | str | 认证秘钥 | |
host_list | array | 信息流服务器节点列表 |
host_list
数组中的字段:
字段 | 类型 | 内容 | 备注 |
---|---|---|---|
host | str | 服务器域名 | |
port | num | tcp端口 | |
wss_port | num | wss端口 | |
ws_port | num | ws端口 |
我们选取host_list
中的第某个节点, 并将wss_port
作为端口号, 连接websocket
服务器
示例代码:
func getLiveRoomAuth(c.RoomId) ApiLiveAuth {
// 就一个get请求, 用http库就行了, 我就不写了
}
2. 连接至节点
wss://<host>:<wss_port>/sub
首先构造一个客户端结构体
type Client struct {
RoomId int
Connected bool
connect *websocket.Conn
revMsg chan []byte
}
多余字段先不用管, 之后会用到
由于可能会出现websocket
连接失败的情况, 我们可以对多个节点轮流连接
示例代码:
func (c *Client) biliChatConnect(url string) error {
err := errors.New("")
c.connect, _, err = websocket.DefaultDialer.Dial(url, nil)
if nil != err {
log.Println("")
return err
}
return nil
}
func (c *Client) sendConnect() error {
apiLiveAuth, err := GetLiveRoomAuth(c.RoomId)
if err != nil || apiLiveAuth.Code != 0 {
log.Printf("Get live room auth failed, not use key")
return err
}
for nowSum, i := range apiLiveAuth.Data.HostList {
u := url.URL{Scheme: "wss", Host: i.Host + ":" + strconv.Itoa(i.WssPort), Path: "/sub"}
err = c.biliChatConnect(u.String())
if err != nil {
if nowSum == 2 {
log.Println("Connect to all server failed")
return err
}
log.Println("Connect to bili chat failed, i'll try again")
} else {
break
}
}
return nil
}
3. 发送认证包
认证包为二进制包, 头部内容为我们上文提到的头部, 其中正文内容为格式化后的json
, json
内容如下:
字段 | 类型 | 内容 | 必要性 | 备注 |
---|---|---|---|---|
uid | num | 用户mid | 非必要 | uid为0即为游客登录 |
roomid | num | 加入房间的id | 必要 | 直播间真实id |
protover | num | 协议版本 | 非必要 | 3 |
platform | str | 平台标识 | 非必要 | "web" |
type | num | 2 | 非必要 | |
key | str | 认证秘钥 | 非必要 |
因此, 我们可以构造一个如下的struct
正文:
type WsAuthBody struct {
UID int `json:"uid"`
Roomid int `json:"roomid"`
Protover int `json:"protover"`
Platform string `json:"platform"`
Type int `json:"type"`
Key string `json:"key"`
}
和一个认证包结构体:
type WsAuthMessage struct {
WsHeader WsHeader
Body WsAuthBody
}
顺理成章, 写一个获取头部[]byte
的方法:
func (wsAuth *WsAuthBody) getAuthBytes() []byte {
authBody, err := json.Marshal(wsAuth)
if err != nil {
log.Printf("Marshal auth body failed: %v", err)
return []byte{}
}
return authBody
}
func (wsAuth *WsAuthMessage) GetPackage() []byte {
wsAuth.WsHeader.OpCode = OpAuth
wsAuth.WsHeader.Sequence = 1
wsAuth.WsHeader.ProtoVer = AuthProto
var buffer bytes.Buffer
authBody := wsAuth.Body.getAuthBytes()
buffer.Write(wsAuth.WsHeader.HeaderEncoder(uint32(len(authBody))))
buffer.Write(authBody)
return buffer.Bytes()
}
那么,认证过程的代码就为:
func (c *Client) sendAuthMsg(wsAuthMsg WsAuthMessage) error {
wsPackage := wsAuthMsg.GetPackage()
err := c.connect.WriteMessage(websocket.BinaryMessage, wsPackage)
if err != nil {
log.Printf("Send auth msg failed: %v\n", err)
return err
}
return nil
}
我们可以把他结合到上文的发送连接的代码中:
func (c *Client) sendConnect() error {
wsAuthMsg := WsAuthMessage{Body: WsAuthBody{UID: 0, Roomid: c.RoomId, Protover: 3, Platform: "web", Type: 2}}
apiLiveAuth, err := GetLiveRoomAuth(c.RoomId)
if err != nil || apiLiveAuth.Code != 0 {
log.Printf("Get live room auth failed, not use key")
return err
}
wsAuthMsg.Body.Key = apiLiveAuth.Data.Token
for nowSum, i := range apiLiveAuth.Data.HostList {
u := url.URL{Scheme: "wss", Host: i.Host + ":" + strconv.Itoa(i.WssPort), Path: "/sub"}
err = c.biliChatConnect(u.String())
if err != nil {
if nowSum == 2 {
log.Println("Connect to all server failed")
return err
}
log.Println("Connect to bili chat failed, i'll try again")
} else {
break
}
}
err = c.sendAuthMsg(wsAuthMsg)
if err != nil {
log.Println("Send auth msg failed")
return err
}
return nil
}
4. 发送心跳包
服务端的逻辑为, 30s
未发送心跳包, 则自动断开
心跳包的正文可以是任意值, 因此心跳包的结构体可以为:
type WsHeartBeatMessage struct {
WsHeader WsHeader
Body []byte
}
代码为
func (wsHeartBeat *WsHeartBeatMessage) GetPackage() []byte {
wsHeartBeat.WsHeader.OpCode = OpHeartBeat
wsHeartBeat.WsHeader.Sequence = 1
wsHeartBeat.WsHeader.ProtoVer = HeartBeatProto
var buffer bytes.Buffer
buffer.Write(wsHeartBeat.WsHeader.HeaderEncoder(0))
buffer.Write(wsHeartBeat.Body)
return buffer.Bytes()
}
每隔30s
发送一次心跳包:
func (c *Client) heartBeat() {
for {
heartBeatPackage := WsHeartBeatMessage{Body: []byte{}}
err := c.connect.WriteMessage(websocket.TextMessage, heartBeatPackage.GetPackage())
if err != nil {
log.Printf("Send heart beat failed %v", err)
}
time.Sleep(30 * time.Second)
}
}
开启一个goroutine
来执行心跳包的发送:
go c.heartBeat()
5. 接收消息
接受消息就很简单了, 为了防止阻塞, 我们使用goroutine
来处理消息, 传入chan []byte
中
func (c *Client) receiveWsMsg() {
for {
_, message, err := c.connect.ReadMessage()
if err != nil {
log.Println("read:", err)
}
c.revMsg <- message
}
}
6. 防止连接断开 & 重复连接
为了防止返回的多个服务器都链接不成功, 我们可以做一个连接循环
func (c *Client) connectLoop() {
for {
c.Connected = false
err := c.sendConnect()
if err != nil {
log.Println("Send connect failed")
time.Sleep(5 * time.Second)
} else {
c.Connected = true
break
}
}
}
同时, 在接收消息时, 如果接收失败, 则开启重连、
func (c *Client) receiveWsMsg() {
for {
_, message, err := c.connect.ReadMessage()
if err != nil {
log.Println("read:", err)
c.Connected = false
c.connectLoop()
}
c.revMsg <- message
}
}
7. 解析返回消息
对于返回的消息, 有如下几种可能:
-
认证包返回值
返回值正文为
{"code":0}
-
普通命令消息
返回值正文比较复杂
-
心跳包返回值
返回值为当前房间热度+发送的心跳包正文
因此,构造出这样的处理器结构体:
type MsgHandler struct {
RoomId int
CmdChan chan map[string]string
}
其中的CmdChan
日后会用到
同时, 消息处理函数:
func (msgHandler *MsgHandler) MsgHandler(msg []byte) {
wsHeader := WsHeaderDecoder(msg)
switch wsHeader.OpCode {
case OpHeartBeatReply:
wsHeartBeatReply := WsHeartBeatReply{}
wsHeartBeatReply.SetPackage(wsHeader, msg)
case OpCmd:
// sth
case OpAuthReply:
wsAuthReplyMessage := WsAuthReplyMessage{}
wsAuthReplyMessage.SetPackage(wsHeader, msg)
case OpError:
}
}
对于心跳包和认证包的返回值, 可以构造这样的结构体:
type WsHeartBeatReply struct {
WsHeader WsHeader
Hot uint32
Msg []byte
}
type WsAuthReplyBody struct {
Code int `json:"code"`
}
type WsAuthReplyMessage struct {
WsHeader WsHeader
Body WsAuthReplyBody
}
和相应的处理函数
func (wsAuthReplyMessage *WsAuthReplyMessage) SetPackage(header WsHeader, msg []byte) {
wsAuthReplyMessage.WsHeader = header
authBody := WsAuthReplyBody{}
err := json.Unmarshal(msg[header.HeaderLen:], &authBody)
if err != nil {
log.Printf("Unmarshal auth body failed: %v", err)
return
}
wsAuthReplyMessage.Body = authBody
}
func (wsHeartBeatReply *WsHeartBeatReply) SetPackage(header WsHeader, msg []byte) {
wsHeartBeatReply.WsHeader = header
wsHeartBeatReply.Hot = byte4ToUint32(msg[header.HeaderLen : header.HeaderLen+4])
wsHeartBeatReply.Msg = msg[header.HeaderLen+4:]
}
主要的处理为普通包. 普通包的协议有3
中可能, 分别是为0
的普通包, 为2
的zlib
的正文压缩包, 为3
的正文brotli
压缩包, 由于除0
外, 正文都可能含有多条 含有单独头部的协议版本为0
的普通包, 因此我们需要对不同协议进行单独解压后, 拆分出所有的普通包.
首先, 构造出解压函数
func (msgHandler *MsgHandler) CmdBrotliProtoDecoder(wsHeader *WsHeader, msg []byte) []byte {
reader := brotli.NewReader(bytes.NewReader(msg[wsHeader.HeaderLen:wsHeader.PackageLen]))
resp, err := io.ReadAll(reader)
if err != nil {
log.Println("Brotli decode failed: ", err)
return []byte{}
}
return resp
}
func (msgHandler *MsgHandler) CmdZlibProtoDecoder(wsHeader *WsHeader, msg []byte) []byte {
var resp bytes.Buffer
w := zlib.NewWriter(&resp)
_, err := w.Write(msg[wsHeader.HeaderLen:wsHeader.PackageLen])
if err != nil {
log.Println("Zlib decode failed: ", err)
return []byte{}
}
err = w.Close()
if err != nil {
log.Println("Zlib decode failed: ", err)
return nil
}
return resp.Bytes()
}
然后, 完善上文的函数
case OpCmd:
msgBody := msg
cmdHeader := wsHeader
switch wsHeader.ProtoVer {
case CmdZlibProto:
msgBody = msgHandler.CmdZlibProtoDecoder(&wsHeader, msg)
cmdHeader = WsHeaderDecoder(msgBody)
fallthrough
case CmdBrotliProto:
msgBody = msgHandler.CmdBrotliProtoDecoder(&wsHeader, msg)
cmdHeader = WsHeaderDecoder(msgBody)
fallthrough
default:
for {
msgHandler.CmdHandler(&cmdHeader, msgBody[:int(cmdHeader.PackageLen)])
msgBody = msgBody[cmdHeader.PackageLen:]
if len(msgBody) == 0 {
break
}
cmdHeader = WsHeaderDecoder(msgBody)
}
}
此时, 每一条消息就被拆分出来了.
接下来, 我们需要对每一个普通包进行处理, 实现一个CmdHandler
函数, 并将map[string]string
通过通道传入另一个channel
中, 即CmdChan
, 进行进一步处理.
func (msgHandler *MsgHandler) CmdHandler(wsHeader *WsHeader, msg []byte) {
cmd := getCmd(msg[wsHeader.HeaderLen:wsHeader.PackageLen])
if cmd == "" {
return
}
rev := make(map[string]string)
rev["cmd"] = cmd
rev["msg"] = string(msg[wsHeader.HeaderLen:wsHeader.PackageLen])
rev["RoomId"] = strconv.Itoa(msgHandler.RoomId)
msgHandler.CmdChan <- rev
}
对于getCmd
函数, 很简单粗暴
func getCmd(msg []byte) string {
var layer = 0
for i, v := range msg {
if v == '{' || v == '[' {
layer++
} else if v == '}' || v == ']' {
layer--
} else if layer == 1 && v == '"' {
if i+7 < len(msg) && msg[i+1] == 'c' && msg[i+2] == 'm' && msg[i+3] == 'd' && msg[i+4] == '"' {
var from = i + 7
var to int
for to = from + 1; to < len(msg); to++ {
if msg[to] == '"' {
break
}
}
return string(msg[from:to])
}
}
}
return ""
}
最后, 只要将接收到的消息发送到MsgHandler
即可.
在client
结构体下实现一个接受handler
并处理消息的函数
func (c *Client) revHandler(handler MsgHandler) {
for {
select {
case msg, ok := <-c.revMsg:
if ok {
handler.MsgHandler(msg)
}
default:
time.Sleep(10 * time.Microsecond)
continue
}
}
}
其中, c.revMsg
是一个chan []byte
类型的通道, 用于接收消息, 上文已经提到过
开启一个客户端的命令也很简单:
func (c *Client) BiliChat(CmdChan chan map[string]string) {
c.connectLoop()
c.revMsg = make(chan []byte, 1)
handler := MsgHandler{RoomId: c.RoomId, CmdChan: CmdChan}
go c.revHandler(handler)
go c.receiveWsMsg()
go c.heartBeat()
}
8. 处理普通包
上一节实现了开启一个客户端, 并将普通包发送到另一个通道CmdChan
中, 现在来实现处理这个客户端
同时, 为了实现对于不同的命令, 实现不同的消息处理, 我们还需要写一个函数绑定功能, 实现事件绑定.
新建一个处理器结构体:
type Handler struct {
CmdChan chan map[string]string
DoFunc map[string]map[int][]func(event MsgEvent)
}
其中, CmdChan
为接受普通包的通道, DoFunc
为事件绑定的函数, 为一个map
, 后文详解解释.
对于普通包命令, 一共有近30
种:
const (
CmdDanmuMsg = "DANMU_MSG"
CmdSuperChatMessage = "SUPER_CHAT_MESSAGE"
CmdSuperChatMessageJpn = "SUPER_CHAT_MESSAGE_JPN"
CmdWatchedChange = "WATCHED_CHANGE"
CmdSendGift = "SEND_GIFT"
CmdOnlineRankCount = "ONLINE_RANK_COUNT"
CmdOnlineRankV2 = "ONLINE_RANK_V2"
CmdOnlineRankTop3 = "ONLINE_RANK_TOP3"
CmdLikeInfoV3Click = "LIKE_INFO_V3_CLICK"
CmdInteractWord = "INTERACT_WORD"
CmdStopLiveRoomList = "STOP_LIVE_ROOM_LIST"
CmdLikeInfoV3Update = "LIKE_INFO_V3_UPDATE"
CmdHotRankChange = "HOT_RANK_CHANGED"
CmdNoticeMsg = "NOTICE_MSG"
CmdRoomRealTimeMessageUpdate = "ROOM_REAL_TIME_MESSAGE_UPDATE"
CmdWidgetBanner = "WIDGET_BANNER"
CmdHotRankChangedV2 = "HOT_RANK_CHANGED_V2"
CmdGuardHonorThousand = "GUARD_HONOR_THOUSAND"
CmdLive = "LIVE"
CmdRoomChange = "ROOM_CHANGE"
CmdRoomBlockMsg = "ROOM_BLOCK_MSG"
CmdFullScreenSpecialEffect = "FULL_SCREEN_SPECIAL_EFFECT"
CmdCommonNoticeDanmaku = "COMMON_NOTICE_DANMAKU"
CmdTradingScore = "TRADING_SCORE"
CmdPreparing = "PREPARING"
CmdGuardBuy = "GUARD_BUY"
CmdGiftStarProcess = "GIFT_STAR_PROCESS"
CmdRoomSkinMsg = "ROOM_SKIN_MSG"
CmdEntryEffect = "ENTRY_EFFECT"
)
对于不同的命令, 我都构造了一个结构体, 如弹幕消息:
type DanMuMsg struct {
Cmd string `json:"cmd"`
Data struct {
Sender struct {
Uid int64
Name string
RoomId int64
}
Medal FansMedal
Content string
SendTimeStamp int
SendMillionTimeStamp int64
SenderEnterRoomTimeStamp int
}
}
同时, 为了方便地把消息解析为结构体, 还需要构造一个Setxxx
函数, 如构造弹幕消息结构体:
func (_ *Handler) SetDanMuMsg(msg map[string]string) {
danMu := DanMuMsg{}
danMu.Cmd = CmdDanmuMsg
danMuMsg := make(map[string]interface{}, 0)
err := json.Unmarshal([]byte(msg["msg"]), &danMuMsg)
if err != nil {
log.Printf("Unmarshal cmd json failed: %v", err)
return MsgEvent{}
}
danMu.Data.Content = danMuMsg["info"].([]interface{})[1].(string)
danMu.Data.SendTimeStamp = int(danMuMsg["info"].([]interface{})[9].(map[string]interface{})["ts"].(float64))
danMu.Data.SenderEnterRoomTimeStamp = int(danMuMsg["info"].([]interface{})[0].([]interface{})[4].(float64))
danMu.Data.SendMillionTimeStamp = int64(danMuMsg["info"].([]interface{})[0].([]interface{})[5].(float64))
danMu.Data.Sender.Uid = int64(danMuMsg["info"].([]interface{})[2].([]interface{})[0].(float64))
danMu.Data.Sender.Name = danMuMsg["info"].([]interface{})[2].([]interface{})[1].(string)
if len(danMuMsg["info"].([]interface{})[3].([]interface{})) != 0 {
danMu.Data.Medal.GuardLevel = int(danMuMsg["info"].([]interface{})[3].([]interface{})[0].(float64))
danMu.Data.Medal.MedalName = danMuMsg["info"].([]interface{})[3].([]interface{})[1].(string)
danMu.Data.Medal.TargetID = int(danMuMsg["info"].([]interface{})[3].([]interface{})[11].(float64))
danMu.Data.Medal.AnchorRoomId = int(danMuMsg["info"].([]interface{})[3].([]interface{})[3].(float64))
}
}
对于其他的所有命令, 都要做相同的内容, 这里就不一一列举了.
而对于每一个事件, 我们可以实现一个event
方法, 用于传参
type MsgEvent struct {
Cmd string
RoomId int
DanMuMsg *DanMuMsg
SuperChatMessage *SuperChatMessage
WatchedChange *WatchedChange
SendGift *SendGift
OnlineRankCount *OnlineRankCount
OnlineRankV2 *OnlineRankV2
OnlineRankTop3 *OnlineRankTop3
LikeInfoV3Click *LikeInfoV3Click
InteractWord *InteractWord
StopLiveRoomList *StopLiveRoomList
LikeInfoV3Update *LikeInfoV3Update
HotRankChange *HotRankChange
NoticeMsg *NoticeMsg
RoomRealTimeMessageUpdate *RoomRealTimeMessageUpdate
WidgetBanner *WidgetBanner
HotRankChangedV2 *HotRankChangedV2
GuardHonorThousand *GuardHonorThousand
Live *Live
RoomChange *RoomChange
RoomBlockMsg *RoomBlockMsg
FullScreenSpecialEffect *FullScreenSpecialEffect
CommonNoticeDanmaku *CommonNoticeDanmaku
TradingScore *TradingScore
Preparing *Preparing
GuardBuy *GuardBuy
GiftStarProcess *GiftStarProcess
RoomSkinMsg *RoomSkinMsg
EntryEffect *EntryEffect
}
则, 每一个Setxxx
函数, 都直接返回这个MsgEvent
即可, 对于其他不属于自身消息类型的字段, 设为nil
即可.
如弹幕消息:
func (_ *Handler) SetDanMuMsg(msg map[string]string) MsgEvent {
danMu := DanMuMsg{}
danMu.Cmd = CmdDanmuMsg
danMuMsg := make(map[string]interface{}, 0)
err := json.Unmarshal([]byte(msg["msg"]), &danMuMsg)
if err != nil {
log.Printf("Unmarshal cmd json failed: %v", err)
return MsgEvent{}
}
danMu.Data.Content = danMuMsg["info"].([]interface{})[1].(string)
danMu.Data.SendTimeStamp = int(danMuMsg["info"].([]interface{})[9].(map[string]interface{})["ts"].(float64))
danMu.Data.SenderEnterRoomTimeStamp = int(danMuMsg["info"].([]interface{})[0].([]interface{})[4].(float64))
danMu.Data.SendMillionTimeStamp = int64(danMuMsg["info"].([]interface{})[0].([]interface{})[5].(float64))
danMu.Data.Sender.Uid = int64(danMuMsg["info"].([]interface{})[2].([]interface{})[0].(float64))
danMu.Data.Sender.Name = danMuMsg["info"].([]interface{})[2].([]interface{})[1].(string)
if len(danMuMsg["info"].([]interface{})[3].([]interface{})) != 0 {
danMu.Data.Medal.GuardLevel = int(danMuMsg["info"].([]interface{})[3].([]interface{})[0].(float64))
danMu.Data.Medal.MedalName = danMuMsg["info"].([]interface{})[3].([]interface{})[1].(string)
danMu.Data.Medal.TargetID = int(danMuMsg["info"].([]interface{})[3].([]interface{})[11].(float64))
danMu.Data.Medal.AnchorRoomId = int(danMuMsg["info"].([]interface{})[3].([]interface{})[3].(float64))
}
roomId, err := strconv.Atoi(msg["RoomId"])
if err != nil {
log.Printf("Unmarshal cmd json failed: %v", err)
return MsgEvent{}
}
return MsgEvent{Cmd: CmdDanmuMsg, DanMuMsg: &danMu, RoomId: roomId}
}
下面来实现事件绑定
func (handler *Handler) AddOption(Cmd string, RoomId int, Do func(event MsgEvent)) {
if _, ok := handler.DoFunc[Cmd]; !ok {
handler.DoFunc[Cmd] = make(map[int][]func(event MsgEvent))
}
if _, ok := handler.DoFunc[Cmd][RoomId]; !ok {
handler.DoFunc[Cmd][RoomId] = make([]func(event MsgEvent), 0)
}
handler.DoFunc[Cmd][RoomId] = append(handler.DoFunc[Cmd][RoomId], Do)
}
很简单, 不多赘述
最后, 实现事件触发
func (handler *Handler) CmdHandler() {
for {
select {
case msg, ok := <-handler.CmdChan:
if ok {
setFunc := reflect.ValueOf(&Handler{}).MethodByName("Set" + CmdName[msg["cmd"]])
if setFunc.IsValid() {
res := setFunc.Call([]reflect.Value{reflect.ValueOf(msg)})
msgEvent := res[0].Interface().(MsgEvent)
if !(msgEvent.Cmd == "" || msgEvent.RoomId == 0) {
if _, ok := handler.DoFunc[msg["cmd"]]; ok {
if _, ok := handler.DoFunc[msg["cmd"]][msgEvent.RoomId]; ok {
for _, v := range handler.DoFunc[msg["cmd"]][msgEvent.RoomId] {
go v(msgEvent)
}
}
}
}
}
}
default:
time.Sleep(10 * time.Microsecond)
continue
}
}
}
9. 开启!
type Handler struct {
Handler handler.Handler
}
type LiveRoom struct {
RoomId int
Client client.Client
}
func GetNewHandler() Handler {
h := Handler{}
h.Handler.DoFunc = make(map[string]map[int][]func(event handler.MsgEvent), 0)
h.Handler.CmdChan = make(chan map[string]string, 100)
return h
}
func (h *Handler) AddOption(Cmd string, RoomId int, Do func(event handler.MsgEvent)) {
h.Handler.AddOption(Cmd, RoomId, Do)
}
func (h *Handler) AddRoom(room LiveRoom) {
room.Client.RoomId = room.RoomId
room.Client.BiliChat(h.Handler.CmdChan)
}
func (h *Handler) Run() {
h.Handler.CmdHandler()
}
只是一个封装, 就不解释了
开启直播间和事件绑定!
func main() {
h := GetNewHandler()
h.AddOption(handler.CmdDanmuMsg, 123, func(event handler.MsgEvent) {
fmt.Printf("[%v] %v: %v\n", event.RoomId, event.DanMuMsg.Data.Sender.Name, event.DanMuMsg.Data.Content)
})
h.AddRoom(LiveRoom{RoomId: 123})
h.Run()
}