由于多线程及网络延迟的原因,离线消息sub出来可能会乱序。
假设存在离线消息队列 Redis,以及 10 条消息,理想情况下消息按照顺序1,2,3,4,5,6,7,8,9,10完整pub入Redis中,但可能因为网络延迟导致顺序变成1,3,2,6,5,4,7,8,9,10,也可能顺序存入没问题,实打实的就是1,2,3,4,5,6,7,8,9,10,但是从Redis中sub出来的时候变成了1,2,3,5,6,4,8,9,7,10,也因此导致了离线消息乱序的产生。
为了解决这个问题,可以在每次触发拉取离线消息的时候,在服务端先取出所有消息,给每一条消息设置一个从0开始的自增seqId,为了防止突然上线突然下线反复拉取离线消息,需要给每一次的离线消息设置一个随机packageId,也是为了区分消息是否是新写入离线消息队列中的消息用的。
定义Message类表示要下推前未处理的消息
1 2 3 4 5 6 7 8 9
   | @Data public class Message {     private String msgId;     private String deviceId;     private Long userId;     private int packageId;     private int seqId;      }
   | 
定义ChannelUtil类表示Channel工具类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
   | public class ChannelUtil {     private static final AttributeKey<ConcurrentHashMap<Integer, PackageQueue>> packageKey = AttributeKey.valueOf("package_queue");
      public static void setPackageQueue(Channel channel, ConcurrentHashMap<Integer, PackageQueue> packageQueueMap) {         channel.attr(packageKey).set(packageQueueMap);     }
      public static ConcurrentHashMap<Integer, PackageQueue> getPackageQueue(Channel channel) {         return channel.attr(packageKey).get();     }
      public static Channel getChannelByUserInfo(String deviceId, Long userId) {                  return null;     } }
  | 
定义SequenceMessage类表示排序消息
1 2 3 4 5 6 7
   | @Data @NoArgsConstructor @AllArgsConstructor public class SequenceMessage {     private int seqId;     private String msgId; }
   | 
定义PackageQueue表示包队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
   | 
 
 
  @Data public class PackageQueue {          private PriorityQueue<SequenceMessage> q =             new PriorityQueue<>(Comparator.comparingInt(s -> s.getSeqId()));
      private Channel channel;     private int packageId;
      public PackageQueue() {
      }
      public PackageQueue(Channel channel, int packageId) {         this.channel = channel;         this.packageId = packageId;     }
      public void addMsg(SequenceMessage msg) {         q.add(msg);     }
      public void drainOut() {                  while (!q.isEmpty()) {                          channel.writeAndFlush(q.poll());         }     } }
 
  | 
定义PackageQueueManager表示包队列管理器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
   | 
 
  public class PackageQueueManager {          private ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(10);
      public ConcurrentHashMap<Integer, PackageQueue> getPackageQueue(Channel channel) {         return ChannelUtil.getPackageQueue(channel);     }
           public void addMsg(Message message) {         Channel channel = ChannelUtil.getChannelByUserInfo(message.getDeviceId(), message.getUserId());         if (channel != null && channel.isActive()) {             ConcurrentHashMap<Integer, PackageQueue> packageQueueMap = getPackageQueue(channel);             packageQueueMap.computeIfAbsent(message.getPackageId(), pid -> {                 PackageQueue packageQueue = new PackageQueue(channel, pid);                                  executorService.schedule(packageQueue::drainOut, 2, TimeUnit.SECONDS);                 return packageQueue;             }).addMsg(new SequenceMessage(message.getSeqId(), message.getMsgId()));         }     } }
 
  | 
定义Publisher表示离线消息的发布者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
   | public class Publisher {
      public void getOfflineMsgs(Long userId, String deviceId, Long syncVersion, String pubChannel) {         List<Message> messageList = loadOfflineMsgs(syncVersion);         int packageId = ThreadLocalRandom.current().nextInt(1, Integer.MAX_VALUE);         int seqId = 0;         for (Message message : messageList) {             message.setDeviceId(deviceId);             message.setUserId(userId);             message.setSeqId(seqId++);             message.setPackageId(packageId);         }
               }
      private List<Message> loadOfflineMsgs(long startVersion) {                  return null;     } }
  |