0%

简单的说,数据链路层定义了电信号的分组方式。

数据链路层,连接了上层的 网络层 和下层的 物理层

数据链路层是对物理层功能的增强,将物理层连接转为逻辑上无差错的链路。

为上层网络层提供透明传输和可靠的数据传输服务。

数据链路层

单纯的电信号0和1是没有任何意义的,只有规定多少个为一组,每一组都是什么意思才有意义。
而这些分组要有意义,是由 通信协议 来控制的。

把实现这些 通信协议 的硬件和软件加到链路上,就形成了数据链路。
这样的硬件比如:网卡、交换机。

数据链路层需要解决三个问题:封装成帧、透明传输、差错校验。

主要关注的是:

  1. 怎么封装成帧
  2. 怎么去传输帧
  3. 帧传输目的地
  4. 校验帧完整性
在一段数据的前后增加首部、尾部,这就形成了一个帧。

数据链路帧

帧,包含:帧头、数据部分、帧尾。

不同的数据链路层通信协议会对应着不同的帧。

帧头

由8位二进制数组成,代表的是网络层数据包的开始。

从应用层开始往下,每一层都会接收上层数据,并加入自己的头。

数据部分

大小至少46字节,最大1500字节,也就是最大传输单元MTU。

如果数据过大,将会产生分片。

帧尾

由8位二进制数组成,代表的是网络层数据包的结束,也包含了一些控制信息。

通过帧的首部和尾部包含的控制信息,来给帧定义一个边界。

帧,采用的是透明传输。

不论上层传输过来的内容是什么,数据链路层只会将上层的数据部分的内容往下传输,起到一个通道的作用,并且保证传输质量,不参与任何业务。

为了表示一个帧结束了,在帧尾添加了 特殊转义字符ESC。当然,由于对上层数据采用透传的策略,可能因为数据中就包含了ESC,导致帧提前结束,因此遇到 多个ESC 的时候,将会删除其它,采用最后一个作为结束。

由于数据帧传输过程中可能发生错误,则需要差错校验。

因为在网络中传输的过程中,可能会有噪声等多方面因素,导致原本数据包中的bit产生比特差错,由1变成0,由0变成1,就需要进行差错校验。

差错校验

数据链路层中采用的方式是:CRC循环冗余校验法

CRC校验,通过在数据为K位的数据后添加n位的冗余码组成帧。

CRC校验

发送端和接收端,共同协商出一个多项式,作为除数,将要发送的数据设定为固定大小的位数,作为被除数,通过反复的异或取余,最终得到一个冗余码,通过比较二者冗余码 FCS是否一致来确定帧的完整性
多项式

多项式由 x^n 是否存在,由高到低组成,存在为1,否则为0。

冗余码

异或:1 ^ 1 = 01 ^ 0 = 10 ^ 0 = 0

数据链路层提供了三种基本服务。

根据链路的可靠性以及效率来划分。

  1. 无确认无连接服务
  2. 有确认无连接服务
  3. 有确认有连接服务

相比来说,无连接的比起有连接的效率更高,有确认的比无确认的可靠性更高。

通过差错校验之后,假设数据帧已经没有问题了,但如果由于网络问题,导致丢包了,那么这条链路也就算是不可靠的了。

无确认无连接服务,只管往哪个目的地传送数据帧即可,哪怕丢失了也不做处理。

有确认无连接服务,在无确认无连接服务的基础上,要求接收方在收到数据帧之后做确认处理,如果超时后没有收到则会重发。

有确认有连接服务,在有确认无连接服务的基础上,将接收方和发送方之间建立一个连接,并且给每一个发送的帧加上编号,并且需要接收方接收到帧后做确认处理,比起有确认无连接服务,可以防止一个帧被多次重复发送,接收方多次重复接收。

在不可靠的链路上,通过各种协议,来最终实现可靠传输。

CRC只能保证传输的帧的数据是完整的,无法保证传输的帧一定被接收。

