
Maintaining users' online/offline status and location in batches with Java, using a queue and a database

Contents

Introduction

User data migration

Batch processing of user status

Batch consumption of queue data


Introduction

The real-time location of each user is stored in Redis. We rely on Redis key expiry to detect that a user has gone offline: when a user's key expires, the key is put into a queue for consumption. The queue can be an in-process Java queue or a message broker such as RabbitMQ; here we use ConcurrentLinkedQueue. The project has two requirements for users' real-time location (they come from industry-specific safety regulations on personnel location):

  • a. The longitude and latitude of the user's last real-time position must be saved to the database.
  • b. The user's online status must be updated when the user goes offline.
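
The hand-off from Redis key expiry to the offline queue can be sketched roughly as follows. This is only an illustration: the key layout `user:position:<userId>` and the names `ExpiredKeyHandler`, `userIdOf`, and `onKeyExpired` are assumptions, not taken from the project, and the callback stands in for whatever key-expiration listener the application registers.

```java
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical bridge between Redis key-expiry events and the offline queue. */
final class ExpiredKeyHandler {

    // Assumed key layout: "user:position:<userId>" (not from the original project).
    private static final String KEY_PREFIX = "user:position:";

    private final ConcurrentLinkedQueue<String> offlineQueue;

    ExpiredKeyHandler(ConcurrentLinkedQueue<String> offlineQueue) {
        this.offlineQueue = offlineQueue;
    }

    /** Extracts the user id from an expired key, or empty if the key is unrelated. */
    static Optional<String> userIdOf(String expiredKey) {
        if (expiredKey == null || !expiredKey.startsWith(KEY_PREFIX)) {
            return Optional.empty();
        }
        String id = expiredKey.substring(KEY_PREFIX.length());
        return id.isEmpty() ? Optional.empty() : Optional.of(id);
    }

    /** Called once per expired key; enqueues the user for a later batch offline update. */
    void onKeyExpired(String expiredKey) {
        userIdOf(expiredKey).ifPresent(offlineQueue::add);
    }
}
```

Redis publishes expiry events only when keyspace notifications are enabled (the `notify-keyspace-events` setting), so a callback like this would be wired to a listener subscribed to those events.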

Updating the database on every location heartbeat exhausted the available database connections. To fix this, I switched to batch updates for the large volume of online and offline status changes. This solves the following problems:

  • a. Inconsistent statistics in the user status table.
  • b. Database updates occupying too many connections (online status, offline status, and last location are now written in batches, which reduces database load).
  • c. Excessive delay between the online status in Redis and in the database.

User data migration

Only users with certain roles need their last online location maintained, so we filter by role and store just the data needed for those users. The table is first initialized from the existing user data using the SQL form SELECT [A,B,....] INTO TABLE_SUB FROM TABLE_MAIN.

The sample script is as follows :

-- Note: ====== Relies on triggers to keep user information up to date ======
-- Purpose of this change:
-- a. Fix inconsistent statistics in the user status table
-- b. Stop database updates from occupying too many connections (online status, offline status, and last location are updated in batches to reduce database load)
-- c. Reduce the delay between the online status in Redis and in the database
-- The script below can be run repeatedly
DROP TABLE IF EXISTS xh_yw.xh_user_online_tb;

SELECT
  -- Organization ID
  i_orgid,
  -- Organization number
  c_orgbh,
  -- Organization name
  c_orgname,
  -- User ID
  i_userid,
  -- User's real name
  c_userealname,
  -- Phone number
  c_usertel,
  -- Timestamp of the last position
  make_timestamp(2020,1,1,0,0,0) AS lasttime,
  -- Last position: longitude
  0.0 AS longitude,
  -- Last position: latitude
  0.0 AS latitude,
  -- Online flag: 0 = offline, 1 = online
  0 AS is_online,
  -- Date; alternative: make_date(2020,1,1) AS last_date
  '2020-01-01' AS last_date

INTO xh_yw.xh_user_online_tb

FROM xh_ht.fs_yw_base_user
-- Role filter
WHERE i_userid IN (SELECT DISTINCT i_userid FROM xh_ht.fs_yw_user_role WHERE i_roleid = 5);

Batch processing of user status

How the batch update is written depends on what the database supports.

MySQL reference: https://www.cnblogs.com/mslagee/p/6509682.html

PostgreSQL reference: https://www.itranslater.com/qa/details/2583251656280376320

Java code example:

package com.patrol.position.service;

import com.alibaba.fastjson.JSONArray;
import com.forestar.platform.dao.DatabaseRepository;
import com.patrol.beans.Constants;
import com.patrol.beans.user.UserPosition;
import com.patrol.beans.util.LogicUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import java.text.SimpleDateFormat;
import java.util.*;

/**
 * @Copyright: 2019-2021
 * @FileName: UserOnlineService.java
 * @Author: PJL
 * @Date: 2020/7/15 19:37
 * @Description: Service for the user-online intermediate table (batch updates driven by the queue)
 */
@Slf4j
@Service
public class UserOnlineService {

    /**
     * Redis template used for queries
     */
    @Qualifier("redisTemplateByLettuce")
    @Autowired
    RedisTemplate redisTemplate;

    @Autowired
    JdbcTemplate jdbcTemplate;

    @Autowired
    DatabaseRepository databaseRepository;

    SimpleDateFormat sdfTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    SimpleDateFormat sdfDate = new SimpleDateFormat("yyyy-MM-dd");

    /**
     * Intermediate table holding users' online status and last location
     */
    final String XH_USER_ONLINE_TB = "XH_USER_ONLINE_TB";

    /**
     * Builds the per-column value arrays used by the batch UPDATE
     *
     * @param list
     * @return
     */
    private List parseMultiParams(List<UserPosition> list) {
        List result = new ArrayList();
        int count = list.size();
        Integer[] userArray = new Integer[count];
        String[] lastTimeArray = new String[count];
        String[] lastDateArray = new String[count];
        Double[] longitudeArray = new Double[count];
        Double[] latitudeArray = new Double[count];
        UserPosition userPosition;
        Date date;
        for (int i = 0; i < count; i++) {
            userPosition = list.get(i);
            if (null != userPosition) {
                date = new Date(userPosition.getTimestamp());
                userArray[i] = Integer.valueOf(userPosition.getUserId());
                lastTimeArray[i] = new StringBuffer("date_trunc('second', TIMESTAMP '").append(sdfTime.format(date)).append("')").toString();
                String[] dateValues = sdfDate.format(date).split("-");
                lastDateArray[i] = new StringBuffer("make_date(").append(dateValues[0]).append(",").append(dateValues[1]).append(",").append(dateValues[2]).append(")").toString();
                longitudeArray[i] = userPosition.getPosition()[0];
                latitudeArray[i] = userPosition.getPosition()[1];
            }
        }
        result.add(userArray);
        result.add(lastTimeArray);
        result.add(lastDateArray);
        result.add(longitudeArray);
        result.add(latitudeArray);
        return result;
    }


    /**
     *  Batch update user online status table 
     *
     * @param list
     */
    public void batchOnline(List<UserPosition> list) {
        if (ObjectUtils.isNotEmpty(list)) {
            List paramList = this.parseMultiParams(list);
            String userIds = JSONArray.toJSONString(paramList.get(0));
            StringBuffer lastTimes = new StringBuffer();
            String[] timeList = (String[]) paramList.get(1);
            for (String s : timeList) {
                if (lastTimes.length() == 0) {
                    lastTimes.append(s);
                } else {
                    lastTimes.append(",").append(s);
                }
            }
            String[] dateList = (String[]) paramList.get(2);
            StringBuffer lastDates = new StringBuffer();
            for (String s : dateList) {
                if (lastDates.length() == 0) {
                    lastDates.append(s);
                } else {
                    lastDates.append(",").append(s);
                }
            }
            String longitudes = JSONArray.toJSONString(paramList.get(3));
            String latitudes = JSONArray.toJSONString(paramList.get(4));
            StringBuffer sb = new StringBuffer(" UPDATE ")
                    .append(Constants.DB_YW_TABLE_SPACE).append(XH_USER_ONLINE_TB).append(" a ")
                    .append(" SET ")
                    .append(" LASTTIME = u.LASTTIME,")
                    .append(" LAST_DATE = u.LAST_DATE,")
                    .append(" LONGITUDE = u.LONGITUDE,")
                    .append(" LATITUDE = u.LATITUDE,")
                    .append(" IS_ONLINE = 1 ")
                    .append(" FROM ( SELECT ")
                    .append(" unnest(array").append(userIds).append(") ").append(" as I_USERID,")
                    .append(" unnest(array[").append(lastTimes).append("]) ").append(" as LASTTIME,")
                    .append(" unnest(array[").append(lastDates).append("]) ").append(" as LAST_DATE,")
                    .append(" unnest(array").append(longitudes).append(") ").append(" as LONGITUDE,")
                    .append(" unnest(array").append(latitudes).append(") ").append(" as LATITUDE")
                    .append("  ) as u ")
                    .append(" WHERE a.I_USERID = u.I_USERID ");
            jdbcTemplate.execute(sb.toString());
        }

    }

    /**
     * Marks users as offline (0: offline, 1: online)
     *
     * @param userIdList
     */
    public void updateUserOffline(List<String> userIdList) {
        String[] userIds = new String[userIdList.size()];
        userIds = userIdList.toArray(userIds);
        String filter = LogicUtil.getOrgFilterString(userIds);
        String sql = new StringBuffer(" UPDATE ").append(XH_USER_ONLINE_TB).append(" SET IS_ONLINE = 0 WHERE I_USERID IN (").append(filter).append(")").toString();
        databaseRepository.execute(XH_USER_ONLINE_TB, sql);
    }
}

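Note: in PostgreSQL, dates and timestamps need special handling. In the code above they are passed as function expressions (date_trunc, make_date) rather than as plain quoted strings.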
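
As a minimal sketch of that handling, the two expressions built in `parseMultiParams` could also be produced with `java.time`, whose `DateTimeFormatter` is immutable and thread-safe, unlike a shared `SimpleDateFormat`. The class name `PgTimeLiterals`, its method names, and the explicit `ZoneId` parameter are assumptions made for illustration.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper that renders epoch milliseconds as PostgreSQL expressions. */
final class PgTimeLiterals {

    private static final DateTimeFormatter TIME_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    private PgTimeLiterals() {
    }

    /** Builds a date_trunc('second', TIMESTAMP '...') expression for the given instant. */
    static String timestampExpr(long epochMillis, ZoneId zone) {
        LocalDateTime t = LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), zone);
        return "date_trunc('second', TIMESTAMP '" + TIME_FORMAT.format(t) + "')";
    }

    /** Builds a make_date(year,month,day) expression; make_date takes three integers. */
    static String dateExpr(long epochMillis, ZoneId zone) {
        LocalDateTime t = LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), zone);
        return "make_date(" + t.getYear() + "," + t.getMonthValue() + "," + t.getDayOfMonth() + ")";
    }
}
```

Passing `ZoneId.systemDefault()` would mirror what `SimpleDateFormat` does by default; passing a fixed zone keeps the stored values independent of the server's configuration.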

Batch consumption of queue data

Only two queues are needed: one for online updates and one for offline updates.

package com.patrol.position.queue;

import com.patrol.beans.user.UserPosition;
import com.patrol.config.condition.ServerCondition;
import com.patrol.position.service.UserOnlineService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * @Copyright: 2019-2021
 * @FileName: UpdateStatusQueue.java
 * @Author: PJL
 * @Date: 2020/8/20 11:03
 * @Description: Queue that batches online/offline status updates
 */
@Slf4j
@Component
public class UpdateStatusQueue {

    @Autowired
    UserOnlineService userOnlineService;

    /**
     * Concurrent linked queue: online position queue
     */
    private static final ConcurrentLinkedQueue<UserPosition> onlineQueue = new ConcurrentLinkedQueue<>();

    /**
     * Concurrent linked queue: offline user queue
     */
    private static final ConcurrentLinkedQueue<String> offlineQueue = new ConcurrentLinkedQueue<>();

    /**
     * Starts the consumers for user online and offline tasks
     */
    @PostConstruct
    private void consumeUserOnlineStatusQueue() {
        if (ServerCondition.isServer) {
            log.info(">>>>>>>> Starting server consumer threads ....");
            /*********** Batch consumption of online users ***********/
            this.userOnline();

            /*********** Batch consumption of offline users ***********/
            this.userOffline();

            log.info(">>>>>>>> Server consumer threads started!");
        }
    }


    /**
     * Adds an online user's position to the online queue
     *
     * @param userPosition
     */
    public static void addToOnlineQueue(UserPosition userPosition) {
        if (ServerCondition.isServer) {
            onlineQueue.add(userPosition);
        }
    }

    /**
     * Adds an offline user to the offline queue
     *
     * @param userId
     */
    public static void addToOfflineQueue(String userId) {
        if (ServerCondition.isServer) {
            offlineQueue.add(userId);
        }
    }

    /**
     * Batch-updates the last online position of online users
     */
    private void userOnline() {
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    int size = onlineQueue.size();
                    if (size > 0) {
                        List<UserPosition> list = new ArrayList<>();
                        UserPosition userPosition;
                        for (int i = 0; i < size; i++) {
                            userPosition = onlineQueue.poll();
                            if (null != userPosition) {
                                list.add(userPosition);
                            }
                        }
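                        if (ObjectUtils.isNotEmpty(list)) {
                            // Batch online update; a failed batch is logged so it cannot kill the consumer thread
                            try {
                                userOnlineService.batchOnline(list);
                            } catch (Exception e) {
                                log.error("Batch online update failed", e);
                            }
                        }
                    }
                    try {
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag and stop the consumer thread
                        Thread.currentThread().interrupt();
                        return;
                    }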
                }
            }
        }).start();
    }

    /**
     * Batch-updates the user online status table to offline
     */
    private void userOffline() {
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    int size = offlineQueue.size();
                    if (size > 0) {
                        List<String> list = new ArrayList<>();
                        String userId;
                        for (int i = 0; i < size; i++) {
                            userId = offlineQueue.poll();
                            if (null != userId && StringUtils.isNotEmpty(userId)) {
                                list.add(userId);
                            }
                        }
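                        if (ObjectUtils.isNotEmpty(list)) {
                            // Batch offline update; a failed batch is logged so it cannot kill the consumer thread
                            try {
                                userOnlineService.updateUserOffline(list);
                            } catch (Exception e) {
                                log.error("Batch offline update failed", e);
                            }
                        }
                    }
                    try {
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag and stop the consumer thread
                        Thread.currentThread().interrupt();
                        return;
                    }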
                }
            }
        }).start();
    }
}

Two consumer threads are started here. Depending on the number of users, this can be changed to concurrent consumption by multiple threads, but take care to limit the number of database connections in use.
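
One way to keep each batch bounded, and to avoid updating the same user twice in one statement, is to drain a capped number of entries and keep only the newest position per user. In PostgreSQL, when an `UPDATE ... FROM` joins one target row to several source rows, only one of them is applied and which one is not predictable, so de-duplicating first makes the result deterministic. The sketch below is an illustration under assumptions: `Position` stands in for the project's `UserPosition`, and `BatchDrainer`, `drainLatest`, and `maxBatch` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

/** Hypothetical stand-in for the project's UserPosition bean. */
final class Position {
    final String userId;
    final long timestamp;

    Position(String userId, long timestamp) {
        this.userId = userId;
        this.timestamp = timestamp;
    }
}

/** Hypothetical helper that drains a bounded, de-duplicated batch from a queue. */
final class BatchDrainer {

    private BatchDrainer() {
    }

    /**
     * Polls at most maxBatch entries and keeps the newest one per user,
     * so the resulting batch contains each user id at most once.
     */
    static List<Position> drainLatest(Queue<Position> queue, int maxBatch) {
        Map<String, Position> latest = new LinkedHashMap<>();
        for (int i = 0; i < maxBatch; i++) {
            Position p = queue.poll();
            if (p == null) {
                break; // queue is empty
            }
            // Keep whichever of the two entries has the larger timestamp.
            latest.merge(p.userId, p, (a, b) -> b.timestamp >= a.timestamp ? b : a);
        }
        return new ArrayList<>(latest.values());
    }
}
```

Each consumer thread would call something like `drainLatest(onlineQueue, 500)` per cycle. The cap limits the size of each SQL statement, while the number of consumer threads bounds how many database connections are in use at once.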

 

Copyright notice
This article was written by [boonya]. Please include a link to the original when reposting. Thank you.
