由于多线程及网络延迟的原因,离线消息sub出来可能会乱序。
假设存在离线消息队列 Redis,以及 10 条消息,理想情况下消息按照顺序1,2,3,4,5,6,7,8,9,10完整pub入Redis中,但可能因为网络延迟导致顺序变成1,3,2,6,5,4,7,8,9,10,也可能顺序存入没问题,实打实的就是1,2,3,4,5,6,7,8,9,10,但是从Redis中sub出来的时候变成了1,2,3,5,6,4,8,9,7,10,也因此导致了离线消息乱序的产生。
为了解决这个问题,可以在每次触发拉取离线消息的时候,在服务端先取出所有消息,给每一条消息设置一个从0开始的自增seqId,为了防止突然上线突然下线反复拉取离线消息,需要给每一次的离线消息设置一个随机packageId,也是为了区分消息是否是新写入离线消息队列中的消息用的。
定义Message类表示要下推前未处理的消息
| 12
 3
 4
 5
 6
 7
 8
 9
 
 | @Datapublic class Message {
 private String msgId;
 private String deviceId;
 private Long userId;
 private int packageId;
 private int seqId;
 
 }
 
 | 
定义ChannelUtil类表示Channel工具类
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 
 | public class ChannelUtil {private static final AttributeKey<ConcurrentHashMap<Integer, PackageQueue>> packageKey = AttributeKey.valueOf("package_queue");
 
 public static void setPackageQueue(Channel channel, ConcurrentHashMap<Integer, PackageQueue> packageQueueMap) {
 channel.attr(packageKey).set(packageQueueMap);
 }
 
 public static ConcurrentHashMap<Integer, PackageQueue> getPackageQueue(Channel channel) {
 return channel.attr(packageKey).get();
 }
 
 public static Channel getChannelByUserInfo(String deviceId, Long userId) {
 
 return null;
 }
 }
 
 | 
定义SequenceMessage类表示排序消息
| 12
 3
 4
 5
 6
 7
 
 | @Data@NoArgsConstructor
 @AllArgsConstructor
 public class SequenceMessage {
 private int seqId;
 private String msgId;
 }
 
 | 
定义PackageQueue表示包队列
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 
 | 
 
 
 @Data
 public class PackageQueue {
 
 private PriorityQueue<SequenceMessage> q =
 new PriorityQueue<>(Comparator.comparingInt(s -> s.getSeqId()));
 
 private Channel channel;
 private int packageId;
 
 public PackageQueue() {
 
 }
 
 public PackageQueue(Channel channel, int packageId) {
 this.channel = channel;
 this.packageId = packageId;
 }
 
 public void addMsg(SequenceMessage msg) {
 q.add(msg);
 }
 
 public void drainOut() {
 
 while (!q.isEmpty()) {
 
 channel.writeAndFlush(q.poll());
 }
 }
 }
 
 | 
定义PackageQueueManager表示包队列管理器
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 
 | 
 
 public class PackageQueueManager {
 
 private ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(10);
 
 public ConcurrentHashMap<Integer, PackageQueue> getPackageQueue(Channel channel) {
 return ChannelUtil.getPackageQueue(channel);
 }
 
 
 public void addMsg(Message message) {
 Channel channel = ChannelUtil.getChannelByUserInfo(message.getDeviceId(), message.getUserId());
 if (channel != null && channel.isActive()) {
 ConcurrentHashMap<Integer, PackageQueue> packageQueueMap = getPackageQueue(channel);
 packageQueueMap.computeIfAbsent(message.getPackageId(), pid -> {
 PackageQueue packageQueue = new PackageQueue(channel, pid);
 
 executorService.schedule(packageQueue::drainOut, 2, TimeUnit.SECONDS);
 return packageQueue;
 }).addMsg(new SequenceMessage(message.getSeqId(), message.getMsgId()));
 }
 }
 }
 
 | 
定义Publisher表示离线消息的发布者
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 
 | public class Publisher {
 public void getOfflineMsgs(Long userId, String deviceId, Long syncVersion, String pubChannel) {
 List<Message> messageList = loadOfflineMsgs(syncVersion);
 int packageId = ThreadLocalRandom.current().nextInt(1, Integer.MAX_VALUE);
 int seqId = 0;
 for (Message message : messageList) {
 message.setDeviceId(deviceId);
 message.setUserId(userId);
 message.setSeqId(seqId++);
 message.setPackageId(packageId);
 }
 
 
 }
 
 private List<Message> loadOfflineMsgs(long startVersion) {
 
 return null;
 }
 }
 
 |