为了实现可靠传输,产生了 停止等待协议退回N步协议选择重传协议

停止等待协议

发送方一次只能发送一帧,发送的同时启动计时器,然后等待接收方的确认信息。

如果时间超过了两倍来回时间,则重新发送当前帧,并重新启动计时器。

如果确认分组丢失了,或者接收到的帧的编号不是接收方期望接收到的编号或者超时了,则会要求发送方重新发送要求发送的帧。

退回N步协议

由于停止等待协议这种方式效率较低,为了改进以便提高信道的传输效率,接收方允许发送方一次发送多个帧,将帧存入FIFO buffer中,然后再逐步ack,从中删除正确的帧。但是,如果中间某一帧丢失了,那么就需要从丢失的帧号开始全部重传

arq

另外,由于发送方和接收方的处理能力不一样,需要基于 滑动窗口 来控制发送的速率和帧数。

选择重传协议

由于退回N步协议,每次都需要从丢失的帧号开始 全部重传,对于已经被ack的分组也会被重传,这种方式也是不利于提高信道的传输效率的,那么就通过选择重传协议,只将发送错误的分组重传即可。
arq

数据链路层的信道分为:点对点信道、广播信道。

dll-road

点对点信道采用的主要是 PPP协议,一种点对点的协议。

家庭中的宽带拨号的时候,通常会是PPPoE

ppp

PPP的链路包含6个步骤。

用户通过宽带PPPoE拨号,与ISP建立连接,由下层物理层来负责链路的建立,当链路建立成功后,双方发送LCP包来确认链路的一些信息(最大帧长、鉴权协议)以及是否可以在当前链路传输,之后再进行鉴权(身份识别、是否欠费)操作,成功的话,则开始进行网络层控制协议(NCP)配置的协商(分配IP、网络层协议),协商成功后则正式打开链路,进行数据的传输,数据传输完成后就进入了链路终止状态。

ppp

广播信道主要采用的是以太网通信标准,常用于局域网。

所谓的一台计算机发送的信号会被局域网内的所有计算机都收到。

broadcast

以太网协议,早期各企业自定分组方式,后形成的标准。

上文提到了,数据链路层通过通信协议为电信号提供分组方式。

Ethernet 规定,一组电信号组成一个数据包,称为

每一个数据帧由头 head 和数据 data 组成。

  • head 固定18个字节,包含:发送者/源地址接收者/目标地址数据类型,各占6字节。

  • data 最短46字节,最长1500字节,是数据包的具体内容,超过MTU的话则需要分片传输。

以太网就是局域网,局域网不一定是以太网。

在以太网中,发送者/源地址接收者/目标地址是由 mac地址来确定的,这个地址在网卡上标识着,全球唯一的。

因此,发送者、接收者的地址,说的就是 网卡的地址

mac地址由48位二进制组成,12位16进制组成,前6位是厂商编号,后6位是流水号。

mac地址

找了个网址查了一下,点击查询

mac地址

有了mac地址之后,同一个网络中的计算机就可以通过mac地址找到对方,并进行通信了。

理论上,全世界的计算机之间都可以通过mac地址和对方相互通信。

只需要在数据帧的 head 部分写入自己的mac地址和对方的mac地址,然后给同一网络中的所有计算机发送信息,对比接收者是否自身,以此方式完成通信。这种方式,也称之为 广播

通常,在通信的时候,只知道对方的IP地址,而不知道物理地址。

ARP协议就被设计来通过IP地址查询对方MAC地址。

一台计算机通过ARP协议可以获取另一台计算机的mac地址。

在TCP/IP模型中,ARP协议属于网络层协议。

在OSI模型中,ARP协议属于数据链路层协议。

arp帧

抓包信息如下:

arp

通过ARP协议,发送以太网帧,去询问同一网络上的所有计算机,IP是目标计算机的话,那就回复MAC地址,否则丢弃,默认请求数据下为 00:00:00:00:00:00。查询到目标IP对应的MAC地址之后就缓存到自己的内存中。

