网络知识 娱乐 Juc并发编程13——如何实现一个线程池?(万字源码剖析)

Juc并发编程13——如何实现一个线程池?(万字源码剖析)

前 言
🍉 作者简介:半旧518,长跑型选手,立志坚持写10年博客,专注于java后端
🍌 专栏简介:juc并发编程,讲解锁原理、锁机制、线程池、AQS、并发容器、并发工具等,深入源码,持续更新。
🌰 文章简介:本文主要介绍线程池的实现原理,注释十分详细,并且对于值的学习的代码做了点评
🍓 相关推荐:Juc并发编程12——2万字深入源码:线程池这篇真的讲解的透透的了

前面我们已经介绍过线程池的使用了,下面我们来深挖它的实现原理,其原理比较复杂,准备好,发车。

先介绍下ctl变量。点进ThreadPoolExecutor的源码,就可以看到它,一起来看。(为了方便理解,我会对源码进行一定的删减或者顺序的调整)

public class ThreadPoolExecutor extends AbstractExecutorService {
	// 使用原子类型数据,保证原子性,
	//通过拆分32个比特位保存数据,ctl的前3位用于保存状态,后29位保存工作线程数量
	//(思考:如果工作线程数大于2^29-1,装不下是不是就GG了?)
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
     private static final int COUNT_BITS = Integer.SIZE - 3; //29位,线程数量位
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1; // 容量:2^29 - 1
    
    // 所有运行状态,只用到前三位,不会占用后29位
    // 接收新任务,并执行等待队列中的其他任务
     private static final int RUNNING    = -1 << COUNT_BITS; //  111 000...
    // 不接收新任务,但执行等待队列中的任务
    private static final int SHUTDOWN = 0 << COUNT_BITS ; // 000 000...
    // 不接收新任务,且不执行等待队列中的任务,并且中断正在执行的任务
    private static final int STOP = 1 << COUNT_BITS; // 001 000...
    // 所有任务已经结束,线程数为0,即将完全关闭
    private static final int TIDYING =  2 << COUNT_BITS; // 010 000...
    // 完全关闭
    private static final int TERMINATED = 3 << COUNT_BITS; //011 000 ...

	// 取前三位运行状态
	 private static int runStateOf(int c) {
        return c & ~CAPACITY;  //即c & 111 000....(29个0)
    }

	// 获取工作线程数
    private static int workerCountOf(int c) {
        return c & CAPACITY;  // c & COUNT_MASK 
    }

	// 将运行状态与工作线程数作为一个整体打包成ctl
	 private static int ctlOf(int rs, int wc) {
        return rs | wc;
    }

}

ctl其实就是下图这样。上面源码还是可以在实际中应用的,可以好好体会这种打包使用一个数存储状态与线程数目两个值的方法。
在这里插入图片描述
看完了ctl变量,来看看execute方法。

// 阻塞队列,唤醒类型是Runnable
 private final BlockingQueue<Runnable> workQueue;

// 注意:这里没有加锁,这也是为什么ctl需要使用原子类来保存
 public void execute(Runnable command) {
        if (command == null) {  // 如果提交的任务为null
            throw new NullPointerException();//直接抛出空指针异常
        } else {
            int c = this.ctl.get(); // 获取ctl,后面我们根据ctl来读取信息
            if (workerCountOf(c) < this.corePoolSize) { // 判断工作线程数是否小于核心线程数
                if (this.addWorker(command, true)) { // 直接添加新的线程,第二个参数代表是否为核心线程
                    return;
                }

				// 如果线程添加失败,可能是因为其它线程在操作ctl
                c = this.ctl.get();  //更新ctl,没有return,接着往后走了
            }

			 // 如果当前线程池是running状态,尝试将任务加入阻塞队列
            if (isRunning(c) && this.workQueue.offer(command)) {
                int recheck = this.ctl.get(); // 再次获取ctl
                // 如果线程池已经关闭了,把它移除出阻塞队列
                if (!isRunning(recheck) && this.remove(command)) {
                    this.reject(command); // 执行拒绝策略
                }
                // 如果当前线程池中没有线程,直接在当前线程池中添加线程
                 else if (workerCountOf(recheck) == 0) {
                    //赶紧添加一个非核心线程,避免当前任务永远不会执行的情况出现,但是注意没有添加任务 
                    this.addWorker((Runnable)null, false); 
                }
                // 其它情况说明一切正常,啥也不用再做了
            } 

			// 走到这核心线程数肯定已经满了
			// 而且要么是线程池没有运行,要么是加入阻塞队列失败
			else if (!this.addWorker(command, false)) { // 创建非核心线程
                this.reject(command); // 执行拒绝策略
            }

        }
    }

