
How to quickly import a huge file with more than 100 million rows of data into a production environment?

Hello everyone, I'm Little Black Brother Downstairs ~

If I gave you a huge file containing 100 million lines of data and asked you to import it into the production database within a week, how would you do it?

The question above is a real business requirement I received some time ago: migrating historical data from an old system into a new production system through offline files.

Because the business side had already fixed the launch date of the new system, I had only one week to import the historical data into the production system.

Given the tight schedule and the huge amount of data, I came up with the following plan during the design process:

  • Split the file
  • Multi-threaded import


Split the file

First of all, we can write a small program, or simply use the split command, to break this huge file into smaller files.

# Split the large file into several small files, 100,000 lines each
split -l 100000 -d -a 4 largeFile.txt smallFile_

We choose to split the large file first for two main reasons:

First, if the program reads the large file directly and goes down halfway through, we lose all reading progress and have to start from the beginning again.

After splitting, once a small file has been fully read, we can move it to a designated folder.

That way, even if the application crashes and restarts, we only need to re-read the remaining files.
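For example, a minimal sketch of this "move after finishing" idea, with a hypothetical markFileAsDone helper and an assumed directory (neither comes from the original requirement):

// uses java.nio.file.{Files, Path, Paths, StandardCopyOption}
/**
 * Move a fully imported small file into a "finished" directory,
 * so that after a restart only the remaining files are re-read.
 */
static void markFileAsDone(Path smallFile) throws IOException {
    Path doneDir = Paths.get("/data/import/finished"); // assumed directory
    Files.createDirectories(doneDir);
    Files.move(smallFile, doneDir.resolve(smallFile.getFileName()),
            StandardCopyOption.REPLACE_EXISTING);
}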

Second, a single file can only be read by one application at a time, which limits the import speed.

After splitting, we can deploy multiple nodes and scale horizontally. Each node reads a portion of the files, which multiplies the import speed; a simple sharding sketch follows below.
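One simple way to let each node pick its own share of the small files (an assumption for illustration; the article does not prescribe a sharding rule) is to hash the file name against the node count:

// uses java.io.File, java.util.ArrayList, java.util.List
/**
 * Node nodeIndex out of nodeCount only processes the files
 * whose name hashes onto its index.
 */
static List<File> filesForThisNode(File dir, int nodeIndex, int nodeCount) {
    List<File> mine = new ArrayList<>();
    File[] all = dir.listFiles((d, name) -> name.startsWith("smallFile_"));
    if (all == null) {
        return mine;
    }
    for (File f : all) {
        if (Math.abs(f.getName().hashCode() % nodeCount) == nodeIndex) {
            mine.add(f);
        }
    }
    return mine;
}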

Multi-threaded import

After splitting the file, we need to read its contents and import them.

When splitting, we set each small file to contain 100,000 lines. Reading all 100,000 lines into the application at once could make the heap use too much memory and cause frequent Full GCs, so the code below reads the file as a stream, line by line.

Of course, if the files are small after splitting, or the application's heap is set very large, we can load a whole file into memory and process it there, which is simpler, as in the sketch below.
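A minimal sketch of that simpler approach, assuming JDK 7+; the file name just follows the prefix of the split command above:

// Load the whole small file into memory at once, then import line by line
List<String> allLines = Files.readAllLines(Paths.get("smallFile_0000"), StandardCharsets.UTF_8);
for (String line : allLines) {
    convertToDB(line);
}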

The code for reading line by line is as follows:

File file = ...
try (LineIterator iterator = IOUtils.lineIterator(new FileInputStream(file), "UTF-8")) {
    while (iterator.hasNext()) {
        // Read one line at a time instead of loading the whole file
        String line = iterator.nextLine();
        convertToDB(line);
    }
}

The code above uses the LineIterator class from commons-io, which reads the file with a BufferedReader underneath. It wraps the reader in an iterator, so we can read the file iteratively with ease.

If you are on JDK 1.8 or later, the operation is even simpler: we can use the JDK's own Files class to read the file as a Stream. The code is as follows:

Files.lines(Paths.get("file path"), Charset.defaultCharset()).forEach(line -> {
    convertToDB(line);
});

A closer look at the source of Files#lines shows that its underlying principle is similar to the LineIterator above: it is also wrapped as an iterator.

Problems introduced by multithreading

The reading code above is not hard to write, but it has an efficiency problem: only a single thread is importing, and the next line cannot be imported until the previous one finishes.

To speed up the import, let's add a few more threads and import concurrently.

For multithreading we naturally reach for a thread pool. The relevant code is modified as follows:

File file = ...;
ExecutorService executorService = new ThreadPoolExecutor(
        5,
        10,
        60,
        TimeUnit.MINUTES,
        // Queue capacity: assume the file contains 100,000 lines
        new ArrayBlockingQueue<>(10 * 10000),
        // ThreadFactoryBuilder is provided by Guava
        new ThreadFactoryBuilder().setNameFormat("test-%d").build());
try (LineIterator iterator = IOUtils.lineIterator(new FileInputStream(file), "UTF-8")) {
    while (iterator.hasNext()) {
        String line = iterator.nextLine();
        // Submit each line to the thread pool for import
        executorService.submit(() -> {
            convertToDB(line);
        });
    }
}

In the code above, every line read is submitted directly to the thread pool for execution.

Recall how a thread pool handles a submitted task (a small sketch after the flowchart below walks through these steps):

  1. If the number of core threads has not been reached, a new thread is created to run the task.
  2. If the core threads are all busy, the task is put into the queue.
  3. If the queue is full, another thread (up to the maximum) is created to run the task.
  4. If the maximum number of threads has been reached and the queue is full, the rejection policy is applied.

[Figure: thread pool execution flowchart]
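To make these four steps concrete, here is a small sketch (not from the original article) with core = 1, max = 2 and a queue capacity of 1, so that the fourth submission is rejected:

ThreadPoolExecutor pool = new ThreadPoolExecutor(
        1, 2, 60, TimeUnit.SECONDS,
        new ArrayBlockingQueue<>(1));

Runnable slowTask = () -> {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
};

pool.submit(slowTask); // 1st task: a core thread is created
pool.submit(slowTask); // 2nd task: the core thread is busy, so it goes into the queue
pool.submit(slowTask); // 3rd task: the queue is full, so a non-core thread is created
pool.submit(slowTask); // 4th task: max threads reached and queue full -> RejectedExecutionException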

Because we set the core pool size to 5, the core threads fill up quickly, and subsequent tasks can only wait in the queue.

To keep later tasks from being rejected, we could adopt one of the following plans:

  • Make the queue very large, big enough to hold every line of the file
  • Make the maximum number of threads very large, greater than the total number of lines in the file

Both schemes share the same problem. The first effectively loads the entire file into memory, which takes too much memory.

The second creates too many threads, which also takes too much memory.

Once memory usage grows too large and GC cannot reclaim it, frequent Full GCs may follow, possibly even an OOM, and the import slows to a crawl.

There are two possible ways to solve this problem:

  • Batch execution with CountDownLatch
  • Extending the thread pool

Batch execution with CountDownLatch

The JDK provides CountDownLatch, which lets the main thread wait until all sub-threads have finished before continuing.

Using this feature, we can modify the multi-threaded import code. The main logic is as follows:

try (LineIterator iterator = IOUtils.lineIterator(new FileInputStream(file), "UTF-8")) {
    // Lines collected for the current task
    List<String> lines = Lists.newArrayList();
    // Buffer of asynchronous tasks
    List<ConvertTask> tasks = Lists.newArrayList();
    while (iterator.hasNext()) {
        String line = iterator.nextLine();
        lines.add(line);
        // Number of lines handled by each task
        if (lines.size() == 1000) {
            // Create a new asynchronous task; note that a copy of the list is needed
            tasks.add(new ConvertTask(Lists.newArrayList(lines)));
            lines.clear();
        }
        if (tasks.size() == 10) {
            asyncBatchExecuteTask(tasks);
        }
    }
    // The file has been read, but there may still be lines that have not been imported
    if (!lines.isEmpty()) {
        tasks.add(new ConvertTask(Lists.newArrayList(lines)));
    }
    // Execute one last batch
    asyncBatchExecuteTask(tasks);
}

In this code, each asynchronous task imports 1,000 lines of data. After accumulating 10 asynchronous tasks, asyncBatchExecuteTask is called to run them asynchronously on the thread pool.

/**
 * Execute a batch of tasks
 *
 * @param tasks
 */
private static void asyncBatchExecuteTask(List<ConvertTask> tasks) throws InterruptedException {
    CountDownLatch countDownLatch = new CountDownLatch(tasks.size());
    for (ConvertTask task : tasks) {
        task.setCountDownLatch(countDownLatch);
        executorService.submit(task);
    }
    // The main thread waits until every asynchronous task has counted the latch down
    countDownLatch.await();
    // Clear the buffer so new tasks can be added
    tasks.clear();
}

asyncBatchExecuteTask creates a CountDownLatch, and the main thread then calls its await method to wait until all asynchronous tasks have finished.

The logic of the ConvertTask asynchronous task is as follows:

/**
 * Asynchronous task.
 * After the data has been imported, be sure to call countDownLatch.countDown(),
 * otherwise the main thread will stay blocked.
 */
private static class ConvertTask implements Runnable {

    private CountDownLatch countDownLatch;

    private List<String> lines;

    public ConvertTask(List<String> lines) {
        this.lines = lines;
    }

    public void setCountDownLatch(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
        try {
            for (String line : lines) {
                convertToDB(line);
            }
        } finally {
            countDownLatch.countDown();
        }
    }
}

The ConvertTask class is very simple: it iterates over all of its lines and imports them into the database. When all of its data has been imported, it calls countDownLatch#countDown.

Once every asynchronous thread has finished and called countDownLatch#countDown, the main thread wakes up and continues reading the file.

This approach solves the problem above, but each round it has to accumulate a certain number of tasks before it can start executing them asynchronously.

In addition, the next batch cannot start until every task in the current batch has finished, so the time spent on a batch equals the time of its slowest task.

This leaves the thread pool with some idle time. Is there a way to keep squeezing the thread pool so that it is always working?

Extending the thread pool

Back to the beginning: reading a file and importing it is really just a producer-consumer model.

The main thread, as the producer, keeps reading the file and putting lines into a queue.

Asynchronous threads, as consumers, keep taking lines from the queue and importing them into the database.

Once the queue is full, the producer should block until the consumers have consumed some tasks.

Using a thread pool is in fact also a producer-consumer model, and it also uses a blocking queue.

So why doesn't the thread pool block when its queue is full?

This is because the thread pool internally uses the offer method, which does not block when the queue is full; it simply returns immediately.
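The difference between the two calls can be seen in a tiny sketch (not from the original article):

BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(1);
queue.offer(() -> {});                     // returns true; the queue is now full
boolean accepted = queue.offer(() -> {});  // returns false immediately, no blocking
// queue.put(() -> {});                    // put would block here until space frees up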

Is there a way to block the main thread from adding tasks when the thread pool's queue is full?

There is: we can define a custom rejection policy for the thread pool that calls BlockingQueue.put when the queue is full, so the producer blocks.

RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        if (!executor.isShutdown()) {
            try {
                // put blocks until there is room in the queue
                executor.getQueue().put(r);
            } catch (InterruptedException e) {
                // should not be interrupted
            }
        }
    }
};

Now, once the thread pool is full, the main thread will block.

With this in place, we can go back to the multi-threaded import code shown earlier and use it directly.

ExecutorService executorService = new ThreadPoolExecutor(
        5,
        10,
        60,
        TimeUnit.MINUTES,
        new ArrayBlockingQueue<>(100),
        new ThreadFactoryBuilder().setNameFormat("test-%d").build(),
        (r, executor) -> {
            if (!executor.isShutdown()) {
                try {
                    // The main thread will block here until the queue has room
                    executor.getQueue().put(r);
                } catch (InterruptedException e) {
                    // should not be interrupted
                }
            }
        });
File file = new File("file path");

try (LineIterator iterator = IOUtils.lineIterator(new FileInputStream(file), "UTF-8")) {
    while (iterator.hasNext()) {
        String line = iterator.nextLine();
        executorService.submit(() -> convertToDB(line));
    }
}
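One detail the snippet above leaves out is shutting the pool down after the whole file has been read; otherwise, tasks still waiting in the queue may not finish before the program exits. A minimal sketch of a graceful shutdown (the timeout value is an assumption):

// Stop accepting new tasks and wait for the queued imports to finish
executorService.shutdown();
if (!executorService.awaitTermination(30, TimeUnit.MINUTES)) {
    // Still not finished after the timeout; decide whether to force a shutdown
    executorService.shutdownNow();
}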

Summary

For a huge file, we can split it into multiple smaller files and deploy several application instances to speed up reading.

We can also use multiple threads to import concurrently, but we need to be aware that when the thread pool is full, it rejects subsequent tasks.

We can extend the thread pool with a custom rejection policy so that the main reading thread blocks instead.

Okay, that's all for today's article. If you have a better solution, feel free to leave a comment.


