编程知识 cdmana.com

Java基于队列和数据库批量维护用户在线离线状态和位置信息

目录

前言介绍

用户数据迁移

用户状态数据批量处理

队列数据批量消费


前言介绍

系统用户实时位置都保存在redis中,我们采用redis过期方式来监听用户位置离线,离线的用户key会放入对应的队列进行消费(这里的队列可以是Java队列也可以是RabbitMQ之类的消息中间件,我们采用的是ConcurrentLinkedQueue)。项目中对用户实时位置有两个要求(特殊行业对人员位置安全规范的要求):

  • a.用户最后一次经纬度实时位置更新到数据库
  • b.用户离线后更新用户在线状态

为了解决每次用户位置心跳更新导致数据库连接不足问题,我采用了批量更新的方式来解决大批量的用户上线和离线更新。解决了如下问题:

  • a.解决用户状态表统计数据不一致问题
  • b.解决数据库更新占用数据库连接过多问题(可实现批量更新上线状态位置和离线状态,最后一次位置保存更新降低数据库压力)
  • c.解决redis和database在线状态延迟过大问题

用户数据迁移

针对需要维护最后用户在线位置数据的这类用户,我们有单独用户角色加以限定。这种用户只需要简单将数据做个分表保存起来就可以了。首次初始化这类数据是根据已有用户数据直接采用SQL 支持的SELECT [A,B,....] INTO TABLE_SUB FROM TABLE_MAIN 

示例脚本如下:

-- 注意:======依赖触发器更新用户信息=====
--  修改目的:
--  a.解决用户状态表格统计数据不一致问题
--  b.解决数据库更新占用数据库连接过多问题(可实现批量更新上线状态位置和离线状态,最后一次位置保存更新降低数据库压力)
--  c.解决redis和database在线状态延迟过大问题
-- 可反复执行如下表数据记录
DROP TABLE if exists xh_yw.xh_user_online_tb ;

SELECT
  -- 组织机构ID
  i_orgid,
  -- 组织机构编号
	c_orgbh,
	-- 组织机构名称
	c_orgname,
	-- 用户ID
	i_userid,
	-- 用户姓名
	c_userealname,
	-- 手机号码
	c_usertel,
	-- 时间戳
  make_timestamp(2020,1,1,0,0,0) AS lasttime,
  -- 最后位置:经度
  0.0 AS longitude,
  -- 最后位置:纬度
	0.0 AS latitude,
	-- 是否在线: 0 离线  1 在线
	0 AS is_online ,
	-- 日期make_date(2020,1,1) AS last_date
	'2020-01-01' AS last_date

INTO xh_yw.xh_user_online_tb

FROM xh_ht.fs_yw_base_user where i_userid
-- 角色过滤
IN (select  distinct i_userid from xh_ht.fs_yw_user_role where i_roleid = 5 )

用户状态数据批量处理

批量处理都依赖数据库支持的方式。

MySQL参考:https://www.cnblogs.com/mslagee/p/6509682.html

Postgresql参考:https://www.itranslater.com/qa/details/2583251656280376320

Java代码示例:

package com.patrol.position.service;

import com.alibaba.fastjson.JSONArray;
import com.forestar.platform.dao.DatabaseRepository;
import com.patrol.beans.Constants;
import com.patrol.beans.user.UserPosition;
import com.patrol.beans.util.LogicUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import java.text.SimpleDateFormat;
import java.util.*;

/**
 * @Copyright: 2019-2021
 * @FileName: UserOnlineService.java
 * @Author: PJL
 * @Date: 2020/7/15 19:37
 * @Description: 用户在线中间数据表服务【通过队列方式批量更新】
 */
@Slf4j
@Service
public class UserOnlineService {

    /**
     * Redis查询工具模板类
     */
    @Qualifier("redisTemplateByLettuce")
    @Autowired
    RedisTemplate redisTemplate;

    @Autowired
    JdbcTemplate jdbcTemplate;

    @Autowired
    DatabaseRepository databaseRepository;