是不是感觉很清晰呀,甚至感觉学习下自己也可以写出来,认真品味,可以学习借鉴的的地方挺多的。

接下来来看看addworker是怎么来创建和执行任务的。

  private boolean addWorker(Runnable firstTask, boolean core) {
  		 // 标签,方便后续退出外层循环
        retry:
        for (;;) { // 无限循环,注意这里全程没有加锁
            int c = ctl.get(); 
            int rs = runStateOf(c); //解析当前的状态

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && //是否处于运行状态
            		// 除了shutdown状态,且无正在执行的任务,且等待队列有任务的情况,都直接返回false,
            		//也就是只允许非运行状态中,shutdown状态添加等待队列中的最后一个任务
                ! (rs == SHUTDOWN && // 如果没有运行,判断是否处于shutdown状态
                   firstTask == null && //没有正在执行的任务
                   ! workQueue.isEmpty())) //等待队列不为空
                return false;

			// 内层for无限循环,将线程计数增加,然后才能真正的添加一个线程
			//(如果计数都失败,无法保证添加新线程是线程安全的)
            for (;;) {
                int wc = workerCountOf(c); 
                if (wc >= CAPACITY || //如果工作线程数已经超过了容量限制2^29-1
                 //或者超过线程池指定的大小
                 //(核心线程超过核心线程数||非核心线程超过最大线程数)
                    wc >= (core ? corePoolSize : maximumPoolSize))    
                    return false;
                // 排除临界情况后,通过CAS操作增加线程数,
                //CAS成功则直接跳出外层循环
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                // CAS失败,则说明其它线程在自增或者ctl发生了变化
                c = ctl.get();  //更新ctl到最新状态
                if (runStateOf(c) != rs) //如果状态与初始读到的不一致了
                    continue retry; //从头再来(因为前面的状态的比较可能不对)
                //其它CAS失败的原因只可能是其它线程也在自增,
                //内层循环重来就可以
            }
        }

        boolean workerStarted = false; //工作线程是否已启动
        boolean workerAdded = false; //工作线程是否已添加
        Worker w = null; // 工作线程类
        try {
            w = new Worker(firstTask); //创建新的工作线程对象
            final Thread t = w.thread; 
            if (t != null) { 
                // 加锁,每次只能有一个线程进入了
                final ReentrantLock mainLock = this.mainLock; 
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get()); //获取线程状态

                    if (rs < SHUTDOWN || // 是不是运行
                    		// shutdown状态且任务不为空
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // 如果线程还alive,说明是异常状态
                            throw new IllegalThreadStateException();
                        workers.add(w); //把worker丢到线程池的workers集合中
                        int s = workers.size(); //获取当前线程池的大小
                        if (s > largestPoolSize) //更新线程池的历史最大大小
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) { //添加成功
                    t.start(); // 启动线程
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted) // 没有成功启动,处理异常
            //(会将线程从集合取出来,计数器减1,加速终止线程)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

🌹 上面添加线程计数对于不能正常添加线程的临界情况的处理保证了代码的健壮性与优雅性,很值得学习,总结下临界处理逻辑:
线程状态不对->超过容量限制->CAS增加线程计数成功 ->ctl状态被其它线程更改->其它线程在自增

我们注意到上面是使用Worker对象作为线程的封装,接下来就分析下这个Worker吧。

  private final class Worker //继承了AQS
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
     
        private static final long serialVersionUID = 6138294804551838833L;
	   // 真正的线程
        final Thread thread;
        // 要执行的第一个任务,在构造对象时确定  
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        Worker(Runnable firstTask) {
           // 中断标志置-1
           // 在执行runWorker方法运行线程前禁止中断
            setState(-1);
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        //并没有直接执行firstTasker,而是调用runWorker
        // 试想,如果直接执行,执行结束线程就没有了,
        // 线程池又如何复用呢?
        public void run() {
            runWorker(this);
        }

        // 判断是否加了排它锁
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.

        protected boolean isHeldExclusively() {
            return getState() != 0;
        }
        ...
}

接下来重点看看runWorker方法,分析下worker是怎么调度执行任务的吧。

 final void runWorker(Worker w) {
        Thread wt = Thread.currentThread(); // 获取当前线程
        Runnable task = w.firstTask; //取出要执行的任务
        w.firstTask = null; 
        // 允许 interrupts
        //(前面setState将中断允许标志设置成了-1
        // 这里unlock就是将其中断允许标志设置为0)
        w.unlock(); 
        boolean completedAbruptly = true;
        try {
        // 待执行任务不为空或者等待队列中有非空任务
        // 注意这里其实是无限循环
        //因为getTask()方法从阻塞队列中取任务其实是阻塞式的
        //这也是线程池复用线程的奥秘
            while (task != null || (task = getTask()) != null) {
              // 对当前线程加锁
              // 注意这里不是为了防其它线程
              //而是为了在shutdown时保护此线程的运行(后文解释)
                w.lock(); 
                //如果线程池已经停止(及以上状态)
                // 或者当前线程被打上了中断标记
                //需要中断正在运行的工作线程
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() && 
                      runStateAtLeast(ctl.get(), STOP))) &&
                    //确保工作线程之前没有打过中断标记
                    !wt.isInterrupted()) 
                    // 打中断标记
                    wt.interrupt();
                try {
                	//开始之前的准备工作,这里暂时没有实现
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run(); //运行线程
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        // 任务执行后的操作,也没有实现
                        afterExecute(task, thrown);
                    }
                } finally {
                // 任务完毕,置空,方便下一轮直接从阻塞队列中取新的任务
                    task = null; 
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
        // 走到这,说明worker已经被丢弃了
        // (中断:线程池关闭|非核心线程超时)
            processWorkerExit(w, completedAbruptly);
        }
    }

