0%

IM - 离线消息乱序的解决

由于多线程及网络延迟的原因,离线消息sub出来可能会乱序。

假设存在离线消息队列 Redis,以及 10 条消息,理想情况下消息按照顺序1,2,3,4,5,6,7,8,9,10完整pubRedis中,但可能因为网络延迟导致顺序变成1,3,2,6,5,4,7,8,9,10,也可能顺序存入没问题,实打实的就是1,2,3,4,5,6,7,8,9,10,但是从Redissub出来的时候变成了1,2,3,5,6,4,8,9,7,10,也因此导致了离线消息乱序的产生。

为了解决这个问题,可以在每次触发拉取离线消息的时候,在服务端先取出所有消息,给每一条消息设置一个从0开始的自增seqId,为了防止突然上线突然下线反复拉取离线消息,需要给每一次的离线消息设置一个随机packageId,也是为了区分消息是否是新写入离线消息队列中的消息用的。

定义Message类表示要下推前未处理的消息

1
2
3
4
5
6
7
8
9
@Data
public class Message {
private String msgId;
private String deviceId;
private Long userId;
private int packageId;
private int seqId;
//....more
}

定义ChannelUtil类表示Channel工具类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class ChannelUtil {
private static final AttributeKey<ConcurrentHashMap<Integer, PackageQueue>> packageKey = AttributeKey.valueOf("package_queue");

public static void setPackageQueue(Channel channel, ConcurrentHashMap<Integer, PackageQueue> packageQueueMap) {
channel.attr(packageKey).set(packageQueueMap);
}

public static ConcurrentHashMap<Integer, PackageQueue> getPackageQueue(Channel channel) {
return channel.attr(packageKey).get();
}

public static Channel getChannelByUserInfo(String deviceId, Long userId) {
//ignore...
return null;
}
}

定义SequenceMessage类表示排序消息

1
2
3
4
5
6
7
@Data
@NoArgsConstructor
@AllArgsConstructor
public class SequenceMessage {
private int seqId;
private String msgId;
}

定义PackageQueue表示包队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
* 包队列
*/

@Data
public class PackageQueue {
//根据seqId排序
private PriorityQueue<SequenceMessage> q =
new PriorityQueue<>(Comparator.comparingInt(s -> s.getSeqId()));

private Channel channel;
private int packageId;

public PackageQueue() {

}

public PackageQueue(Channel channel, int packageId) {
this.channel = channel;
this.packageId = packageId;
}

public void addMsg(SequenceMessage msg) {
q.add(msg);
}

public void drainOut() {
//...
while (!q.isEmpty()) {
//需将推送包封装为HeaderMap、BodyMap形式,该demo忽略
channel.writeAndFlush(q.poll());
}
}
}

定义PackageQueueManager表示包队列管理器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* 包队列管理器
*/
public class PackageQueueManager {
//定时
private ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(10);

public ConcurrentHashMap<Integer, PackageQueue> getPackageQueue(Channel channel) {
return ChannelUtil.getPackageQueue(channel);
}

//sub from redis and invoke this method
public void addMsg(Message message) {
Channel channel = ChannelUtil.getChannelByUserInfo(message.getDeviceId(), message.getUserId());
if (channel != null && channel.isActive()) {
ConcurrentHashMap<Integer, PackageQueue> packageQueueMap = getPackageQueue(channel);
packageQueueMap.computeIfAbsent(message.getPackageId(), pid -> {
PackageQueue packageQueue = new PackageQueue(channel, pid);
//2秒下发一次
executorService.schedule(packageQueue::drainOut, 2, TimeUnit.SECONDS);
return packageQueue;
}).addMsg(new SequenceMessage(message.getSeqId(), message.getMsgId()));
}
}
}

定义Publisher表示离线消息的发布者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class Publisher {

public void getOfflineMsgs(Long userId, String deviceId, Long syncVersion, String pubChannel) {
List<Message> messageList = loadOfflineMsgs(syncVersion);
int packageId = ThreadLocalRandom.current().nextInt(1, Integer.MAX_VALUE);
int seqId = 0;
for (Message message : messageList) {
message.setDeviceId(deviceId);
message.setUserId(userId);
message.setSeqId(seqId++);
message.setPackageId(packageId);
}

//pub to redis, the channel is : pubChannel
}

private List<Message> loadOfflineMsgs(long startVersion) {
//from redis
return null;
}
}