编程知识 cdmana.com

MQ数据同步Redis与DB实时数据一致性问题研究

目录

预设场景

问题分析

流程设计

实践验证

问题结论


实时性的数据,一般业务管理数据都是基本上不发生变化的,而发生随时不停发生变化的数据我们将之称之为实时性高的数据叫实时数据。实时数据很难保证在不同数据源下的一致性,除非查看的是某一时刻的数据快照(历史快照),否则很难保持不同数据源数据之间没有差异。所以,从实践中得出的结论是:异构多数据源保证实时数据一致性是一个伪命题,缓存的使用具有不可逆的特性,大多数情况下缓存不能当作真实数据源。

预设场景

欲实现移动App人员位置管理,要求:

1、展示用户总数和在线数量,MQ数据交互到REDIS。

2、保留用户在线情况,MQ数据同步到DB保留在线、离线和位置信息。

3、验证两边的在线数量是否能保持一致。

问题分析

1、从MQ同步数据到REDIS管理用户在线状态数据,网络状态未知的情况下,用户频繁上线、离线。

  • a.用户REDIS高效缓存位置数据
  • b.存在网络抖动(不确定性因素)

2、从MQ同步数据到DB。

  • a.MQ做了二次消费,无论是先放redis还是先放DB都会造成数据延时
  • b.DB和REDIS不能放在同一个事务进行管理
  • c.DB读写和REDIS读写存在效率上的差距

3、推测结论:MQ二次消费到不同数据源不能保证数据实时一致性,既要实时性又要准确性只能使用单一数据源。

流程设计

下面是一幅App心跳上传的位置分发到RabbitMQ进行位置管理的服务,位置保存在REDIS中,DB是PG(Postgresql)。

  • MQ分发逻辑:在OnlineMQ、KeepLiveMQ和OfflineMQ进行消费的时候加了二级队列进行处理,RabbitMQ是异步执行消费。
  • 在二级队列:Java程序并发队列批量处理用户状态位置数据,线程批量处理,是为了提升MQ本身消费的效率问题。
  • 数据维护:Quality Service 和网格聚合都是为了维护用户在线时间过期的辅助服务。
  • 用户30分钟内无位置和心跳上来即离线。
  • 在线有心跳但没有位置的系统进行保活。
  • 用户有心跳有位置的为在线。
  • 真实批量处理线程消费必须要有休眠时间,防止空转调度CPU,这里设置的是50ms(无论设多小都是一个时间差)。
  • 在正常情况下,离线、上线、保活队列数据应该是互斥的。

实践验证

redis统计结果

pg统计结果

 

问题结论

  • 异步的操作实时性不可控。
  • 多个数据源非主从和集群模式要同步实时数据结果是不可信的。
  • 缓存的主要功能是辅助数据库数据服务而非反过来使用。
  • 异构数据库数据同步实时数据一致性性没法保证。

另一个不容忽视的问题就是在取redis实时位置数据的时候尽量把scores作为过滤条件,这样取出来的数据用户有效数据才是比较准确的,可参考如下代码:

  /**
     * 根据组织机构ID查询REDIS【用户在线汇总总数】
     *
     * @param orgId
     * @return
     */
    public Long getOrgOnlineUserFromRedis(String orgId) {
        long now = System.currentTimeMillis();
        long minScores = now - Constants.MOBILE_POSITION_TIME_OUT.longValue() * 1000;
        long maxScores = now + Constants.MOBILE_POSITION_TIME_OUT.longValue() * 1000;
        if (("" + Constants.DB_ORG_ROOT_ID).equals(orgId)) {
            String key = new StringBuffer(Constants.MOBILE_POSITION_ZSET_QG_KEY).toString();
            //return redisTemplate.opsForZSet().zCard(key);// 总数
            return redisTemplate.opsForZSet().count(key, minScores, maxScores); // 得分条件汇总
        } else {
            String key = new StringBuffer(Constants.MOBILE_POSITION_ZSET_DW_INCLUDE_CHILDREN_ORG_KEY).append(orgId).toString();
            // return redisTemplate.opsForZSet().zCard(key);// 总数
            return redisTemplate.opsForZSet().count(key, minScores, maxScores);// 得分条件汇总
        }
    }

 

版权声明
本文为[boonya]所创,转载请带上原文链接,感谢
https://blog.csdn.net/boonya/article/details/108542899

Scroll to Top