ARP分为两种类型:静态和动态。

通过 arp -a 命令可以查看到已缓存、配置的IP和MAC地址映射表,且可以知道动静类型。

ARP地址解析协议的工作流程如下:

arp

在网络上,两台计算机要通信,就需要一个通信媒介。

物理层,通过这么一个介质,使得两台计算机之间得以完成信号的传输。

如果全世界的计算机都连接到彼此互通的介质中,那么彼此之间都可以相互进行信号的传输了。

通信的根本目的就是为了传输数据,而这个数据可以是任何形式的。

在生活中,常见的信号传输介质有:光缆、电缆、双绞线、无线电波等。

物理层

简单的说,物理层包含的就是通信介质信号传输

通俗的说,物理层规定了通信数据应该以何种方式、何种形态、在何处进行传输。

物理层规定了通信介质的一些特性。
  • 机械特性
  • 电气特性
  • 规程特性

机械特性,规定了接口的大小、形状、颜色、排列方式,比如:RJ45水晶头。

电气特性,规定了在网线中传输的电压的范围,分为高(5V)低(-5V)电压。

规程特性,规定了建立连接时元器组件之间的工作方式及步骤。

信号的形态分为:模拟信号、数字信号。

信号的形态

信号数据,可以表示图片、语音、文字、视频等任何信息。

模拟信号,指的是在某一段连续时间内,信号的幅度、频率、相位随着时间的变化而连续变化。

数字信号,指的是幅度的取值被限制为0和1的离散型信号,跳跃着变化的,由高低电压来体现。

调制解调器,实现了模拟信号数字信号的转换。

在家庭中,调制解调器又称为,既可以连接电信电路,又可以连接网线。

电话信号一般是低频率的,而高频率的则用来作为宽带信号。

调制:将数字信号转为模拟信号的过程。

解调:将模拟信号转为数字信号的过程。

调制解调器

信号又分为:基带信号、带通信号。
  • 基带信号(类比货物)

    从信号源头产生的没有经过加工的信号。

    在手机中,基带是手机的通信模块,调制解调器。

    手机通话、上网质量差跟基带也是有莫大的关系的。

  • 带通信号(类比货车)

    基带信号经过载波调制后,被放大频率,以便在信道中传输。

    载波是一个特定频率的无线电波,单位是Hz,是一种在频率、调幅、相位方面被调制成用来传输语音、文本等数据的电磁波。由振荡器产生的并在信道上传输的无线电波,频率会比基带信号高,属于高频信号。

基带信号

信号的传输方式:

  • 串行、并行
  • 单工、半双工、全双工
  • 位同步、字符同步

串行,将一个字符的二进制码1个个的从低位向高位在信道中依次传输。

并行,将一个字符的二进制码在8个并行信道中同时传输。

transtype

单工,信号只能单向传输。

半双工,信号支持双向传输,但是同一时刻只允许往一个方向。

全双工,信号支持双向传输,并且同一时刻允许往两个方向。

work

位同步,使接收端的每一位都跟发送端的保持同步状态,也是数字信号码元时间对齐的过程。码元指的是数字信号0、1,1个码元可以携带多个bit的数据量。

位同步包含:外同步、内同步。

  • 外同步

发送端发送数据时,同时发送时钟信号,接收方用同步信号来锁定自己的时钟脉冲频率

时钟脉冲指的是脉冲信号是一个按一定电压幅度,一定时间间隔连续发出的脉冲信号。脉冲信号之间的时间间隔称为周期。一个周期发送的脉冲信号的个数称为频率频率的计量单位是赫兹Hz

通俗的说,就是二者要在一个频道上。

电信频谱

  • 内同步

发送端通过特殊的编码方式进行编码,如:曼彻斯特编码,这些编码信号中包含了同步信号,接收端从这些信号中提取出时钟脉冲频率

