编程知识 cdmana.com

How Tomcat uses thread pool to process remote concurrent requests

Tomcat How to use thread pool to process remote concurrent requests

Learning through understanding tomcat How to handle concurrent requests , Learn about thread pools , lock , queue ,unsafe class , The main code below comes from

java-jre:
sun.misc.Unsafe
java.util.concurrent.ThreadPoolExecutor
java.util.concurrent.ThreadPoolExecutor.Worker
java.util.concurrent.locks.AbstractQueuedSynchronizer
java.util.concurrent.locks.AbstractQueuedLongSynchronizer
java.util.concurrent.LinkedBlockingQueue

tomcat:
org.apache.tomcat.util.net.NioEndpoint
org.apache.tomcat.util.threads.ThreadPoolExecutor
org.apache.tomcat.util.threads.TaskThreadFactory
org.apache.tomcat.util.threads.TaskQueue

ThreadPoolExecutor

Is a thread pool implementation class , Manage threads , Reduce thread overhead , It can be used to improve the efficiency of task execution ,

The parameters in the construction method are

public ThreadPoolExecutor(
    int corePoolSize,
    int maximumPoolSize,
    long keepAliveTime,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue,
    ThreadFactory threadFactory,
    RejectedExecutionHandler handler) {
    
}

corePoolSize It's the number of core threads
maximumPoolSize Is the maximum number of threads
keepAliveTime Non core thread maximum idle time ( It's over time )
unit Time unit
workQueue queue , When there are too many tasks , Put it in the queue first
threadFactory Thread factory , A factory for creating threads
handler No strategy , When there are too many tasks , When the queue can no longer hold tasks , How to deal with , To deal with this object . It's an interface , You can customize the processing

ThreadPoolExecutor stay Tomcat in http Application requested

This thread pool is tomcat After receiving a remote request , Treat each request as a separate task , Every time you call execute(unnable)

initialization

org.apache.tomcat.util.net.NioEndpoint

NioEndpoint When initializing , A thread pool is created

public void createExecutor() {
        internalExecutor = true;
        TaskQueue taskqueue = new TaskQueue();
        //TaskQueue Unbounded queue , You can always add , therefore handler  Equivalent to invalid 
        TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority());
        executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);
        taskqueue.setParent( (ThreadPoolExecutor) executor);
    }

When the thread pool is created , call prestartAllCoreThreads(), Initialize the core worker thread worker, And start the

public int prestartAllCoreThreads() {
        int n = 0;
        while (addWorker(null, true))
            ++n;
        return n;
    }

When addWorker The quantity is equal to corePoolSize when ,addWorker(null,ture) Returns the false, stop it worker The creation of worker threads

Submit task to queue

Every time the client comes to ask for (http), It will submit a processing task ,

worker Get the task run from the queue , The following is the logical code for the task to be put into the queue

ThreadPoolExecutor.execute(Runnable) Submit tasks :

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
       
        int c = ctl.get();
    	// worker Count   Is less than   Number of core threads    tomcat After initialization , Generally, the first condition is not satisfied , Can't addWorker
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
    	// workQueue.offer(command), Add tasks to the queue ,
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

workQueue.offer(command) Completed the task submission ( stay tomcat Processing remote http When asked ).

workQueue.offer

TaskQueue yes BlockingQueue Concrete implementation class ,workQueue.offer(command) The actual code :

public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    if (count.get() == capacity)
        return false;
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        if (count.get() < capacity) {
            enqueue(node); // Add task to queue here 
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        }
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    return c >= 0;
}

//  Add tasks to the queue 
/**
     * Links node at end of queue.
     *
     * @param node the node
     */
private void enqueue(Node<E> node) {
    // assert putLock.isHeldByCurrentThread();
    // assert last.next == null;
    last = last.next = node; // Chain table structure  last.next = node; last = node
}

And then worker The job of ,worker stay run In the method, by going to getTask() Get the task submitted here , And perform the task .

How the thread pool handles newly submitted tasks

