0%

由于多线程及网络延迟的原因,离线消息sub出来可能会乱序。

假设存在离线消息队列 Redis,以及 10 条消息,理想情况下消息按照顺序1,2,3,4,5,6,7,8,9,10完整pubRedis中,但可能因为网络延迟导致顺序变成1,3,2,6,5,4,7,8,9,10,也可能顺序存入没问题,实打实的就是1,2,3,4,5,6,7,8,9,10,但是从Redissub出来的时候变成了1,2,3,5,6,4,8,9,7,10,也因此导致了离线消息乱序的产生。

为了解决这个问题,可以在每次触发拉取离线消息的时候,在服务端先取出所有消息,给每一条消息设置一个从0开始的自增seqId,为了防止突然上线突然下线反复拉取离线消息,需要给每一次的离线消息设置一个随机packageId,也是为了区分消息是否是新写入离线消息队列中的消息用的。

定义Message类表示要下推前未处理的消息

1
2
3
4
5
6
7
8
9
@Data
public class Message {
private String msgId;
private String deviceId;
private Long userId;
private int packageId;
private int seqId;
//....more
}

定义ChannelUtil类表示Channel工具类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class ChannelUtil {
private static final AttributeKey<ConcurrentHashMap<Integer, PackageQueue>> packageKey = AttributeKey.valueOf("package_queue");

public static void setPackageQueue(Channel channel, ConcurrentHashMap<Integer, PackageQueue> packageQueueMap) {
channel.attr(packageKey).set(packageQueueMap);
}

public static ConcurrentHashMap<Integer, PackageQueue> getPackageQueue(Channel channel) {
return channel.attr(packageKey).get();
}

public static Channel getChannelByUserInfo(String deviceId, Long userId) {
//ignore...
return null;
}
}

定义SequenceMessage类表示排序消息

1
2
3
4
5
6
7
@Data
@NoArgsConstructor
@AllArgsConstructor
public class SequenceMessage {
private int seqId;
private String msgId;
}

定义PackageQueue表示包队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
* 包队列
*/

@Data
public class PackageQueue {
//根据seqId排序
private PriorityQueue<SequenceMessage> q =
new PriorityQueue<>(Comparator.comparingInt(s -> s.getSeqId()));

private Channel channel;
private int packageId;

public PackageQueue() {

}

public PackageQueue(Channel channel, int packageId) {
this.channel = channel;
this.packageId = packageId;
}

public void addMsg(SequenceMessage msg) {
q.add(msg);
}

public void drainOut() {
//...
while (!q.isEmpty()) {
//需将推送包封装为HeaderMap、BodyMap形式,该demo忽略
channel.writeAndFlush(q.poll());
}
}
}

定义PackageQueueManager表示包队列管理器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* 包队列管理器
*/
public class PackageQueueManager {
//定时
private ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(10);

public ConcurrentHashMap<Integer, PackageQueue> getPackageQueue(Channel channel) {
return ChannelUtil.getPackageQueue(channel);
}

//sub from redis and invoke this method
public void addMsg(Message message) {
Channel channel = ChannelUtil.getChannelByUserInfo(message.getDeviceId(), message.getUserId());
if (channel != null && channel.isActive()) {
ConcurrentHashMap<Integer, PackageQueue> packageQueueMap = getPackageQueue(channel);
packageQueueMap.computeIfAbsent(message.getPackageId(), pid -> {
PackageQueue packageQueue = new PackageQueue(channel, pid);
//2秒下发一次
executorService.schedule(packageQueue::drainOut, 2, TimeUnit.SECONDS);
return packageQueue;
}).addMsg(new SequenceMessage(message.getSeqId(), message.getMsgId()));
}
}
}

定义Publisher表示离线消息的发布者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class Publisher {

public void getOfflineMsgs(Long userId, String deviceId, Long syncVersion, String pubChannel) {
List<Message> messageList = loadOfflineMsgs(syncVersion);
int packageId = ThreadLocalRandom.current().nextInt(1, Integer.MAX_VALUE);
int seqId = 0;
for (Message message : messageList) {
message.setDeviceId(deviceId);
message.setUserId(userId);
message.setSeqId(seqId++);
message.setPackageId(packageId);
}

//pub to redis, the channel is : pubChannel
}

private List<Message> loadOfflineMsgs(long startVersion) {
//from redis
return null;
}
}

队列机层起到了聚合多方消息,减轻网关层压力的中间枢纽作用。

作为一个系统,可能是独享的,可能开放出一些能力提供给其它业务方使用,为了方便、安全、快捷,可通过队列机层来集中处理这些消息。

比如:发送私信,客户端可以直接通过上行接口直接调用,可以通过写入消息队列中,可以通过短信发送,而队列机无需关心消息到底怎么来,只需要关注消息可以这么来,而消息的来源渠道可以由各业务方来实现,以及消息如何处理,将要发送到什么地方,是否需要写入离线缓存队列中。

另外,哪怕多来几个不同APP,要接收下推消息,并且不想接入我方实现的网关层来进行消息下推,可以自行 sub 队列机层的 Redis,然后由自己来实现网关层

在整个链路中,API层相当于长连网关层的服务端,也相当于服务层的客户端。

服务层包含有:私信、群聊、群发等。

API层通过HTTP或者RPC的方式,去调用服务层的各种服务,然后返回处理 成功 或者 失败报文给 API层

服务层 处理成功之后,将消息存入MySQLHBase 等存储中,每一条消息都会有个版本号,基于用户维度的,然后将消息写入 RedisMemcachedQKafka 等消息中间件中,由队列机层 来订阅这些消息中间件,再经过一系列的处理后,将最终消息PubRedis中,长连网关层 的每一条服务器都可以Sub到这些消息,可以选择在网关层中对消息进行GZIP压缩,然后最终推送给客户端。