2021年6月23日星期三

线程池是如何重复利用空闲线程的?

在Java开发中,经常需要创建线程去执行一些任务,实现起来也非常方便,但如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间。此时,我们很自然会想到使用线程池来解决这个问题。

使用线程池的好处:

降低资源消耗。java中所有的池化技术都有一个好处,就是通过复用池中的对象,降低系统资源消耗。设想一下如果我们有n多个子任务需要执行,如果我们为每个子任务都创建一个执行线程,而创建线程的过程是需要一定的系统消耗的,最后肯定会拖慢整个系统的处理速度。而通过线程池我们可以做到复用线程,任务有多个,但执行任务的线程可以通过线程池来复用,这样减少了创建线程的开销,系统资源利用率得到了提升。

降低管理线程的难度。多线程环境下对线程的管理是最容易出现问题的,而线程池通过框架为我们降低了管理线程的难度。我们不用再去担心何时该销毁线程,如何最大限度的避免多线程的资源竞争。这些事情线程池都帮我们代劳了。

提升任务处理速度。线程池中长期驻留了一定数量的活线程,当任务需要执行时,我们不必先去创建线程,线程池会自己选择利用现有的活线程来处理任务。

很显然,线程池一个很显著的特征就是"长期驻留了一定数量的活线程",避免了频繁创建线程和销毁线程的开销,那么它是如何做到的呢?我们知道一个线程只要执行完了run()方法内的代码,这个线程的使命就完成了,等待它的就是销毁。既然这是个"活线程",自然是不能很快就销毁的。为了搞清楚这个"活线程"是如何工作的,下面通过追踪源码来看看能不能解开这个疑问。

学习过线程池都知道,可以通过工厂类Executors来创个多种类型的线程池,部分类型如下:

public static ExecutorService newFixedThreadPool(int var0) { return new ThreadPoolExecutor(var0, var0, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());}public static ExecutorService newSingleThreadExecutor() { return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()));}public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, 2147483647, 60L, TimeUnit.SECONDS, new SynchronousQueue());}public static ScheduledExecutorService newSingleThreadScheduledExecutor() { return new Executors.DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1));}public static ScheduledExecutorService newScheduledThreadPool(int var0) { return new ScheduledThreadPoolExecutor(var0);}

无论哪种类型的线程池,最终都是直接或者间接通过ThreadPoolExecutor这个类来实现的。而ThreadPoolExecutor的有多个构造方法,最终都是调用含有7个参数的构造函数。

/** * Creates a new {@code ThreadPoolExecutor} with the given initial * parameters. * * @param corePoolSize the number of threads to keep in the pool, even *  if they are idle, unless {@code allowCoreThreadTimeOut} is set * @param maximumPoolSize the maximum number of threads to allow in the *  pool * @param keepAliveTime when the number of threads is greater than *  the core, this is the maximum time that excess idle threads *  will wait for new tasks before terminating. * @param unit the time unit for the {@code keepAliveTime} argument * @param workQueue the queue to use for holding tasks before they are *  executed. This queue will hold only the {@code Runnable} *  tasks submitted by the {@code execute} method. * @param threadFactory the factory to use when the executor *  creates a new thread * @param handler the handler to use when execution is blocked *  because the thread bounds and queue capacities are reached * @throws IllegalArgumentException if one of the following holds:<br> *   {@code corePoolSize < 0}<br> *   {@code keepAliveTime < 0}<br> *   {@code maximumPoolSize <= 0}<br> *   {@code maximumPoolSize < corePoolSize} * @throws NullPointerException if {@code workQueue} *   or {@code threadFactory} or {@code handler} is null */public ThreadPoolExecutor(int corePoolSize,       int maximumPoolSize,       long keepAliveTime,       TimeUnit unit,       BlockingQueue<Runnable> workQueue,       ThreadFactory threadFactory,       RejectedExecutionHandler handler) { if (corePoolSize < 0 ||  maximumPoolSize <= 0 ||  maximumPoolSize < corePoolSize ||  keepAliveTime < 0)  throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null)  throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler;}

① corePoolSize

顾名思义,其指代核心线程的数量。当提交一个任务到线程池时,线程池会创建一个核心线程来执行任务,即使其他空闲的核心线程能够执行新任务也会创建新的核心线程,而等到需要执行的任务数大于线程池核心线程的数量时就不再创建,这里也可以理解为当核心线程的数量等于线程池允许的核心线程最大数量的时候,如果有新任务来,就不会创建新的核心线程。