add to worker after , Submit tasks , because worker The number of corePoolSize, Tasks will be queued , and worker Of run The method is to loop through the tasks in the queue ( Isn't empty ),

worker run Method :

/** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
 }

Loop to get the task in the queue

runWorker(worker) Method Loop part code :

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) { // Loop to get the task in the queue 
                w.lock(); //  locked 
                try {
                    //  Pre run processing 
                    beforeExecute(wt, task);
                    //  The task in the queue begins to execute 
                    task.run();
                    //  Post run processing 
                    afterExecute(task, thrown);
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock(); //  Release the lock 
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

task.run() Perform tasks

Lock application

ThreadPoolExecutor Using locks guarantees two things ,
1. Add tasks to the queue , Ensure that other threads cannot operate on the queue
2. Get the task of the queue , Ensure that other threads cannot operate on the queue at the same time

Add tasks to the queue

public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final AtomicInteger count = this.count;
        if (count.get() == capacity)
            return false;
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        putLock.lock();  // locked 
        try {
            if (count.get() < capacity) {
                enqueue(node);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            putLock.unlock();  // Release the lock 
        }
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }

Get Queue task lock

private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
		// ... Omit 
        for (;;) {
            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take(); // Get a task in the queue 
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly(); //  locked 
        try {
            while (count.get() == 0) {
                notEmpty.await(); // If there are no tasks in the queue , wait for 
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock(); //  Release the lock 
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

volatile

In concurrent scenarios, it's common to use this keyword to modify member variables ,

Main purpose: when a common variable is modified by a thread , Visible to other threads ( real time )

In the pool , There are ordinary uses Unsafe class , This class is in high concurrency , Can make some atoms CAS operation , Lock thread , Release threads, etc .

sun.misc.Unsafe Class is the underlying class ,openjdk Source code has

Atomic manipulation data

java.util.concurrent.locks.AbstractQueuedSynchronizer Class has code that guarantees atomic operations

protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

Corresponding Unsafe Class code :

// Corresponding java Bottom , the truth is that native Method , Corresponding C++ Code 
/**
* Atomically update Java variable to <tt>x</tt> if it is currently
* holding <tt>expected</tt>.
* @return <tt>true</tt> if successful
*/
public final native boolean compareAndSwapInt(Object o, long offset,
                                              int expected,
                                              int x);

The function of the method is simply Update a value , Guarantee atomic operation
When you want to manipulate an object o A member variable of offset when , modify o.offset,
To ensure accuracy , You're operating o.offset When , Reading should be the correct value , And it can't be modified by other threads to ensure the high concurrency environment data operation is effective .

namely expected The expected value is the same as the value in memory expected == Value in memory , Then the update value is x, return true It means that the modification was successful

otherwise , The expected value is different from the memory value , Indicates that the value has been modified by another thread , Cannot update value to x, And back to false, Tell the operator that the atomic modification failed .

Blocking and waking threads

public native void park(boolean isAbsolute, long time); // Block the current thread

Thread pool worker Roles loop to get queue tasks , If there are no tasks in the queue ,worker.run Still waiting , Does not exit the thread , Used in the code notEmpty.await() Interrupt this worker Threads , Put in a queue of waiting threads ( Different to the task queue ); When new tasks are needed , Again notEmpty.signal() Wake up this thread

The bottom layers are
unsafe.park() Block the current thread
public native void park(boolean isAbsolute, long time);

unsafe.unpark() Wake up the thread
public native void unpark(Object thread);

This operation corresponds to , Blocking time , First the thread Put in queue , Wake up , Take the blocked thread out of the queue ,unsafe.unpark(thread) Wake up the specified thread .

java.util.concurrent.locks.AbstractQueuedLongSynchronizer.ConditionObject Class

The thread information is stored in the linked list

//  Add a blocking thread 
private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node; // Put the new blocked thread at the end of the list 
            return node;
        }

//  Take out a blocked thread 
 public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter; // The first blocked thread in the linked list 
            if (first != null)
                doSignal(first);
        }

//  When you get it , Wake up this thread 
final boolean transferForSignal(Node node) {
            LockSupport.unpark(node.thread);
        return true;
    }
public static void unpark(Thread thread) {
        if (thread != null)
            UNSAFE.unpark(thread);
    }

版权声明
本文为[Wu Nanyu]所创,转载请带上原文链接,感谢
https://cdmana.com/2020/12/20201224220340703j.html

Scroll to Top