字符同步,由于位同步只能以二进制码元的方式传输,为了识别出每一个字符的边界,还需要通过字符同步的方式来约束每一个字符应该到哪结束,如:ASCII编码,每一个字符都是8位,则应该以8位8位的方式作为一个字符的结束点来同步。

信号转换

信道,表示传输信息的媒体。

信道分为:有线信道、无线信道。

  • 有线信道(导向传输媒体)

    常见的有线信道,包含有:明线、对称电缆、同轴电缆、光纤。

    明线,架在电线杆上的那种。

    对称电缆,比如双绞线。

    同轴电缆,那种同心圆柱体状的电缆。

    光纤,利用光反射的原理使其在纤导中传播。

  • 无线信道(非导向传输媒体)

    信号基于电磁波传输,假想的一个无形的通道,比如无线电。

    根据频段的不同来区分不同的信道。

    在家庭路由器当中,通常会有13个信道,每个信道频率不同。

数据通信系统

数据通信系统

一台计算机,本质上就是一堆零件,只有安装上了软件,才能被正常使用。

计算机由硬件和软件组成。

硬件包括:CPU、内存、磁盘、主板、网卡、声卡、显卡等。

软件包括:操作系统软件、应用程序软件。

如果没有软件,计算机就是一堆破铜烂铁,而如何来管理这些硬件,就需要操作系统软件来帮助完成这件事情了。

computer.png

操作系统

操作系统是管理和控制计算机硬件和软件资源的计算机软件,是最基本的系统软件。

所有的应用程序软件都需要在操作系统的支持下才能使用。

操作系统为软件程序的运行提供了调度、分配硬件资源,协调多个程序之间的运行,也为用户提供了一个可视化的界面。

网络通信

一台计算机可以使用了之后,多台计算机之间如果不相互通信,那么计算机就比较孤独了,人与人之间需要通信,计算机之间也是需要通信的。

计算机之间通过互联网来完成彼此之间的通信,而计算机之间总得需要有一样的方言才能读懂对方计算机在说什么吧,而计算机之间的方言被称之为互联网协议

互联网协议定义了网络之间如何相互连接、如何进行通信的种种标准。

Internet.png

互联网协议

有个称为 ISO 国际标准化组织,定义了一个 OSI模型,定义了不同计算机之间的通信标准。这个模型将网络通信分为七层:物理层、数据链路层、网络层、传输层、会话层、表示层、应用层。

protocol.png

当消息下推到客户端的时候,服务端对于推送状态是无感知的。

在用户建立连接的时候,在Channel中缓存并初始化tid0,每一条消息下推的时候,都会设置一个自增tid,并将等待Ack的消息存储在队列中,一旦某个tidAck了就将该消息移出队列。

也有一些情况,比如超过设置的期望时间,还没有Ack,则可以认为该消息丢失了,则启动重新下推的操作。

ACK的数据结构可以定义为:

1
2
3
4
5
6
7
8
9
10
11
@Data
public class Ack {
private long tid;
private String msgId;
}

@Data
public class WaitAckMessage {
private Message message;
private long tid;
}

定义一个AckBuffer来存储ack的信息

1
2
3
4
5
6
7
8
9
10
11
12
public class AckBuffer {
private static Set<Long> set = Sets.newHashSet();

public void acknowledge(Ack ack) {
//可以使用redis的zset来实现
set.remove(ack.getTid());
}

public void addAck(Ack ack) {
set.add(ack.getTid());
}
}

由于多线程及网络延迟的原因,离线消息sub出来可能会乱序。

假设存在离线消息队列 Redis,以及 10 条消息,理想情况下消息按照顺序1,2,3,4,5,6,7,8,9,10完整pubRedis中,但可能因为网络延迟导致顺序变成1,3,2,6,5,4,7,8,9,10,也可能顺序存入没问题,实打实的就是1,2,3,4,5,6,7,8,9,10,但是从Redissub出来的时候变成了1,2,3,5,6,4,8,9,7,10,也因此导致了离线消息乱序的产生。

为了解决这个问题,可以在每次触发拉取离线消息的时候,在服务端先取出所有消息,给每一条消息设置一个从0开始的自增seqId,为了防止突然上线突然下线反复拉取离线消息,需要给每一次的离线消息设置一个随机packageId,也是为了区分消息是否是新写入离线消息队列中的消息用的。

定义Message类表示要下推前未处理的消息

1
2
3
4
5
6
7
8
9
@Data
public class Message {
private String msgId;
private String deviceId;
private Long userId;
private int packageId;
private int seqId;
//....more
}

定义ChannelUtil类表示Channel工具类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class ChannelUtil {
private static final AttributeKey<ConcurrentHashMap<Integer, PackageQueue>> packageKey = AttributeKey.valueOf("package_queue");

public static void setPackageQueue(Channel channel, ConcurrentHashMap<Integer, PackageQueue> packageQueueMap) {
channel.attr(packageKey).set(packageQueueMap);
}

public static ConcurrentHashMap<Integer, PackageQueue> getPackageQueue(Channel channel) {
return channel.attr(packageKey).get();
}

public static Channel getChannelByUserInfo(String deviceId, Long userId) {
//ignore...
return null;
}
}

定义SequenceMessage类表示排序消息

1
2
3
4
5
6
7
@Data
@NoArgsConstructor
@AllArgsConstructor
public class SequenceMessage {
private int seqId;
private String msgId;
}

定义PackageQueue表示包队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
* 包队列
*/

@Data
public class PackageQueue {
//根据seqId排序
private PriorityQueue<SequenceMessage> q =
new PriorityQueue<>(Comparator.comparingInt(s -> s.getSeqId()));

private Channel channel;
private int packageId;

public PackageQueue() {

}

public PackageQueue(Channel channel, int packageId) {
this.channel = channel;
this.packageId = packageId;
}

public void addMsg(SequenceMessage msg) {
q.add(msg);
}

public void drainOut() {
//...
while (!q.isEmpty()) {
//需将推送包封装为HeaderMap、BodyMap形式,该demo忽略
channel.writeAndFlush(q.poll());
}
}
}

定义PackageQueueManager表示包队列管理器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* 包队列管理器
*/
public class PackageQueueManager {
//定时
private ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(10);

public ConcurrentHashMap<Integer, PackageQueue> getPackageQueue(Channel channel) {
return ChannelUtil.getPackageQueue(channel);
}

//sub from redis and invoke this method
public void addMsg(Message message) {
Channel channel = ChannelUtil.getChannelByUserInfo(message.getDeviceId(), message.getUserId());
if (channel != null && channel.isActive()) {
ConcurrentHashMap<Integer, PackageQueue> packageQueueMap = getPackageQueue(channel);
packageQueueMap.computeIfAbsent(message.getPackageId(), pid -> {
PackageQueue packageQueue = new PackageQueue(channel, pid);
//2秒下发一次
executorService.schedule(packageQueue::drainOut, 2, TimeUnit.SECONDS);
return packageQueue;
}).addMsg(new SequenceMessage(message.getSeqId(), message.getMsgId()));
}
}
}

定义Publisher表示离线消息的发布者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class Publisher {

public void getOfflineMsgs(Long userId, String deviceId, Long syncVersion, String pubChannel) {
List<Message> messageList = loadOfflineMsgs(syncVersion);
int packageId = ThreadLocalRandom.current().nextInt(1, Integer.MAX_VALUE);
int seqId = 0;
for (Message message : messageList) {
message.setDeviceId(deviceId);
message.setUserId(userId);
message.setSeqId(seqId++);
message.setPackageId(packageId);
}

//pub to redis, the channel is : pubChannel
}

private List<Message> loadOfflineMsgs(long startVersion) {
//from redis
return null;
}
}

