
Java Concurrent Programming source code analysis: how many must-ask thread pool interview questions are there?

In this article we analyze a few small questions left over from the previous article:

  • Does the thread pool distinguish between core and non-core threads?
  • How does it ensure that core threads are not destroyed?
  • How are threads in the thread pool reused?

Let's look at the last question first. Normally a thread terminates once it has finished its task. Thread.start() can be called only once; when the work started by that call is done, the thread reaches the terminated state and cannot be started again. Calling start on a Thread object that has already been started throws IllegalThreadStateException. The run method of Thread, however, can be called repeatedly. Hence a question that comes up often in interviews: 「What is the difference between the run() and start() methods of the Thread class?」

Let's look at the JDK source to see how thread reuse is achieved. The entry point through which the thread pool executes a task is the ThreadPoolExecutor#execute method.
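Before reading the pool's source, the start()/run() difference described above can be checked with a small sketch. The class and method names here (StartVsRunDemo, secondStartIsRejected, callRunDirectly) are made up for illustration and are not part of the JDK.

```java
class StartVsRunDemo {
    // Returns true if a second start() on the same Thread object is rejected.
    static boolean secondStartIsRejected() {
        Thread t = new Thread(() -> { });
        t.start();
        try {
            t.start(); // a Thread object can be started only once
            return false;
        } catch (IllegalThreadStateException expected) {
            return true;
        }
    }

    // Calls run() directly several times and returns how often the body ran.
    static int callRunDirectly(int times) {
        int[] counter = {0};
        Thread t = new Thread(() -> counter[0]++);
        for (int i = 0; i < times; i++) {
            t.run(); // an ordinary method call on the caller's thread; no new thread is started
        }
        return counter[0];
    }

    public static void main(String[] args) {
        System.out.println("second start() rejected: " + secondStartIsRejected());
        System.out.println("run() calls executed: " + callRunDirectly(3));
    }
}
```

So run() is just a method that can be invoked again and again, and that is the property the thread pool builds on. The source of ThreadPoolExecutor#execute, the entry point mentioned above, follows.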
 public void execute(Runnable command) {
     if (command == null)
         throw new NullPointerException();

     int c = ctl.get();
     // If the current number of threads in the pool is less than corePoolSize,
     // call addWorker to create a core thread that runs the task
     if (workerCountOf(c) < corePoolSize) {
         if (addWorker(command, true))
             return;
         c = ctl.get();
     }
     // The number of threads is greater than or equal to corePoolSize (or addWorker failed),
     // so try to add the task to workQueue
     if (isRunning(c) && workQueue.offer(command)) {
         // Read ctl (pool state and worker count) again into recheck, to recheck the state
         int recheck = ctl.get();
         // If the pool is no longer running, remove the task (roll back the enqueue)
         // and then apply the rejection policy
         if (! isRunning(recheck) && remove(command))
             reject(command);
         // The pool is running but has no threads, so create one to run the queued task
         else if (workerCountOf(recheck) == 0)
             addWorker(null, false);
     }
     // If the task could not be put into workQueue, try to run it by creating a non-core thread.
     // If that also fails, the pool is shut down or saturated, so apply the rejection policy
     else if (!addWorker(command, false))
         reject(command);
 }

Main logic of the 「execute」 method

  • If the number of threads running in the pool is less than 「corePoolSize」, a new thread is created to execute the task.
  • If the number of running threads is equal to or greater than 「corePoolSize」 (greater can happen when corePoolSize is changed dynamically), the task is put into the blocking queue.
  • If the queue is full and no new task can be added, a new thread has to be created to execute the task.
  • If the number of threads has already reached the maximum number of threads, the task is rejected.
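The four steps above can be observed from the outside with a small sketch. It assumes a deliberately tiny pool (corePoolSize 1, maximumPoolSize 2, a queue with capacity 1) and the default rejection policy; the class name ExecuteFlowDemo and the method trace are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ExecuteFlowDemo {
    // Submits four blocking tasks and records what the pool did with each one.
    static List<String> trace() {
        CountDownLatch release = new CountDownLatch(1);
        // Every task blocks until the latch is released, so no worker becomes free in between.
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        List<String> log = new ArrayList<>();
        try {
            for (int i = 1; i <= 4; i++) {
                try {
                    pool.execute(blocker);
                    log.add("task" + i + ": threads=" + pool.getPoolSize()
                            + " queued=" + pool.getQueue().size());
                } catch (RejectedExecutionException e) {
                    log.add("task" + i + ": rejected");
                }
            }
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        trace().forEach(System.out::println);
    }
}
```

With this configuration the first task should start a core thread, the second should wait in the queue, the third should start a non-core thread because the queue is full, and the fourth should be rejected.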

The addWorker method

The core of the execute method above is the addWorker method.

private boolean addWorker(Runnable firstTask, boolean core) {
       // The first part of the method is omitted here ....

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    } 

Let's first take a look at the 「Worker」 class

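private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /** Thread this worker is running in; null if the ThreadFactory fails. */
        final Thread thread;
        /** Initial task to run; possibly null. */
        Runnable firstTask;

        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            // The worker passes itself as the Runnable of the new thread
            this.thread = getThreadFactory().newThread(this);
        }

        public void run() {
            runWorker(this);
        }

        // ... the remaining members are omitted
    }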

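The 「Worker」 class implements the 「Runnable」 interface, and its run method calls the 「runWorker」 method. Because the worker passes itself to newThread, the t.start() call in addWorker ends up running runWorker on the new thread.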

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        // The task bound to the worker when it was created (may be null)
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            // If task is not null, execute it directly.
            // If task is null, call getTask() to take a new task from workQueue and execute it
            while (task != null || (task = getTask()) != null) {
                w.lock();
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    } 
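The while loop above is what lets one worker thread run many tasks. A quick way to observe this from the outside is to give a pool a single thread and record which thread runs each task. This is only a sketch; ThreadReuseDemo and distinctThreadsUsed are illustrative names, not JDK APIs.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ThreadReuseDemo {
    // Runs taskCount tasks on a one-thread pool and returns how many distinct threads ran them.
    static int distinctThreadsUsed(int taskCount) {
        Set<Thread> seen = ConcurrentHashMap.newKeySet();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> seen.add(Thread.currentThread()));
        }
        pool.shutdown(); // already queued tasks are still executed
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen.size();
    }

    public static void main(String[] args) {
        System.out.println("distinct threads for 5 tasks: " + distinctThreadsUsed(5));
    }
}
```

All five tasks should be executed by the same thread: the first one as the worker's firstTask, the rest fetched from the queue by getTask.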

The runWorker method first runs the task bound to the worker. If no task was bound when the worker was created, the worker takes tasks from the queue instead. After a task finishes, the worker is not destroyed: the while loop keeps calling getTask to fetch the next task from the blocking queue and runs it with task.run(). This is how thread reuse is achieved.

The loop condition is while (task != null || (task = getTask()) != null). As long as getTask returns a non-null value the loop does not terminate, so the thread keeps running.

「How are core threads kept from being destroyed after their tasks finish? And how are non-core threads destroyed?」 The answer is inside the getTask() method.

private Runnable getTask() {
  // Timeout flag, false by default. It is set to true when workQueue.poll() times out.
  // This flag is important and is discussed below
  boolean timedOut = false;
  for (;;) {
    // Read the value of ctl
    int c = ctl.get();
    int rs = runStateOf(c);

    // If the state is at least SHUTDOWN, and either the state is at least STOP or workQueue is empty,
    // decrement the worker count and return null, so the thread is reclaimed.
    // This also shows that in the SHUTDOWN state, if workQueue is not empty,
    // the pool keeps executing the remaining tasks
    if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
      // Decrement the pool's worker count by one (a CAS loop on ctl)
      decrementWorkerCount();
      return null;
    }

    // Get the number of worker threads in the pool
    int wc = workerCountOf(c);

    // If allowCoreThreadTimeOut is on, or the current worker count is greater than corePoolSize,
    // the thread may be reclaimed after a timeout.
    // allowCoreThreadTimeOut is false by default, so by default threads are not reclaimed
    // on timeout while the worker count is at or below corePoolSize.
    // This also shows that every thread beyond corePoolSize is a "temporary" thread
    // that the pool may reclaim once it has been idle for keepAliveTime
    boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

    // There are two conditions for destroying a thread:
    // 1. In principle the worker count cannot exceed maximumPoolSize, but setMaximumPoolSize
    //    may be called concurrently. If the maximum is lowered, the current worker count can
    //    exceed it, and threads have to be reclaimed to bring the count back under maximumPoolSize.
    // 2. timed && timedOut is true: timed means this thread is subject to timeout control,
    //    and timedOut means its last workQueue.poll() call timed out.
    // If either condition holds (and wc > 1 or the queue is empty), the thread can be reclaimed
    if ((wc > maximumPoolSize || (timed && timedOut))
        && (wc > 1 || workQueue.isEmpty())) {
      // Try to decrement the worker count by one with CAS
      if (compareAndDecrementWorkerCount(c))
        // Decrement succeeded: return null and the thread is reclaimed
        return null;
      // Otherwise loop and retry
      continue;
    }

    try {
      // If timed is true, wait for a task with a timeout; otherwise block until a task arrives
      Runnable r = timed ?
        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
        workQueue.take();
      if (r != null)
        return r;
      // poll timed out without getting a task, so set timedOut to true
      // and continue the loop. On the next pass, if timed is still true
      // (allowCoreThreadTimeOut is on or wc > corePoolSize), the thread qualifies for reclamation
      timedOut = true;
    } catch (InterruptedException retry) {
      timedOut = false;
    }
  }
}

So the key code that keeps threads from being destroyed is this line:

 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); 

As long as timed is false, workQueue.take() blocks until a task arrives, which ensures that the thread is not destroyed. The value of timed is in turn controlled by allowCoreThreadTimeOut and by whether the number of running threads is greater than corePoolSize.
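The effect of choosing between poll and take can be imitated outside the JDK with a stripped-down worker loop. This is a simplified sketch, not the pool's real code: KeepAliveSketch, nextTask, startWorker and aliveAfter are hypothetical names, and the pool state checks and worker counting of the real getTask are left out.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class KeepAliveSketch {
    // Mirrors the "timed ? poll : take" choice: null means the timed wait ran out.
    static Runnable nextTask(BlockingQueue<Runnable> queue, boolean timed, long keepAliveMillis)
            throws InterruptedException {
        return timed ? queue.poll(keepAliveMillis, TimeUnit.MILLISECONDS) : queue.take();
    }

    // Starts a worker thread that keeps running tasks until nextTask returns null.
    static Thread startWorker(BlockingQueue<Runnable> queue, boolean timed, long keepAliveMillis) {
        Thread t = new Thread(() -> {
            try {
                Runnable task;
                while ((task = nextTask(queue, timed, keepAliveMillis)) != null) {
                    task.run();
                }
            } catch (InterruptedException e) {
                // in this sketch an interrupt simply ends the worker
            }
        });
        t.setDaemon(true); // a worker blocked in take() must not keep the JVM alive
        t.start();
        return t;
    }

    // Waits up to waitMillis for the thread to end and reports whether it is still alive.
    static boolean aliveAfter(Thread t, long waitMillis) {
        try {
            t.join(waitMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return t.isAlive();
    }

    public static void main(String[] args) {
        Thread timedWorker = startWorker(new LinkedBlockingQueue<>(), true, 100);
        Thread blockingWorker = startWorker(new LinkedBlockingQueue<>(), false, 100);
        System.out.println("timed worker alive: " + aliveAfter(timedWorker, 2000));
        System.out.println("blocking worker alive: " + aliveAfter(blockingWorker, 500));
    }
}
```

With empty queues, the timed worker should leave its loop after about 100 ms, while the worker that uses take() stays parked. That is the same difference as between a thread that may time out and one that may not.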

  • As soon as getTask returns null, the thread is reclaimed (runWorker then calls processWorkerExit).
  • The source of this method also explains why core threads are destroyed too when allowCoreThreadTimeOut = true is set on the thread pool.
  • This also answers the question above: the thread pool does not distinguish between core and non-core threads. corePoolSize is only a threshold on the worker count, and whichever worker times out while the count is above it is the one that gets reclaimed.
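The second point can be checked against the real ThreadPoolExecutor. The sketch below assumes a pool with corePoolSize 2 and a 50 ms keep-alive time, and waits one second, which is assumed to be long enough for idle threads to time out. CoreTimeoutDemo and poolSizeAfterIdle are illustrative names.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CoreTimeoutDemo {
    // Runs two short tasks, stays idle well past the keep-alive time, then reports the pool size.
    static int poolSizeAfterIdle(boolean allowCoreTimeout) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 50, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        pool.allowCoreThreadTimeOut(allowCoreTimeout);
        try {
            pool.execute(() -> { }); // each call starts a core thread while the count is below corePoolSize
            pool.execute(() -> { });
            Thread.sleep(1000); // far longer than the 50 ms keep-alive time
            return pool.getPoolSize();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } finally {
            pool.shutdown(); // interrupts idle workers so the JVM can exit
        }
    }

    public static void main(String[] args) {
        System.out.println("allowCoreThreadTimeOut=false: " + poolSizeAfterIdle(false));
        System.out.println("allowCoreThreadTimeOut=true:  " + poolSizeAfterIdle(true));
    }
}
```

With the default setting both threads should still be there after the idle period, blocked in take(). With allowCoreThreadTimeOut(true) the pool size should drop to 0, because poll times out and getTask returns null.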

End

  • My knowledge is limited, so mistakes are hard to avoid. If you find something wrong, please leave a comment to point it out and I will fix it.
  • If you think the article is good, your reposts, shares, tips, likes, and comments are the biggest encouragement to me.
  • Thank you for reading, and thank you for your support.
  • Standing on the shoulders of giants to pick apples

Follow the official account 『 Java Doudi 』, where original articles are shared from time to time.

Stay tuned for the follow-up articles.

Copyright notice
This article was written by [Mr.Z]. Please include a link to the original when reposting. Thank you.