    SimpleDateFormat sdfTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    SimpleDateFormat sdfDate = new SimpleDateFormat("yyyy-MM-dd");

    /**
     * 用户在线位置中间表
     */
    final String XH_USER_ONLINE_TB = "XH_USER_ONLINE_TB";

    /**
     * 解析临时表数据多列值
     *
     * @param list
     * @return
     */
    private List parseMultiParams(List<UserPosition> list) {
        List result = new ArrayList();
        int count = list.size();
        Integer[] userArray = new Integer[count];
        String[] lastTimeArray = new String[count];
        String[] lastDateArray = new String[count];
        Double[] longitudeArray = new Double[count];
        Double[] latitudeArray = new Double[count];
        UserPosition userPosition;
        Date date;
        for (int i = 0; i < count; i++) {
            userPosition = list.get(i);
            if (null != userPosition) {
                date = new Date(userPosition.getTimestamp());
                userArray[i] = Integer.valueOf(userPosition.getUserId());
                lastTimeArray[i] = new StringBuffer("date_trunc('second', TIMESTAMP '").append(sdfTime.format(date)).append("')").toString();
                String[] dateValues = sdfDate.format(date).split("-");
                lastDateArray[i] = new StringBuffer("make_date(").append(dateValues[0]).append(",").append(dateValues[1]).append(",").append(dateValues[2]).append(")").toString();
                longitudeArray[i] = userPosition.getPosition()[0];
                latitudeArray[i] = userPosition.getPosition()[1];
            }
        }
        result.add(userArray);
        result.add(lastTimeArray);
        result.add(lastDateArray);
        result.add(longitudeArray);
        result.add(latitudeArray);
        return result;
    }


    /**
     * 批量更新用户上线状态表
     *
     * @param list
     */
    public void batchOnline(List<UserPosition> list) {
        if (ObjectUtils.isNotEmpty(list)) {
            List paramList = this.parseMultiParams(list);
            String userIds = JSONArray.toJSONString(paramList.get(0));
            StringBuffer lastTimes = new StringBuffer();
            String[] timeList = (String[]) paramList.get(1);
            for (String s : timeList) {
                if (lastTimes.length() == 0) {
                    lastTimes.append(s);
                } else {
                    lastTimes.append(",").append(s);
                }
            }
            String[] dateList = (String[]) paramList.get(2);
            StringBuffer lastDates = new StringBuffer();
            for (String s : dateList) {
                if (lastDates.length() == 0) {
                    lastDates.append(s);
                } else {
                    lastDates.append(",").append(s);
                }
            }
            String longitudes = JSONArray.toJSONString(paramList.get(3));
            String latitudes = JSONArray.toJSONString(paramList.get(4));
            StringBuffer sb = new StringBuffer(" UPDATE ")
                    .append(Constants.DB_YW_TABLE_SPACE).append(XH_USER_ONLINE_TB).append(" a ")
                    .append(" SET ")
                    .append(" LASTTIME = u.LASTTIME,")
                    .append(" LAST_DATE = u.LAST_DATE,")
                    .append(" LONGITUDE = u.LONGITUDE,")
                    .append(" LATITUDE = u.LATITUDE,")
                    .append(" IS_ONLINE = 1 ")
                    .append(" FROM ( SELECT ")
                    .append(" unnest(array").append(userIds).append(") ").append(" as I_USERID,")
                    .append(" unnest(array[").append(lastTimes).append("]) ").append(" as LASTTIME,")
                    .append(" unnest(array[").append(lastDates).append("]) ").append(" as LAST_DATE,")
                    .append(" unnest(array").append(longitudes).append(") ").append(" as LONGITUDE,")
                    .append(" unnest(array").append(latitudes).append(") ").append(" as LATITUDE")
                    .append("  ) as u ")
                    .append(" WHERE a.I_USERID = u.I_USERID ");
            jdbcTemplate.execute(sb.toString());
        }

    }