队列机层起到了聚合多方消息,减轻网关层压力的中间枢纽作用。

作为一个系统,可能是独享的,可能开放出一些能力提供给其它业务方使用,为了方便、安全、快捷,可通过队列机层来集中处理这些消息。

比如:发送私信,客户端可以直接通过上行接口直接调用,可以通过写入消息队列中,可以通过短信发送,而队列机无需关心消息到底怎么来,只需要关注消息可以这么来,而消息的来源渠道可以由各业务方来实现,以及消息如何处理,将要发送到什么地方,是否需要写入离线缓存队列中。

另外,哪怕多来几个不同APP,要接收下推消息,并且不想接入我方实现的网关层来进行消息下推,可以自行 sub 队列机层的 Redis,然后由自己来实现网关层

在整个链路中,API层相当于长连网关层的服务端,也相当于服务层的客户端。

服务层包含有:私信、群聊、群发等。

API层通过HTTP或者RPC的方式,去调用服务层的各种服务,然后返回处理 成功 或者 失败报文给 API层

服务层 处理成功之后,将消息存入MySQLHBase 等存储中,每一条消息都会有个版本号,基于用户维度的,然后将消息写入 RedisMemcachedQKafka 等消息中间件中,由队列机层 来订阅这些消息中间件,再经过一系列的处理后,将最终消息PubRedis中,长连网关层 的每一条服务器都可以Sub到这些消息,可以选择在网关层中对消息进行GZIP压缩,然后最终推送给客户端。

长连网关通过调用RPC接口,将上行消息直接透传给API处理层。

在API处理层,首先会根据 Header 中声明的接口信息,通过反射调用相应的Controller中的 method,长连网关层就类似 SpringMVCDispatcherServlet,而API层的处理就类似 @RequestMapping所做的事情。

首先,定义注解,用来标识一个方法。

1
2
3
4
5
6
7
8
9
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface DispatcherURL {
//URL地址
String value();

//接口标识
String tag();
}

再定义一个对象,用来表示一个url - tag - bean - method的关系。

1
2
3
4
5
6
7
@Data
public class DispatcherMapping {
private String url;
private String tag;
private Object bean;
private String method;
}

基于SpringBoot来实现,容器初始化完成将会初始化映射关系。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import com.google.common.collect.Maps;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;
import org.springframework.util.ReflectionUtils;

import java.util.Arrays;
import java.util.Map;

/**
* 用于初始化映射关系
*/
@Component
public class Initializer implements ApplicationListener<ContextRefreshedEvent> {
public static Map<String, DispatcherMapping> urlDispatcherMappingMap = Maps.newHashMap();

@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
ApplicationContext context = event.getApplicationContext();
if (context != null) {
String[] beanNames = context.getBeanDefinitionNames();
Arrays.stream(beanNames).forEach(beanName -> {
Object bean = context.getBean(beanName);
ReflectionUtils.doWithMethods(bean.getClass(), method -> {
if (method.isAnnotationPresent(DispatcherURL.class)) {
DispatcherMapping dispatcherMapping = new DispatcherMapping();
DispatcherURL dispatcherURL = method.getAnnotation(DispatcherURL.class);
dispatcherMapping.setUrl(dispatcherURL.value());
dispatcherMapping.setTag(dispatcherURL.tag());
dispatcherMapping.setBean(bean);
dispatcherMapping.setMethod(method.getName());
urlDispatcherMappingMap.put(dispatcherMapping.getTag(), dispatcherMapping);
}
});
});
}
}
}

声明一个方法大概如下:

1
2
3
4
5
6
7
8
@Controller
public class TestController {
@DispatcherURL(value = "/say", tag = "(1,0)")
public String say(Map<Integer, Object> header, Map<Integer, Object> body) {
//handler header & body
return "Hello world";
}
}

RPC服务端如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import org.springframework.stereotype.Service;

import java.lang.reflect.Method;
import java.util.Map;

