0%

长连网关通过调用RPC接口,将上行消息直接透传给API处理层。

在API处理层,首先会根据 Header 中声明的接口信息,通过反射调用相应的Controller中的 method,长连网关层就类似 SpringMVCDispatcherServlet,而API层的处理就类似 @RequestMapping所做的事情。

首先,定义注解,用来标识一个方法。

1
2
3
4
5
6
7
8
9
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface DispatcherURL {
//URL地址
String value();

//接口标识
String tag();
}

再定义一个对象,用来表示一个url - tag - bean - method的关系。

1
2
3
4
5
6
7
@Data
public class DispatcherMapping {
private String url;
private String tag;
private Object bean;
private String method;
}

基于SpringBoot来实现,容器初始化完成将会初始化映射关系。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import com.google.common.collect.Maps;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;
import org.springframework.util.ReflectionUtils;

import java.util.Arrays;
import java.util.Map;

/**
* 用于初始化映射关系
*/
@Component
public class Initializer implements ApplicationListener<ContextRefreshedEvent> {
public static Map<String, DispatcherMapping> urlDispatcherMappingMap = Maps.newHashMap();

@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
ApplicationContext context = event.getApplicationContext();
if (context != null) {
String[] beanNames = context.getBeanDefinitionNames();
Arrays.stream(beanNames).forEach(beanName -> {
Object bean = context.getBean(beanName);
ReflectionUtils.doWithMethods(bean.getClass(), method -> {
if (method.isAnnotationPresent(DispatcherURL.class)) {
DispatcherMapping dispatcherMapping = new DispatcherMapping();
DispatcherURL dispatcherURL = method.getAnnotation(DispatcherURL.class);
dispatcherMapping.setUrl(dispatcherURL.value());
dispatcherMapping.setTag(dispatcherURL.tag());
dispatcherMapping.setBean(bean);
dispatcherMapping.setMethod(method.getName());
urlDispatcherMappingMap.put(dispatcherMapping.getTag(), dispatcherMapping);
}
});
});
}
}
}

声明一个方法大概如下:

1
2
3
4
5
6
7
8
@Controller
public class TestController {
@DispatcherURL(value = "/say", tag = "(1,0)")
public String say(Map<Integer, Object> header, Map<Integer, Object> body) {
//handler header & body
return "Hello world";
}
}

RPC服务端如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import org.springframework.stereotype.Service;

import java.lang.reflect.Method;
import java.util.Map;

/**
* rpc服务端
*/
@Service
public class RpcService {
//长连网关调用
public Object invoke(Map<Integer, Object> headerMap, Map<Integer, Object> bodyMap) {
Object result = null;
try {
String tag = getTagFromHeader(headerMap);
DispatcherMapping dispatcherMapping = Initializer.urlDispatcherMappingMap.get(tag);
if (dispatcherMapping == null) {
//ignore...
return null;
}
Class bean = dispatcherMapping.getBean().getClass();
Method method = bean.getDeclaredMethod(dispatcherMapping.getMethod(), new Class[]{Map.class, Map.class});
result = method.invoke(bean, headerMap, bodyMap);
} catch (Exception e) {
e.printStackTrace();
}
return result;
}

private String getTagFromHeader(Map<Integer, Object> headerMap) {
//假设tag的key为1和2
return "(" + headerMap.get(1) + "," + headerMap.get(2) + ")";
}
}

最终,客户端只需要在headerMap中传入tag,在bodyMap中传入业务参数,通过简单的RPC透传调用,即可实现网关通用性,一旦有新的接口上线,只需要修改API处理层即可。

长连网关作为与客户端的直接交互通道,其稳定性要求不言而喻。

作为一个网关,如果频繁的上版本,那么可能在上线期间导致大批量的用户瞬间掉线,而需要转移到其它长连服务器上建连。正常来说,机器一台一台甚至一组一组上,一台机器上用户连接因上线原因被清退,转而跑到另外一台长连服务器建连,而此时这台新的长连服务器也刚好进入上线队列中,再次将这台长连服务器上的用户清退,依此循环,用户体验必然不好。

为了解决这个问题,那么长连网关必然要具备稳定性、通用性、轻量级,不能过于频繁的进行上线、重启。当然,如果在半夜这种时间上线,以上问题倒是没有那么明显。

长连网关包含有以下:

  • 编解码
  • 通信协议
  • 心跳保活
  • 用户在线状态
  • 上下行消息处理
1. 编解码

对于TCP协议通信来说,一般由于二进制包过大或者过小,以及MTUsendBufferreceiveBuffer等参数可能产生粘包、拆包问题,从而破坏通信消息内容。

如果采用Netty开发的话,这个问题就比较容易解决了,可以采用LengthFieldBasedFrameDecoderLineBasedFrameDecoderLengthFieldPrepender,结合自定义协议的编解码方式一同组合使用。

2. 通信协议

为了让长连网关具备有通用性,协议也必须是轻量级的。

Google的ProtoBuf是一个不错的参考协议实现。

在这里将协议分为两个部分:HeaderBody

1
2
3
4
5
6
 @Data
public class ProtocolRequest implements Serializable {
private static final long serialVersionUID = 1L;
private Map<Integer, Object> header;
private Map<Integer, Object> body;
}

将请求HeaderBodyKey-Value对 根据 Proto 文件映射为 KeyIndex-Value 形式,对于 Value 也可以根据Proto 文件映射为 KeyIndex-Value 形式。

如:

1
2
3
4
5
{
"fromUserId":123,
"toUserId":456,
"content":"hello"
}

转化为:

1
2
3
4
5
{
0:123,
1:456,
2:"hello"
}

通过这种方式,可以有效的减少数据包的大小。

在长连网关层不需要对协议的字段进行解析,只需要识别出请求的类型,然后直接透传给API层即可。

3. 心跳保活

对于长连接来说,如果长时间没有消息的读写,这个连接可能会被服务器给断开,导致不可用而使客户端长时间无法收发消息。

双方通过间歇性的发送特殊的读写请求包来判断对方是否存活,很好的保证了连接的可用性。

如果采样Netty开发的话,只需要将 IdleStateHandler 加入到pipline中即可,并自定义一个Handler用来处理IdleStateEvent事件。

如:

1
2
3
4
5
6
7
8
9
10
11
12
 @Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.READER_IDLE) {
ctx.close();
}
if (event.state() == IdleState.WRITER_IDLE) {
ctx.close();
}
}
}
4. 用户在线状态

在消息下推的时候,需要通过客户端与长连服务器建立的长连接通道下推下去,也因此要求了服务端需要维护着 用户 - 连接 的关系对。对于支持多端在线的场景,如果恰好同一个用户的多个设备与同一台长连服务器建连,则需要加入用户 - 设备 - 连接的关系对。

实际情况是:

  • 一个用户可以有多台设备
  • 同一用户的多台设备可以与同一台长连服务器建连
  • 同一用户的同一设备只能与同一台长连服务器建一个连接
1
2
3
4
5
6
7
8
9
10
/**
* 标识在某一个设备上的用户
*/
@Data
public class ChannelIdentifier {
//用户id
private Long userId;
//设备信息
private String device;
}
//用户id到多台设备的映射关系
private ConcurrentHashMap<Long, Set<ChannelIdentifier>> userToDevices = new ConcurrentHashMap<>();
//在某一个设备上的用户与TCP连接的映射关系
private ConcurrentHashMap<ChannelIdentifier, Channel> deviceToChannel = new ConcurrentHashMap<>();

进行 deviceToChannelput 操作的时候需要判断是否该设备已和同一台长连服务器建连,如果是的话,需要close之前的连接,才可以建连。

5. 上下行消息处理

对于上行消息,客户端将请求包根据协议定义的格式,封装为HeaderMapBodyMap,在HeaderMap中声明了该消息的类型,客户端的信息、用户信息等。

对于下行消息,也是类似上行消息的格式。

当用户上线的时候,会触发 拉取离线消息 的操作,从 Redissub出离线期间的消息的时候,采用异步多线程的方式来消费消息,虽然消费快了,但是可能会导致消息乱序,因此,不得不对消息进行次序整形,为了实现次序整形,需要在触发取离线消息操作的时候对每一条消息添加一个单调递增seqId,长连网关sub到消息之后,根据seqId对消息进行从小到大排序,也因此实现了消息有序。

当消息下推到客户端的时候,服务器其实也不确定消息到底推送成功没有以及消息推送到哪条了,甚至是中间某条消息没有推送成功。除了需要存储用户当前已同步到的版本号,对于每一个连接,在建立连接的时候,初始化一个tid,从0开始,消息收到之后,需要进行ack操作,类似TCP三次握手过程,如果有消息丢失、超时,那么将会触发重推操作,在直播互动这种场景中,可能消息下推QPS特别高,且不一定需要全部消息都关注下推成功,但为了统计一下服务推送到达能力,可以采用采样ack的方式,在消息的HeaderMap中添加一个isSampled = true,客户端及长连网关判断该消息需要采样ack,则执行ack操作,否则不执行。

用户上线的时候,需要和长连服务器建连,本文将涉及服务器地址下发设计。

IM

由于无法得知SLB的可用度,需要计算求和每一台ECS的可用度,间接得出。

在每一台长连网关程序上都启动了一个定时任务,每隔 5s 获取调度一次,计算ppsbytes,然后将二者分别除去设置的阈值,得到一个已用度,再用 1 - 已用度 得到 可用度的值,再在二者之中取最小值,作为当前ECS服务器的最终可用度,然后将这个值存到Redis中。(PPS及Bytes计算方法

下发IP服务程序中,会去读取Redis中存储的每一个SLB对应的ECS服务器列表,每次统计可用度的时候,读出所有ECS服务器当前的可用度,将可用度求和,作为当前SLB的可用度。将计算得出的SLB可用度缓存到机器内存(调用Guava的LoadingCache实现)及Redis中。

为了让可用度比较接近的SLB被均衡连接,将可用度乘以一个随机数再去进行排序,让同一时刻内返回的SLB列表不会过于固定,造成某一个SLB过于饱和。