    /**
     * 用户离线状态修改(0:离线  1:在线)
     *
     * @param userIdList
     */
    public void updateUserOffline(List<String> userIdList) {
        String[] userIds = new String[userIdList.size()];
        userIds = userIdList.toArray(userIds);
        String filter = LogicUtil.getOrgFilterString(userIds);
        String sql = new StringBuffer(" UPDATE ").append(XH_USER_ONLINE_TB).append(" SET IS_ONLINE = 0 WHERE I_USERID IN (").append(filter).append(")").toString();
        databaseRepository.execute(XH_USER_ONLINE_TB, sql);
    }
}

注意:Postgresql函数日期和时间戳需要特殊处理。

队列数据批量消费

用户在线离线只需要两个队列就可以区分开处理了。

package com.patrol.position.queue;

import com.patrol.beans.user.UserPosition;
import com.patrol.config.condition.ServerCondition;
import com.patrol.position.service.UserOnlineService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * @Copyright: 2019-2021
 * @FileName: UpdateStatusOnlineQueue.java
 * @Author: PJL
 * @Date: 2020/8/20 11:03
 * @Description: 在线离线状态列表数据更新队列
 */
@Slf4j
@Component
public class UpdateStatusQueue {

    @Autowired
    UserOnlineService userOnlineService;

    /**
     * 并发链表队列--在线位置队列
     */
    private static final ConcurrentLinkedQueue<UserPosition> onlineQueue = new ConcurrentLinkedQueue<>();

    /**
     * 并发链表队列--离线位置队列
     */
    private static final ConcurrentLinkedQueue<String> offlineQueue = new ConcurrentLinkedQueue<>();

    /**
     * 消费用户上线、离线下线任务
     */
    @PostConstruct
    private void consumeUserOnlineStatusQueue() {
        if (ServerCondition.isServer) {
            log.info(">>>>>>>>启动服务端消费线程....");
            /***********用户上线批量消费***********/
            this.userOnline();

            /***********用户离线状态批量消费***********/
            this.userOffline();

            log.info(">>>>>>>>启动服务端消费线程....完毕!");
        }
    }


    /**
     * 添加在线用户
     *
     * @param userPosition
     */
    public static void addToOnlineQueue(UserPosition userPosition) {
        if (ServerCondition.isServer) {
            onlineQueue.add(userPosition);
        }
    }

    /**
     * 添加离线用户
     *
     * @param userId
     */
    public static void addToOfflineQueue(String userId) {
        if (ServerCondition.isServer) {
            offlineQueue.add(userId);
        }
    }

    /**
     * 用户上线更新最后一次上线位置批量处理
     */
    private void userOnline() {
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    int size = onlineQueue.size();
                    if (size > 0) {
                        List<UserPosition> list = new ArrayList<>();
                        UserPosition userPosition;
                        for (int i = 0; i < size; i++) {
                            userPosition = onlineQueue.poll();
                            if (null != userPosition) {
                                list.add(userPosition);
                            }
                        }
                        if (ObjectUtils.isNotEmpty(list)) {
                            // 批量上线
                            userOnlineService.batchOnline(list);
                        }
                    }
                    try {
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();
    }

    /**
     * 更新用户在线状态表为离线
     */
    private void userOffline() {
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    int size = offlineQueue.size();
                    if (size > 0) {
                        List<String> list = new ArrayList<>();
                        String userId;
                        for (int i = 0; i < size; i++) {
                            userId = offlineQueue.poll();
                            if (null != userId && StringUtils.isNotEmpty(userId)) {
                                list.add(userId);
                            }
                        }
                        if (ObjectUtils.isNotEmpty(list)) {
                            // 批量离线
                            userOnlineService.updateUserOffline(list);
                        }
                    }
                    try {
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();
    }
}

这里开了两个线程处理,根据用户规模可以改为多线程并发消费(但是请注意控制数据库连接)。

 

版权声明
本文为[boonya]所创,转载请带上原文链接,感谢
https://blog.csdn.net/boonya/article/details/108299802

Scroll to Top