/**
* rpc服务端
*/
@Service
public class RpcService {
//长连网关调用
public Object invoke(Map<Integer, Object> headerMap, Map<Integer, Object> bodyMap) {
Object result = null;
try {
String tag = getTagFromHeader(headerMap);
DispatcherMapping dispatcherMapping = Initializer.urlDispatcherMappingMap.get(tag);
if (dispatcherMapping == null) {
//ignore...
return null;
}
Class bean = dispatcherMapping.getBean().getClass();
Method method = bean.getDeclaredMethod(dispatcherMapping.getMethod(), new Class[]{Map.class, Map.class});
result = method.invoke(bean, headerMap, bodyMap);
} catch (Exception e) {
e.printStackTrace();
}
return result;
}

private String getTagFromHeader(Map<Integer, Object> headerMap) {
//假设tag的key为1和2
return "(" + headerMap.get(1) + "," + headerMap.get(2) + ")";
}
}

最终,客户端只需要在headerMap中传入tag,在bodyMap中传入业务参数,通过简单的RPC透传调用,即可实现网关通用性,一旦有新的接口上线,只需要修改API处理层即可。

长连网关作为与客户端的直接交互通道,其稳定性要求不言而喻。

作为一个网关,如果频繁的上版本,那么可能在上线期间导致大批量的用户瞬间掉线,而需要转移到其它长连服务器上建连。正常来说,机器一台一台甚至一组一组上,一台机器上用户连接因上线原因被清退,转而跑到另外一台长连服务器建连,而此时这台新的长连服务器也刚好进入上线队列中,再次将这台长连服务器上的用户清退,依此循环,用户体验必然不好。

为了解决这个问题,那么长连网关必然要具备稳定性、通用性、轻量级,不能过于频繁的进行上线、重启。当然,如果在半夜这种时间上线,以上问题倒是没有那么明显。

长连网关包含有以下:

  • 编解码
  • 通信协议
  • 心跳保活
  • 用户在线状态
  • 上下行消息处理
1. 编解码

对于TCP协议通信来说,一般由于二进制包过大或者过小,以及MTUsendBufferreceiveBuffer等参数可能产生粘包、拆包问题,从而破坏通信消息内容。

如果采样Netty开发的话,这个问题就比较容易解决了,可以采用LengthFieldBasedFrameDecoderLineBasedFrameDecoderLengthFieldPrepender,结合自定义协议的编解码方式一同组合使用。

2. 通信协议

为了让长连网关具备有通用性,协议也必须是轻量级的。

Google的ProtoBuf是一个不错的参考协议实现。

在这里将协议分为两个部分:HeaderBody

1
2
3
4
5
6
 @Data
public class ProtocolRequest implements Serializable {
private static final long serialVersionUID = 1L;
private Map<Integer, Object> header;
private Map<Integer, Object> body;
}

将请求HeaderBodyKey-Value对 根据 Proto 文件映射为 KeyIndex-Value 形式,对于 Value 也可以根据Proto 文件映射为 KeyIndex-Value 形式。

如:

1
2
3
4
5
{
"fromUserId":123,
"toUserId":456,
"content":"hello"
}

转化为:

1
2
3
4
5
{
0:123,
1:456,
2:"hello"
}

通过这种方式,可以有效的减少数据包的大小。

在长连网关层不需要对协议的字段进行解析,只需要识别出请求的类型,然后直接透传给API层即可。

3. 心跳保活

对于长连接来说,如果长时间没有消息的读写,这个连接可能会被服务器给断开,导致不可用而使客户端长时间无法收发消息。

双方通过间歇性的发送特殊的读写请求包来判断对方是否存活,很好的保证了连接的可用性。

如果采样Netty开发的话,只需要将 IdleStateHandler 加入到pipline中即可,并自定义一个Handler用来处理IdleStateEvent事件。

如:

1
2
3
4
5
6
7
8
9
10
11
12
 @Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.READER_IDLE) {
ctx.close();
}
if (event.state() == IdleState.WRITER_IDLE) {
ctx.close();
}
}
}
4. 用户在线状态

在消息下推的时候,需要通过客户端与长连服务器建立的长连接通道下推下去,也因此要求了服务端需要维护着 用户 - 连接 的关系对。对于支持多端在线的场景,如果恰好同一个用户的多个设备与同一台长连服务器建连,则需要加入用户 - 设备 - 连接的关系对。

实际情况是:

  • 一个用户可以有多台设备
  • 同一用户的多台设备可以与同一台长连服务器建连
  • 同一用户的同一设备只能与同一台长连服务器建一个连接
1
2
3
4
5
6
7
8
9
10
/**
* 标识在某一个设备上的用户
*/
@Data
public class ChannelIdentifier {
//用户id
private Long userId;
//设备信息
private String device;
}
//用户id到多台设备的映射关系
private ConcurrentHashMap<Long, Set<ChannelIdentifier>> userToDevices = new ConcurrentHashMap<>();
//在某一个设备上的用户与TCP连接的映射关系
private ConcurrentHashMap<ChannelIdentifier, Channel> deviceToChannel = new ConcurrentHashMap<>();

进行 deviceToChannelput 操作的时候需要判断是否该设备已和同一台长连服务器建连,如果是的话,需要close之前的连接,才可以建连。

5. 上下行消息处理

对于上行消息,客户端将请求包根据协议定义的格式,封装为HeaderMapBodyMap,在HeaderMap中声明了该消息的类型,客户端的信息、用户信息等。

对于下行消息,也是类似上行消息的格式。

当用户上线的时候,会触发 拉取离线消息 的操作,从 Redissub出离线期间的消息的时候,采用异步多线程的方式来消费消息,虽然消费快了,但是可能会导致消息乱序,因此,不得不对消息进行次序整形,为了实现次序整形,需要在触发取离线消息操作的时候对每一条消息添加一个单调递增seqId,长连网关sub到消息之后,根据seqId对消息进行从小到大排序,也因此实现了消息有序。

当消息下推到客户端的时候,服务器其实也不确定消息到底推送成功没有以及消息推送到哪条了,甚至是中间某条消息没有推送成功。除了需要存储用户当前已同步到的版本号,对于每一个连接,在建立连接的时候,初始化一个tid,从0开始,消息收到之后,需要进行ack操作,类似TCP三次握手过程,如果有消息丢失、超时,那么将会触发重推操作,在直播互动这种场景中,可能消息下推QPS特别高,且不一定需要全部消息都关注下推成功,但为了统计一下服务推送到达能力,可以采用采样ack的方式,在消息的HeaderMap中添加一个isSampled = true,客户端及长连网关判断该消息需要采样ack,则执行ack操作,否则不执行。

用户上线的时候,需要和长连服务器建连,本文将涉及服务器地址下发设计。

IM

由于无法得知SLB的可用度,需要计算求和每一台ECS的可用度,间接得出。

在每一台长连网关程序上都启动了一个定时任务,每隔 5s 获取调度一次,计算ppsbytes,然后将二者分别除去设置的阈值,得到一个已用度,再用 1 - 已用度 得到 可用度的值,再在二者之中取最小值,作为当前ECS服务器的最终可用度,然后将这个值存到Redis中。(PPS及Bytes计算方法

下发IP服务程序中,会去读取Redis中存储的每一个SLB对应的ECS服务器列表,每次统计可用度的时候,读出所有ECS服务器当前的可用度,将可用度求和,作为当前SLB的可用度。将计算得出的SLB可用度缓存到机器内存(调用Guava的LoadingCache实现)及Redis中。

为了让可用度比较接近的SLB被均衡连接,将可用度乘以一个随机数再去进行排序,让同一时刻内返回的SLB列表不会过于固定,造成某一个SLB过于饱和。