长连网关作为与客户端的直接交互通道,其稳定性要求不言而喻。
作为一个网关,如果频繁的上版本,那么可能在上线期间导致大批量的用户瞬间掉线,而需要转移到其它长连服务器上建连。正常来说,机器一台一台甚至一组一组上,一台机器上用户连接因上线原因被清退,转而跑到另外一台长连服务器建连,而此时这台新的长连服务器也刚好进入上线队列中,再次将这台长连服务器上的用户清退,依此循环,用户体验必然不好。
为了解决这个问题,那么长连网关必然要具备稳定性、通用性、轻量级,不能过于频繁的进行上线、重启。当然,如果在半夜这种时间上线,以上问题倒是没有那么明显。
长连网关包含有以下:
1. 编解码
对于TCP协议通信来说,一般由于二进制包过大或者过小,以及MTU、sendBuffer、receiveBuffer等参数可能产生粘包、拆包问题,从而破坏通信消息内容。
如果采用Netty开发的话,这个问题就比较容易解决了,可以采用LengthFieldBasedFrameDecoder、LineBasedFrameDecoder、LengthFieldPrepender,结合自定义协议的编解码方式一同组合使用。
2. 通信协议
为了让长连网关具备有通用性,协议也必须是轻量级的。
Google的ProtoBuf是一个不错的参考协议实现。
在这里将协议分为两个部分:Header 和 Body。
1 2 3 4 5 6
   |  @Data public class ProtocolRequest implements Serializable {     private static final long serialVersionUID = 1L;     private Map<Integer, Object> header;     private Map<Integer, Object> body; }
   | 
将请求Header和Body的 Key-Value对 根据 Proto 文件映射为 KeyIndex-Value 形式,对于 Value 也可以根据Proto 文件映射为 KeyIndex-Value 形式。
如:
1 2 3 4 5
   | { 	"fromUserId":123, 	"toUserId":456, 	"content":"hello" }
  | 
转化为:
1 2 3 4 5
   | { 	0:123, 	1:456, 	2:"hello" }
  | 
通过这种方式,可以有效的减少数据包的大小。
在长连网关层不需要对协议的字段进行解析,只需要识别出请求的类型,然后直接透传给API层即可。
3. 心跳保活
对于长连接来说,如果长时间没有消息的读写,这个连接可能会被服务器给断开,导致不可用而使客户端长时间无法收发消息。
双方通过间歇性的发送特殊的读写请求包来判断对方是否存活,很好的保证了连接的可用性。
如果采样Netty开发的话,只需要将 IdleStateHandler 加入到pipline中即可,并自定义一个Handler用来处理IdleStateEvent事件。
如:
1 2 3 4 5 6 7 8 9 10 11 12
   |  @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {     if (evt instanceof IdleStateEvent) {         IdleStateEvent event = (IdleStateEvent) evt;         if (event.state() == IdleState.READER_IDLE) {             ctx.close();         }         if (event.state() == IdleState.WRITER_IDLE) {             ctx.close();         }     } }
   | 
4. 用户在线状态
在消息下推的时候,需要通过客户端与长连服务器建立的长连接通道下推下去,也因此要求了服务端需要维护着 用户 - 连接 的关系对。对于支持多端在线的场景,如果恰好同一个用户的多个设备与同一台长连服务器建连,则需要加入用户 - 设备 - 连接的关系对。
实际情况是:
- 一个用户可以有多台设备
 - 同一用户的多台设备可以与同一台长连服务器建连
 - 同一用户的同一设备只能与同一台长连服务器建一个连接
 
1 2 3 4 5 6 7 8 9 10
   | 
 
  @Data public class ChannelIdentifier {          private Long userId;          private String device; }
 
  | 
private ConcurrentHashMap<Long, Set<ChannelIdentifier>> userToDevices = new ConcurrentHashMap<>();
private ConcurrentHashMap<ChannelIdentifier, Channel> deviceToChannel = new ConcurrentHashMap<>();
进行 deviceToChannel 的 put 操作的时候需要判断是否该设备已和同一台长连服务器建连,如果是的话,需要close之前的连接,才可以建连。
5. 上下行消息处理
对于上行消息,客户端将请求包根据协议定义的格式,封装为HeaderMap、BodyMap,在HeaderMap中声明了该消息的类型,客户端的信息、用户信息等。
对于下行消息,也是类似上行消息的格式。
当用户上线的时候,会触发 拉取离线消息 的操作,从 Redis 中sub出离线期间的消息的时候,采用异步多线程的方式来消费消息,虽然消费快了,但是可能会导致消息乱序,因此,不得不对消息进行次序整形,为了实现次序整形,需要在触发取离线消息操作的时候对每一条消息添加一个单调递增seqId,长连网关sub到消息之后,根据seqId对消息进行从小到大排序,也因此实现了消息有序。
当消息下推到客户端的时候,服务器其实也不确定消息到底推送成功没有以及消息推送到哪条了,甚至是中间某条消息没有推送成功。除了需要存储用户当前已同步到的版本号,对于每一个连接,在建立连接的时候,初始化一个tid,从0开始,消息收到之后,需要进行ack操作,类似TCP三次握手过程,如果有消息丢失、超时,那么将会触发重推操作,在直播互动这种场景中,可能消息下推QPS特别高,且不一定需要全部消息都关注下推成功,但为了统计一下服务推送到达能力,可以采用采样ack的方式,在消息的HeaderMap中添加一个isSampled = true,客户端及长连网关判断该消息需要采样ack,则执行ack操作,否则不执行。