由于多线程及网络延迟的原因,离线消息sub出来可能会乱序。
假设存在离线消息队列 Redis
,以及 10
条消息,理想情况下消息按照顺序1,2,3,4,5,6,7,8,9,10
完整pub
入Redis
中,但可能因为网络延迟
导致顺序变成1,3,2,6,5,4,7,8,9,10
,也可能顺序存入没问题,实打实的就是1,2,3,4,5,6,7,8,9,10
,但是从Redis
中sub
出来的时候变成了1,2,3,5,6,4,8,9,7,10
,也因此导致了离线消息乱序的产生。
为了解决这个问题,可以在每次触发拉取离线消息的时候,在服务端先取出所有消息,给每一条消息设置一个从0开始的自增seqId
,为了防止突然上线突然下线反复拉取离线消息,需要给每一次的离线消息设置一个随机packageId
,也是为了区分消息是否是新写入离线消息队列中的消息用的。
定义Message
类表示要下推前未处理的消息
1 2 3 4 5 6 7 8 9
| @Data public class Message { private String msgId; private String deviceId; private Long userId; private int packageId; private int seqId; }
|
定义ChannelUtil
类表示Channel
工具类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| public class ChannelUtil { private static final AttributeKey<ConcurrentHashMap<Integer, PackageQueue>> packageKey = AttributeKey.valueOf("package_queue");
public static void setPackageQueue(Channel channel, ConcurrentHashMap<Integer, PackageQueue> packageQueueMap) { channel.attr(packageKey).set(packageQueueMap); }
public static ConcurrentHashMap<Integer, PackageQueue> getPackageQueue(Channel channel) { return channel.attr(packageKey).get(); }
public static Channel getChannelByUserInfo(String deviceId, Long userId) { return null; } }
|
定义SequenceMessage
类表示排序消息
1 2 3 4 5 6 7
| @Data @NoArgsConstructor @AllArgsConstructor public class SequenceMessage { private int seqId; private String msgId; }
|
定义PackageQueue
表示包队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
|
@Data public class PackageQueue { private PriorityQueue<SequenceMessage> q = new PriorityQueue<>(Comparator.comparingInt(s -> s.getSeqId()));
private Channel channel; private int packageId;
public PackageQueue() {
}
public PackageQueue(Channel channel, int packageId) { this.channel = channel; this.packageId = packageId; }
public void addMsg(SequenceMessage msg) { q.add(msg); }
public void drainOut() { while (!q.isEmpty()) { channel.writeAndFlush(q.poll()); } } }
|
定义PackageQueueManager
表示包队列管理器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
|
public class PackageQueueManager { private ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(10);
public ConcurrentHashMap<Integer, PackageQueue> getPackageQueue(Channel channel) { return ChannelUtil.getPackageQueue(channel); }
public void addMsg(Message message) { Channel channel = ChannelUtil.getChannelByUserInfo(message.getDeviceId(), message.getUserId()); if (channel != null && channel.isActive()) { ConcurrentHashMap<Integer, PackageQueue> packageQueueMap = getPackageQueue(channel); packageQueueMap.computeIfAbsent(message.getPackageId(), pid -> { PackageQueue packageQueue = new PackageQueue(channel, pid); executorService.schedule(packageQueue::drainOut, 2, TimeUnit.SECONDS); return packageQueue; }).addMsg(new SequenceMessage(message.getSeqId(), message.getMsgId())); } } }
|
定义Publisher
表示离线消息的发布者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| public class Publisher {
public void getOfflineMsgs(Long userId, String deviceId, Long syncVersion, String pubChannel) { List<Message> messageList = loadOfflineMsgs(syncVersion); int packageId = ThreadLocalRandom.current().nextInt(1, Integer.MAX_VALUE); int seqId = 0; for (Message message : messageList) { message.setDeviceId(deviceId); message.setUserId(userId); message.setSeqId(seqId++); message.setPackageId(packageId); }
}
private List<Message> loadOfflineMsgs(long startVersion) { return null; } }
|