如果你想要提前创建并启动所有的核心线程,可以调用线程池的prestartAllCoreThreads()方法。

② maximumPoolSize

顾名思义,其指代线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。所以只有队列满了的时候,这个参数才有意义。因此当你使用了无界任务队列的时候,这个参数就没有效果了。

③ keepAliveTime

顾名思义,其指代线程活动保持时间,即当线程池的工作线程空闲后,保持存活的时间。所以,如果任务很多,并且每个任务执行的时间比较短,可以调大时间,提高线程的利用率,不然线程刚执行完一个任务,还没来得及处理下一个任务,线程就被终止,而需要线程的时候又再次创建,刚创建完不久执行任务后,没多少时间又终止,会导致资源浪费。

注意:这里指的是核心线程池以外的线程。还可以设置allowCoreThreadTimeout = true这样就会让核心线程池中的线程有了存活的时间。

④ TimeUnit

顾名思义,其指代线程活动保持时间的单位:可选的单位有天(DAYS)、小时(HOURS)、分钟(MINUTES)、毫秒(MILLISECONDS)、微秒(MICROSECONDS,千分之一毫秒)和纳秒(NANOSECONDS,千分之一微秒)。

⑤ workQueue

顾名思义,其指代任务队列:用来保存等待执行任务的阻塞队列。

⑥ threadFactory

顾名思义,其指代创建线程的工厂:可以通过线程工厂给每个创建出来的线程设置更加有意义的名字。

⑦ RejectedExecutionHandler

顾名思义,其指代拒绝执行程序,可以理解为饱和策略:当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。在JDK1.5中Java线程池框架提供了以下4种策略。

AbortPolicy:直接抛出异常RejectedExecutionException。

CallerRunsPolicy:只用调用者所在线程来运行任务,即由调用 execute方法的线程执行该任务。

DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。

DiscardPolicy:不处理,丢弃掉,即丢弃且不抛出异常。

这7个参数共同决定了线程池执行一个任务的策略:

当一个任务被添加进线程池时:

  1. 线程数量未达到 corePoolSize,则新建一个线程(核心线程)执行任务

  2. 线程数量达到了 corePools,则将任务移入队列等待

  3. 队列已满,新建线程(非核心线程)执行任务

  4. 队列已满,总线程数又达到了 maximumPoolSize,就会由上面那位星期天(RejectedExecutionHandler)抛出异常

说白了就是先利用核心线程,核心线程用完,新来的就加入等待队列,一旦队列满了,那么只能开始非核心线程来执行了。

上面的策略,会在阅读代码的时候体现出来,并且在代码中也能窥探出真正复用空闲线程的实现原理。

接下来我们就从线程池执行任务的入口分析。

一个线程池可以接受任务类型有Runnable和Callable,分别对应了execute和submit方法。目前我们只分析execute的执行过程。

上源码:

public void execute(Runnable command) { if (command == null)  throw new NullPointerException(); /*  * Proceed in 3 steps:  *  * 1. If fewer than corePoolSize threads are running, try to  * start a new thread with the given command as its first  * task. The call to addWorker atomically checks runState and  * workerCount, and so prevents false alarms that would add  * threads when it shouldn't, by returning false.  *  * 2. If a task can be successfully queued, then we still need  * to double-check whether we should have added a thread  * (because existing ones died since last checking) or that  * the pool shut down since entry into this method. So we  * recheck state and if necessary roll back the enqueuing if  * stopped, or start a new thread if there are none.  *  * 3. If we cannot queue task, then we try to add a new  * thread. If it fails, we know we are shut down or saturated  * and so reject the task.  */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { //第一步:如果线程数量小于核心线程数  if (addWorker(command, true))//则启动一个核心线程执行任务   return;  c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) {//第二步:当前线程数量大于等于核心线程数,加入任务队列,成功的话会进行二次检查  int recheck = ctl.get();  if (! isRunning(recheck) && remove(command))   reject(command);  else if (workerCountOf(recheck) == 0)   addWorker(null, false);//启动非核心线程执行,注意这里任务是null,其实里面会去取任务队列里的任务执行 } else if (!addWorker(command, false))//第三步:加入不了队列(即队列满了),尝试启动非核心线程  reject(command);//如果启动不了非核心线程执行,说明到达了最大线程数量的限制,会使用第7个参数抛出异常}

代码并不多,主要分三个步骤,其中有两个静态方法经常被用到,主要用来判断线程池的状态和有效线程数量:

// 获取运行状态private static int runStateOf(int c)  { return c & ~CAPACITY; }// 获取活动线程数private static int workerCountOf(int c) { return c & CAPACITY; }

总结一下,execute的执行逻辑就是:

  • 如果 当前活动线程数 < 指定的核心线程数,则创建并启动一个线程来执行新提交的任务(此时新建的线程相当于核心线程);

  • 如果 当前活动线程数 >= 指定的核心线程数,且缓存队列未满,则将任务添加到缓存队列中;

  • 如果 当前活动线程数 >= 指定的核心线程数,且缓存队列已满,则创建并启动一个线程来执行新提交的任务(此时新建的线程相当于非核心线程);

从代码中我们也可以看出,即便当前活动的线程有空闲的,只要这个活动的线程数量小于设定的核心线程数,那么依旧会启动一个新线程来执行任务。也就是说不会去复用任何线程。在execute方法里面我们没有看到线程复用的影子,那么我们继续来看看addWorker方法。

private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) {  int c = ctl.get();  int rs = runStateOf(c);  // Check if queue empty only if necessary.  if (rs >= SHUTDOWN &&   ! (rs == SHUTDOWN &&    firstTask == null &&    ! workQueue.isEmpty()))   return false;  for (;;) {   int wc = workerCountOf(c);   if (wc >= CAPACITY ||    wc >= (core ? corePoolSize : maximumPoolSize))    return false;   if (compareAndIncrementWorkerCount(c))    break retry;   c = ctl.get(); // Re-read ctl   if (runStateOf(c) != rs)    continue retry;   // else CAS failed due to workerCount change; retry inner loop  } } //前面都是线程池状态的判断,暂时不理会,主要看下面两个关键的地方 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try {  w = new Worker(firstTask); // 新建一个Worker对象,这个对象包含了待执行的任务,并且新建一个线程  final Thread t = w.thread;  if (t != null) {   final ReentrantLock mainLock = this.mainLock;   mainLock.lock();   try {    // Recheck while holding lock.    // Back out on ThreadFactory failure or if    // shut down before lock acquired.    int rs = runStateOf(ctl.get());    if (rs < SHUTDOWN ||     (rs == SHUTDOWN && firstTask == null)) {     if (t.isAlive()) // precheck that t is startable      throw new IllegalThreadStateException();     workers.add(w);     int s = workers.size();     if (s > largestPoolSize)      largestPoolSize = s;     workerAdded = true;    }   } finally {    mainLock.unlock();   }   if (workerAdded) {    t.start(); // 启动刚创建的worker对象里面的thread执行    workerStarted = true;   }  } } finally {  if (! workerStarted)   addWorkerFailed(w); } return workerStarted;}

方法虽然有点长,但是我们只考虑两个关键的地方,先是创建一个worker对象,创建成功后,对线程池状态判断成功后,就去执行该worker对象的thread的启动。也就是说在这个方法里面启动了一个关联到worker的线程,但是这个线程是如何执行我们传进来的runnable任务的呢?接下来看看这个Worker对象到底做了什么。

private final class Worker extends AbstractQueuedSynchronizer implements Runnable{ /**  * This class will never be serialized, but we provide a  * serialVersionUID to suppress a javac warning.  */ private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks; /**  * Creates with given first task and thread from ThreadFactory.  * @param firstTask the first task (null if none)  */ Worker(Runnable firstTask) {  setState(-1); // inhibit interrupts until runWorker  this.firstTask = firstTask;  this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker. */ public void run() {  runWorker(this); } // Lock methods // // The value 0 represents the unlocked state. // The value 1 represents the locked state. protected boolean isHeldExclusively() {  return getState() != 0; } protected boolean tryAcquire(int unused) {  if (compareAndSetState(0, 1)) {   setExclusiveOwnerThread(Thread.currentThread());   return true;  }  return false; } protected boolean tryRelease(int unused) {  setExclusiveOwnerThread(null);  setState(0);  return true; } public void lock()  { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock()  { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() {  Thread t;  if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {   try {    t.interrupt();   } catch (SecurityException ignore) {   }  } }}

最重要的构造方法:

Worker(Runnable firstTask) { // worker本身实现了Runnable接口  setState(-1); // inhibit interrupts until runWorker  this.firstTask = firstTask; // 持有外部传进来的runnable任务  //创建了一个thread对象,并把自身这个runnable对象给了thread,一旦该thread执行start方法,就会执行worker的run方法  this.thread = getThreadFactory().newThread(this);  }在addWorker方法中执行的t.start会去执行worker的run方法:public void run() {  runWorker(this); }run方法又执行了ThreadPoolExecutor的runWorker方法,把当前worker对象传入。final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; // 取出worker的runnable任务 w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try {  // 循环不断的判断任务是否为空,当第一个判断为false的时候,即task为null,这个task啥时候为null呢?  // 要么w.firstTask为null,还记得我们在execute方法第二步的时候,执行addWorker的时候传进来的runnable是null吗?  // 要么是执行了一遍while循环,在下面的finally中执行了task=null;  // 或者执行第二个判断,一旦不为空就会继续执行循环里的代码。  while (task != null || (task = getTask()) != null) {   w.lock();   // If pool is stopping, ensure thread is interrupted;   // if not, ensure thread is not interrupted. This   // requires a recheck in second case to deal with   // shutdownNow race while clearing interrupt   if ((runStateAtLeast(ctl.get(), STOP) ||     (Thread.interrupted() &&     runStateAtLeast(ctl.get(), STOP))) &&    !wt.isInterrupted())    wt.interrupt();   try {    beforeExecute(wt, task);    Throwable thrown = null;    try {     task.run(); // 任务不为空,就会执行任务的run方法,也就是runnable的run方法    } catch (RuntimeException x) {     thrown = x; throw x;    } catch (Error x) {     thrown = x; throw x;    } catch (Throwable x) {     thrown = x; throw new Error(x);    } finally {     afterExecute(task, thrown);    }   } finally {    task = null; // 执行完成置null,继续下一个循环    w.completedTasks++;    w.unlock();   }  }  completedAbruptly = false; } finally {  processWorkerExit(w, completedAbruptly); }}

方法比较长,归纳起来就三步:

1,从worker中取出runnable(这个对象有可能是null,见注释中的解释);

2,进入while循环判断,判断当前worker中的runnable,或者通过getTask得到的runnable是否为空,不为空的情况下,就执行run;

......

原文转载:http://www.shaoqun.com/a/825657.html

跨境电商:https://www.ikjzd.com/

agora:https://www.ikjzd.com/w/2176

贝贝官网:https://www.ikjzd.com/w/1321

汇通天下:https://www.ikjzd.com/w/2055


在Java开发中,经常需要创建线程去执行一些任务,实现起来也非常方便,但如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间。此时,我们很自然会想到使用线程池来解决这个问题。使用线程池的好处:降低资源消耗。java中所有的池化技术都有一个好处,就是通过复用池中的对象,降低系统资源消耗。设想一下如果我们有n
亚马逊海外视频营销课程:https://www.ikjzd.com/tl/14982
怒放的石门樱花 来自春天的邀请函:http://www.30bags.com/a/416827.html
口述:我和两位小姨子们的难忘激情性事(7/7):http://lady.shaoqun.com/a/64585.html
深圳高北十六需要核酸检测阴性报告吗:http://www.30bags.com/a/453905.html
阿里国际站运营如何打造实力优品:https://www.ikjzd.com/articles/145981
吉林省"精彩夜吉林"线下演出活动成功举办:http://www.30bags.com/a/434955.html
斑马物流:https://www.ikjzd.com/w/1316
首发!深诺集团发布2018年全球电商行业"最全蓝皮书":https://www.ikjzd.com/tl/15006
重磅!重磅!站在计算机角度删亚马逊review差评-10年程序猿老司机分享:https://www.ikjzd.com/tl/15018
我怀孕了 男友说生不生跟他没关系:http://lady.shaoqun.com/m/a/272124.html
狗狗 紫黑的粗硕用力挺入撕 口述被大公狗塞的满满的:http://lady.shaoqun.com/m/a/246905.html
深圳7月音乐会汇总(持续更新):http://www.30bags.com/a/453904.html

没有评论:

发表评论