
How Tomcat processes concurrent remote requests using a thread pool

# How Tomcat uses a thread pool to process requests

This article looks at how Tomcat handles concurrent requests, and covers the thread pool, locks, wait queues, and the `Unsafe` class along the way. The main code below comes from the following classes.

JRE:

- `sun.misc.Unsafe`
- `java.util.concurrent.ThreadPoolExecutor`
- `java.util.concurrent.ThreadPoolExecutor.Worker`
- `java.util.concurrent.locks.AbstractQueuedSynchronizer`
- `java.util.concurrent.locks.AbstractQueuedLongSynchronizer`
- `java.util.concurrent.LinkedBlockingQueue`

Tomcat:

- `org.apache.tomcat.util.net.NioEndpoint`
- `org.apache.tomcat.util.threads.ThreadPoolExecutor`
- `org.apache.tomcat.util.threads.TaskThreadFactory`
- `org.apache.tomcat.util.threads.TaskQueue`

## ThreadPoolExecutor

`ThreadPoolExecutor` is a thread pool implementation class. It manages threads and reduces the overhead of creating them, which improves the efficiency of task execution. The constructor takes the following arguments:

```java
public ThreadPoolExecutor(
        int corePoolSize,
        int maximumPoolSize,
        long keepAliveTime,
        TimeUnit unit,
        BlockingQueue<Runnable> workQueue,
        ThreadFactory threadFactory,
        RejectedExecutionHandler handler) {
}
```

- `corePoolSize`: the number of core threads.
- `maximumPoolSize`: the maximum number of threads.
- `keepAliveTime`: the maximum idle time for non-core threads (the timeout).
- `unit`: the time unit of `keepAliveTime`.
- `workQueue`: the waiting queue. When there are too many tasks, they are placed in this queue first.
- `threadFactory`: the thread factory that creates the threads.
- `handler`: the rejection policy. When there are too many tasks and the queue can no longer hold them, this object decides what happens.
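To make the roles of these parameters concrete, here is a minimal sketch using the JDK's `java.util.concurrent.ThreadPoolExecutor` directly. The class name `PoolParametersDemo`, the tiny sizes (1 core thread, 2 maximum threads, queue capacity 1), and the counting rejection handler are all assumptions chosen for illustration; this is not Tomcat's configuration, and it uses a bounded `ArrayBlockingQueue` rather than Tomcat's `TaskQueue`.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PoolParametersDemo {

    /**
     * Submits four blocking tasks to a pool with corePoolSize=1,
     * maximumPoolSize=2 and a queue of capacity 1.
     * Returns {pool size, queued tasks, rejected tasks} observed while
     * all accepted tasks are still blocked.
     */
    static int[] submitFourTasks() throws InterruptedException {
        AtomicInteger rejected = new AtomicInteger();
        // Hypothetical custom policy: just count the rejected tasks.
        RejectedExecutionHandler countingHandler =
                (task, executor) -> rejected.incrementAndGet();

        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                Thread::new,
                countingHandler);

        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        // Task 1 starts the core thread, task 2 is queued,
        // task 3 starts a non-core thread, task 4 is rejected.
        for (int i = 0; i < 4; i++) {
            pool.execute(blocker);
        }
        int[] snapshot = {pool.getPoolSize(), pool.getQueue().size(), rejected.get()};

        release.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return snapshot;
    }

    public static void main(String[] args) throws InterruptedException {
        int[] s = submitFourTasks();
        System.out.println("threads=" + s[0] + " queued=" + s[1] + " rejected=" + s[2]);
    }
}
```

The order matters: the pool fills the core threads first, then the queue, then grows to `maximumPoolSize`, and only then consults the handler.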
`RejectedExecutionHandler` is an interface, so you can supply your own handling.

## ThreadPoolExecutor in Tomcat

Tomcat uses the thread pool for HTTP requests: after receiving a remote request, it treats each request as a separate task and calls `execute(Runnable)` for each one.

### Initialization

`org.apache.tomcat.util.net.NioEndpoint`

When `NioEndpoint` initializes, it creates a thread pool:

```java
public void createExecutor() {
    internalExecutor = true;
    // TaskQueue is unbounded by default, so tasks can always be added
    // and the rejection handler is effectively never triggered.
    TaskQueue taskqueue = new TaskQueue();
    TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority());
    executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS, taskqueue, tf);
    taskqueue.setParent((ThreadPoolExecutor) executor);
}
```

When the thread pool is created, `prestartAllCoreThreads()` is called. It initializes the core worker threads and starts them:

```java
public int prestartAllCoreThreads() {
    int n = 0;
    while (addWorker(null, true))
        ++n;
    return n;
}
```

Once the number of workers equals `corePoolSize`, `addWorker(null, true)` returns `false` and no more worker threads are created.

### Submitting a task to the queue

Every time a client sends an HTTP request, a processing task is submitted, and a worker takes the task from the queue and executes it. Here is the logic that queues the task.

`ThreadPoolExecutor.execute(Runnable)` submits the task:

```java
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    // Is the worker count below the core thread count? After Tomcat has
    // initialized, this condition is generally false, so addWorker is skipped.
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // workQueue.offer(command) adds the task to the queue
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (!isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}
```

`workQueue.offer(command)` completes the task submission (when Tomcat handles a remote HTTP request).

#### workQueue.offer

`TaskQueue` is a concrete `BlockingQueue` implementation that extends `LinkedBlockingQueue`. Its own `offer` override first decides whether the pool should start another thread instead of queuing; when the task is queued, it delegates to `LinkedBlockingQueue.offer`, which does the actual work:

```java
public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    if (count.get() == capacity)
        return false;
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        if (count.get() < capacity) {
            enqueue(node); // the task is added to the queue here
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        }
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    return c >= 0;
}

// Adds the task to the queue
/**
 * Links node at end of queue.
 *
 * @param node the node
 */
private void enqueue(Node<E> node) {
    // assert putLock.isHeldByCurrentThread();
    // assert last.next == null;
    last = last.next = node; // linked-list append: last.next = node; last = node
}
```

After that the work belongs to the worker: in its `run` method it calls `getTask()` to fetch the tasks submitted here, and executes them.

### How the thread pool handles new tasks

Once the workers have been added, submitted tasks are placed in the queue because the number of workers has already reached `corePoolSize`. The worker's `run` method loops, fetching tasks from the queue for as long as tasks are available.

The worker's `run` method:

```java
/** Delegates main run loop to outer runWorker */
public void run() {
    runWorker(this);
}
```

#### Looping over the tasks in the queue

The loop portion of `runWorker(worker)`:

```java
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // Loop, fetching tasks from the queue
        while (task != null || (task = getTask()) != null) {
            w.lock(); // acquire the lock
            try {
                // Pre-execution hook
                beforeExecute(wt, task);
                Throwable thrown = null; // exception capture is omitted in this excerpt
                // The queued task starts executing
                task.run();
                // Post-execution hook
                afterExecute(task, thrown);
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock(); // release the lock
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}
```

`task.run()` executes the task.

### Use of locks

`ThreadPoolExecutor` relies on locks to guarantee two things:

1. While one thread adds a task to the queue, no other thread can add to the queue at the same time (`putLock`).
2. While one thread takes a task from the queue, no other thread can take from the queue at the same time (`takeLock`).

#### Locking the queue when adding a task

```java
public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    if (count.get() == capacity)
        return false;
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    putLock.lock(); // acquire the lock
    try {
        if (count.get() < capacity) {
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        }
    } finally {
        putLock.unlock(); // release the lock
    }
    if (c == 0)
        signalNotEmpty();
    return c >= 0;
}
```

#### Locking the queue when taking a task

```java
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
    // ... omitted
    for (;;) {
        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take(); // take a task from the queue
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly(); // acquire the lock
    try {
        while (count.get() == 0) {
            notEmpty.await(); // if the queue has no tasks, wait
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock(); // release the lock
    }
    if (c == capacity)
        signalNotFull();
    return x;
}
```

### volatile

In concurrent code it is very common to see this keyword on member variables. Its main purpose is visibility: when one thread modifies a shared variable, the change is immediately visible to other threads.

## sun.misc.Unsafe: the class behind high concurrency

The thread pool uses the `Unsafe` class in many places. Under high concurrency this class provides atomic CAS operations, blocking a thread, releasing a thread, and so on. `sun.misc.Unsafe` is a low-level class, and its source is available in the OpenJDK source code.

### Atomic data operations

`java.util.concurrent.locks.AbstractQueuedSynchronizer` contains code that guarantees atomic operations:

```java
protected final boolean compareAndSetState(int expect, int update) {
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
```

The corresponding method in `Unsafe`:

```java
// The underlying Java layer: this is actually a native method backed by C++ code
/**
 * Atomically update Java variable to x if it is currently
 * holding expected.
 * @return true if successful
 */
public final native boolean compareAndSwapInt(Object o, long offset, int expected, int x);
```

Put simply, the method updates a value atomically. Suppose you want to modify the member variable of object `o` located at `offset`. To stay correct under high concurrency, the value you read must be the current one, and no other thread may modify it in the middle of your update. CAS guarantees this without taking a lock, which keeps data operations efficient in a highly concurrent environment.
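The compare-and-swap contract can be tried out without touching `Unsafe`. The sketch below uses `java.util.concurrent.atomic.AtomicInteger.compareAndSet` as a public stand-in for `Unsafe.compareAndSwapInt`; the class name `CasDemo` and its helper methods are hypothetical names introduced only for this illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class CasDemo {

    /** Adds delta to value with a CAS retry loop and returns the new value. */
    static int addWithCas(AtomicInteger value, int delta) {
        for (;;) {
            int expected = value.get();       // read the current value
            int updated = expected + delta;   // compute the new value
            // Succeeds only if no other thread changed the value in between;
            // otherwise loop and retry with a fresh read.
            if (value.compareAndSet(expected, updated)) {
                return updated;
            }
        }
    }

    /** A CAS whose expected value is stale fails and leaves the value untouched. */
    static boolean staleCasFails() {
        AtomicInteger value = new AtomicInteger(5);
        boolean first = value.compareAndSet(5, 6);   // expected matches: succeeds
        boolean second = value.compareAndSet(5, 7);  // value is now 6: fails
        return first && !second && value.get() == 6;
    }

    /** Several threads increment one counter using only CAS, no locks. */
    static int countConcurrently(int threads, int incrementsPerThread) throws InterruptedException {
        AtomicInteger counter = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    addWithCas(counter, 1);
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            t.join();
        }
        return counter.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("stale CAS fails: " + staleCasFails());
        System.out.println("total: " + countConcurrently(4, 1000));
    }
}
```

A failed CAS is not an error: the caller simply rereads the value and tries again, which is why no increment is lost even though no lock is held.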
Concretely, if the value currently in memory equals `expected`, the value is updated to `x` and the method returns `true`, meaning the modification succeeded. Otherwise the expected value differs from the value in memory, which means another thread has already modified it; the value is not updated to `x` and the method returns `false`, telling the caller that the atomic modification failed.

### Blocking and waking threads

`public native void park(boolean isAbsolute, long time);` blocks the current thread.

The role of a thread pool worker is to loop, fetching queued tasks. If there are no tasks in the queue, `worker.run` keeps waiting and does not exit the thread. The code uses `notEmpty.await()` to suspend the worker thread and put it in a queue of waiting threads (which is distinct from the task queue). When a new task arrives, `notEmpty.signal()` wakes the thread up again.

Under the hood:

- `unsafe.park()` blocks the current thread: `public native void park(boolean isAbsolute, long time);`
- `unsafe.unpark()` wakes a thread: `public native void unpark(Object thread);`

These operations are paired. When blocking, the thread is first put into the queue; when waking, a blocked thread is taken from the queue and `unsafe.unpark(thread)` wakes that specific thread.

`java.util.concurrent.locks.AbstractQueuedLongSynchronizer.ConditionObject` stores the thread information in a linked list:

```java
// Add a newly blocked thread
private Node addConditionWaiter() {
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node; // put the newly blocked thread at the end of the linked list
    return node;
}

// Take out a blocked thread
public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter; // the first blocked thread in the linked list
    if (first != null)
        doSignal(first);
}

// Once the node has been taken out, wake its thread (simplified)
final boolean transferForSignal(Node node) {
    LockSupport.unpark(node.thread);
    return true;
}

public static void unpark(Thread thread) {
    if (thread != null)
        UNSAFE.unpark(thread);
}
```
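The wait-and-wake pattern described above can be reproduced with the public `ReentrantLock` and `Condition` API, which parks and unparks threads internally. This is a minimal sketch, not the JDK's or Tomcat's implementation; the class `WaitingQueueDemo`, its `put`/`take`/`handOff` methods, and the 100 ms pause are assumptions made for the demonstration.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class WaitingQueueDemo {
    private final Queue<String> tasks = new ArrayDeque<>();
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();

    /** Adds a task and wakes one waiting worker, if any. */
    void put(String task) {
        lock.lock();
        try {
            tasks.add(task);
            notEmpty.signal(); // moves one waiter out of the condition queue
        } finally {
            lock.unlock();
        }
    }

    /** Blocks the calling thread until a task is available. */
    String take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (tasks.isEmpty()) {
                notEmpty.await(); // releases the lock and suspends this thread
            }
            return tasks.remove();
        } finally {
            lock.unlock();
        }
    }

    /** Starts a worker that waits for one task, then hands the task over. */
    static String handOff(String task) throws InterruptedException {
        WaitingQueueDemo queue = new WaitingQueueDemo();
        String[] received = new String[1];
        Thread worker = new Thread(() -> {
            try {
                received[0] = queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        Thread.sleep(100); // let the worker reach await(); not needed for correctness
        queue.put(task);
        worker.join();     // join makes the worker's write to received[0] visible
        return received[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("worker received: " + handOff("request-1"));
    }
}
```

The `while` loop around `await()` mirrors `LinkedBlockingQueue.take()`: a woken thread rechecks the condition before proceeding, because another thread may have taken the task first.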

Copyright notice
This article was written by [itread01]. Please include a link to the original when reposting. Thank you.
https://cdmana.com/2020/12/20201225110238126H.html
