长连网关作为与客户端的直接交互通道,其稳定性要求不言而喻。
作为一个网关,如果频繁的上版本,那么可能在上线期间导致大批量的用户瞬间掉线,而需要转移到其它长连服务器上建连。正常来说,机器一台一台甚至一组一组上,一台机器上用户连接因上线原因被清退,转而跑到另外一台长连服务器建连,而此时这台新的长连服务器也刚好进入上线队列中,再次将这台长连服务器上的用户清退,依此循环,用户体验必然不好。
为了解决这个问题,那么长连网关必然要具备稳定性、通用性、轻量级,不能过于频繁的进行上线、重启。当然,如果在半夜这种时间上线,以上问题倒是没有那么明显。
长连网关包含有以下:
- 编解码
- 通信协议
- 心跳保活
- 用户在线状态
- 上下行消息处理
1. 编解码
对于TCP
协议通信来说,一般由于二进制包
过大或者过小,以及MTU
、sendBuffer
、receiveBuffer
等参数可能产生粘包、拆包问题
,从而破坏通信消息内容。
如果采用Netty
开发的话,这个问题就比较容易解决了,可以采用LengthFieldBasedFrameDecoder
、LineBasedFrameDecoder
、LengthFieldPrepender
,结合自定义协议的编解码方式一同组合使用。
2. 通信协议
为了让长连网关具备有通用性,协议也必须是轻量级的。
Google的ProtoBuf
是一个不错的参考协议实现。
在这里将协议分为两个部分:Header
和 Body
。
1 |
|
将请求Header
和Body
的 Key-Value对
根据 Proto
文件映射为 KeyIndex-Value
形式,对于 Value
也可以根据Proto
文件映射为 KeyIndex-Value
形式。
如:
1 | { |
转化为:
1 | { |
通过这种方式,可以有效的减少数据包的大小。
在长连网关层不需要对协议的字段进行解析,只需要识别出请求的类型,然后直接透传给API层即可。
3. 心跳保活
对于长连接来说,如果长时间没有消息的读写,这个连接可能会被服务器给断开,导致不可用而使客户端长时间无法收发消息。
双方通过间歇性的发送特殊的读写请求包来判断对方是否存活,很好的保证了连接的可用性。
如果采样Netty
开发的话,只需要将 IdleStateHandler
加入到pipline
中即可,并自定义一个Handler
用来处理IdleStateEvent
事件。
如:
1 |
|
4. 用户在线状态
在消息下推的时候,需要通过客户端与长连服务器建立的长连接通道下推下去,也因此要求了服务端需要维护着 用户 - 连接
的关系对。对于支持多端在线
的场景,如果恰好同一个用户的多个设备与同一台长连服务器建连,则需要加入用户 - 设备 - 连接
的关系对。
实际情况是:
- 一个用户可以有多台设备
- 同一用户的多台设备可以与同一台长连服务器建连
- 同一用户的同一设备只能与同一台长连服务器建一个连接
1 | /** |
//用户id到多台设备的映射关系
private ConcurrentHashMap<Long, Set<ChannelIdentifier>> userToDevices = new ConcurrentHashMap<>();
//在某一个设备上的用户与TCP连接的映射关系
private ConcurrentHashMap<ChannelIdentifier, Channel> deviceToChannel = new ConcurrentHashMap<>();
进行 deviceToChannel
的 put
操作的时候需要判断是否该设备已和同一台长连服务器建连,如果是的话,需要close
之前的连接,才可以建连。
5. 上下行消息处理
对于上行消息,客户端将请求包根据协议定义的格式,封装为HeaderMap
、BodyMap
,在HeaderMap
中声明了该消息的类型,客户端的信息、用户信息等。
对于下行消息,也是类似上行消息的格式。
当用户上线的时候,会触发 拉取离线消息
的操作,从 Redis
中sub
出离线期间的消息的时候,采用异步多线程的方式来消费消息,虽然消费快了,但是可能会导致消息乱序
,因此,不得不对消息进行次序整形
,为了实现次序整形
,需要在触发取离线消息操作的时候对每一条消息添加一个单调递增seqId
,长连网关sub
到消息之后,根据seqId
对消息进行从小到大排序,也因此实现了消息有序。
当消息下推到客户端的时候,服务器其实也不确定消息到底推送成功没有以及消息推送到哪条了,甚至是中间某条消息没有推送成功。除了需要存储用户当前已同步到的版本号
,对于每一个连接
,在建立连接的时候,初始化一个tid
,从0
开始,消息收到之后,需要进行ack
操作,类似TCP三次握手
过程,如果有消息丢失、超时
,那么将会触发重推
操作,在直播互动这种场景中,可能消息下推QPS特别高,且不一定需要全部消息都关注下推成功,但为了统计一下服务推送到达能力,可以采用采样ack
的方式,在消息的HeaderMap
中添加一个isSampled = true
,客户端及长连网关判断该消息需要采样ack
,则执行ack
操作,否则不执行。