再来看看上面的getTask方法,看看线程池时如何从阻塞队列中取任务的。

    // 返回参数类型是Runnable
    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // 无需再执行等待队列中任务的情况,返回null,
            //对应上面runWorker代码的while循环的跳出条件
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount(); //减少工作线程的数量
                return null;
            }
			// 取出当前线程池的线程数量
            int wc = workerCountOf(c);

            // 允许超时或者工作线程数大于核心线程数,
            //timed就标示为可超时
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
					// 如果超时或者工作线程数大于最大线程数
					//(maximumPoolSize在运行期间被修改了:安全机制,因为全程未加锁),
					// 并且线程数大于1或者等待队列为空
					//(如果线程数<1,而且等待队列中还有线程,
					// 那么无论如何还是要取一个线程出来,
					//超时和安全机制可以暂且不管,
					// 否则线程池就没有工作线程了)
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                // 如果CAS减少工作队列成功,返回null
                // 工作线程中获取不到任务了,
                //上面runWorker代码同样会跳出工while
                if (compareAndDecrementWorkerCount(c)) 
                    return null;
                // CAS失败,说明工作线程数被其它线程修改,
                //进入下一轮循环判断
                continue;
            }

            try {
               // 真正的出队操作,前面都是安全性的检查
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                //走到这说明r==null,也就是超时了
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

上面的注释应该很详细了,大家看不懂可以多看几遍,体会代码的完备性,在工作中如果有设备管理、自动化脚本执行管理场景等可以模仿优秀源码。

execute方法讲到这里,接下来看看shutdown方法

public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
           // 判断是否有权限终止
            checkShutdownAccess();
            // CAS将线程池状态修改为SHUTDOWN状态,后文详解
            advanceRunState(SHUTDOWN);
            // 中断空闲的worker,但不会影响正在运行的线程,后文详解
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }

看看上文的advanceRunState,使用CAS将线程池状态修改为SHUTDOWN状态。

 private void advanceRunState(int targetState) {
        for (;;) {
            int c = ctl.get();
            // 判断c是否大于等于目标的状态
            if (runStateAtLeast(c, targetState) ||
            		// CAS将当前状态替换为目标状态
                ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
                break;
        }
    }

是不是还挺简单的。再来看看interruptIdleWorkers,怎么中断空闲的worker,但不影响正在运行的线程。

 private void interruptIdleWorkers() {
        interruptIdleWorkers(false);
  }
 private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        // 加锁
        // 注意shutdown已经加过一把锁,这里加的是第二把锁
        mainLock.lock();
        try {
        		// 遍历workers
            for (Worker w : workers) {
                Thread t = w.thread;
                // 判断线程是否被中断,尝试进行加锁
                // 注意,在我们前面讲worker源码时,进行了加锁
                // 因此正在运行的任务可以得到保护,继续执行
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                       // 通知中断
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                       // 解锁
                        w.unlock();
                    }
                }
                // 如果只针对一个worker,则结束循环
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock()