0%

IM - 长连网关

长连网关作为与客户端的直接交互通道,其稳定性要求不言而喻。

作为一个网关,如果频繁的上版本,那么可能在上线期间导致大批量的用户瞬间掉线,而需要转移到其它长连服务器上建连。正常来说,机器一台一台甚至一组一组上,一台机器上用户连接因上线原因被清退,转而跑到另外一台长连服务器建连,而此时这台新的长连服务器也刚好进入上线队列中,再次将这台长连服务器上的用户清退,依此循环,用户体验必然不好。

为了解决这个问题,那么长连网关必然要具备稳定性、通用性、轻量级,不能过于频繁的进行上线、重启。当然,如果在半夜这种时间上线,以上问题倒是没有那么明显。

长连网关包含有以下:

  • 编解码
  • 通信协议
  • 心跳保活
  • 用户在线状态
  • 上下行消息处理
1. 编解码

对于TCP协议通信来说,一般由于二进制包过大或者过小,以及MTUsendBufferreceiveBuffer等参数可能产生粘包、拆包问题,从而破坏通信消息内容。

如果采用Netty开发的话,这个问题就比较容易解决了,可以采用LengthFieldBasedFrameDecoderLineBasedFrameDecoderLengthFieldPrepender,结合自定义协议的编解码方式一同组合使用。

2. 通信协议

为了让长连网关具备有通用性,协议也必须是轻量级的。

Google的ProtoBuf是一个不错的参考协议实现。

在这里将协议分为两个部分:HeaderBody

1
2
3
4
5
6
 @Data
public class ProtocolRequest implements Serializable {
private static final long serialVersionUID = 1L;
private Map<Integer, Object> header;
private Map<Integer, Object> body;
}

将请求HeaderBodyKey-Value对 根据 Proto 文件映射为 KeyIndex-Value 形式,对于 Value 也可以根据Proto 文件映射为 KeyIndex-Value 形式。

如:

1
2
3
4
5
{
"fromUserId":123,
"toUserId":456,
"content":"hello"
}

转化为:

1
2
3
4
5
{
0:123,
1:456,
2:"hello"
}

通过这种方式,可以有效的减少数据包的大小。

在长连网关层不需要对协议的字段进行解析,只需要识别出请求的类型,然后直接透传给API层即可。

3. 心跳保活

对于长连接来说,如果长时间没有消息的读写,这个连接可能会被服务器给断开,导致不可用而使客户端长时间无法收发消息。

双方通过间歇性的发送特殊的读写请求包来判断对方是否存活,很好的保证了连接的可用性。

如果采样Netty开发的话,只需要将 IdleStateHandler 加入到pipline中即可,并自定义一个Handler用来处理IdleStateEvent事件。

如:

1
2
3
4
5
6
7
8
9
10
11
12
 @Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.READER_IDLE) {
ctx.close();
}
if (event.state() == IdleState.WRITER_IDLE) {
ctx.close();
}
}
}
4. 用户在线状态

在消息下推的时候,需要通过客户端与长连服务器建立的长连接通道下推下去,也因此要求了服务端需要维护着 用户 - 连接 的关系对。对于支持多端在线的场景,如果恰好同一个用户的多个设备与同一台长连服务器建连,则需要加入用户 - 设备 - 连接的关系对。

实际情况是:

  • 一个用户可以有多台设备
  • 同一用户的多台设备可以与同一台长连服务器建连
  • 同一用户的同一设备只能与同一台长连服务器建一个连接
1
2
3
4
5
6
7
8
9
10
/**
* 标识在某一个设备上的用户
*/
@Data
public class ChannelIdentifier {
//用户id
private Long userId;
//设备信息
private String device;
}
//用户id到多台设备的映射关系
private ConcurrentHashMap<Long, Set<ChannelIdentifier>> userToDevices = new ConcurrentHashMap<>();
//在某一个设备上的用户与TCP连接的映射关系
private ConcurrentHashMap<ChannelIdentifier, Channel> deviceToChannel = new ConcurrentHashMap<>();

进行 deviceToChannelput 操作的时候需要判断是否该设备已和同一台长连服务器建连,如果是的话,需要close之前的连接,才可以建连。

5. 上下行消息处理

对于上行消息,客户端将请求包根据协议定义的格式,封装为HeaderMapBodyMap,在HeaderMap中声明了该消息的类型,客户端的信息、用户信息等。

对于下行消息,也是类似上行消息的格式。

当用户上线的时候,会触发 拉取离线消息 的操作,从 Redissub出离线期间的消息的时候,采用异步多线程的方式来消费消息,虽然消费快了,但是可能会导致消息乱序,因此,不得不对消息进行次序整形,为了实现次序整形,需要在触发取离线消息操作的时候对每一条消息添加一个单调递增seqId,长连网关sub到消息之后,根据seqId对消息进行从小到大排序,也因此实现了消息有序。

当消息下推到客户端的时候,服务器其实也不确定消息到底推送成功没有以及消息推送到哪条了,甚至是中间某条消息没有推送成功。除了需要存储用户当前已同步到的版本号,对于每一个连接,在建立连接的时候,初始化一个tid,从0开始,消息收到之后,需要进行ack操作,类似TCP三次握手过程,如果有消息丢失、超时,那么将会触发重推操作,在直播互动这种场景中,可能消息下推QPS特别高,且不一定需要全部消息都关注下推成功,但为了统计一下服务推送到达能力,可以采用采样ack的方式,在消息的HeaderMap中添加一个isSampled = true,客户端及长连网关判断该消息需要采样ack,则执行ack操作,否则不执行。