浅浅的写一个线程池
创始人
2024-06-03 13:02:15
0

目录

  • 一、线程池(ThreadPoolExecutor)
    • 1)构造方法
    • 2)工作方式
    • 3)线程池的状态(自定义并不是这样实现)
  • 自定义线程池
    • 1.流程
    • 2.execute执行流程
  • 代码
    • 线程工厂
    • 拒绝策略
    • ThreadPool线程池
    • 测试

一、线程池(ThreadPoolExecutor)

1)构造方法

public ThreadPoolExecutor(  int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)
  • corePoolSize 核心线程数目 (最多保留的线程数)
  • maximumPoolSize 最大线程数目
  • keepAliveTime 生存时间 - 针对救急线程
  • unit 时间单位 - 针对救急线程
  • workQueue 阻塞队列
  • threadFactory 线程工厂 - 可以为线程创建时起个好名字
  • handler 拒绝策略

2)工作方式

在这里插入图片描述

  • 线程池中刚开始没有线程,当一个任务提交给线程池后,线程池会创建一个新线程来执行任务,阻塞等待执行任务队列的任务。
  • 当线程数达到 corePoolSize 并没有线程空闲,这时再加入任务,新加的任务会被加入workQueue 队列排队,直到有空闲的线程。
  • 如果任务队列选择了有界队列,那么任务超过了队列大小时,会创建maximumPoolSize - corePoolSize 数目的线程来救急,称为救急线程,救急线程在keepAliveTime时间后没有任务执行就进入等待,等待任务任务队列满了才唤醒
  • 如果线程到达 maximumPoolSize 且救急线程也都在执行任务,仍然有新任务这时会执行拒绝策略。拒绝策略 jdk 提供了 4 种实现,其它著名框架也提供了实现
    • AbortPolicy 让调用者抛出 RejectedExecutionException 异常,这是默认策略
    • CallerRunsPolicy 让调用者运行任务
    • DiscardPolicy 放弃本次任务
    • DiscardOldestPolicy 放弃队列中最早的任务,本任务和最新的任务争抢进入队列的机会
  • 当高峰过去后,超过corePoolSize 的救急线程如果一段时间没有任务做,需要结束节省资源,这个时间由keepAliveTime 和 unit 来控制。

jdk提供的拒绝策略
当等待队列满了,并且所有线程都在使用时,采取拒绝策略
在这里插入图片描述

3)线程池的状态(自定义并不是这样实现)

ThreadPoolExecutor 使用 int 的高 3 位来表示线程池状态,低 29 位表示线程数量

状态名高 3 位接收新任 务处理阻塞队列任 务说明
RUNNING111YY
SHUTDOWN000NY不会接收新任务,但会处理阻塞队列剩余 任务
STOP001NN会中断正在执行的任务,并抛弃阻塞队列 任务
TIDYING010--任务全执行完毕,活动线程为 0 即将进入 终结
TERMINATED011--终结状态

从数字上比较,TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING
这些信息存储在一个原子变量 ctl 中,目的是将线程池状态与线程个数合二为一,这样就可以用一次 cas 原子操作
进行赋值

// c 为旧值, ctlOf 返回结果为新值
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))));
// rs 为高 3 位代表线程池状态, wc 为低 29 位代表线程个数,ctl 是合并它们
private static int ctlOf(int rs, int wc) { return rs | wc; }

自定义线程池

1.流程

在这里插入图片描述

2.execute执行流程

  1. 线程池状态已经为shutdown,不向队列添加任务,结束
  2. 如果当前的线程数小于核心线程数就增加核心线程,并执行任务,执行完就等待任务队列进入的新任务
  3. 如果任务队列的容量大于0,即是还可以存放任务,直接进入队列(无界队列容量2^31,所以无界的任务队列基本不会执行下面的4、5、6步)
  4. 创建救急线程(救急线程阻塞keepAliveTime后进入等待区)
  5. 唤醒一个救急线程执行任务
  6. 执行拒绝策略

代码

在这里插入图片描述

线程工厂

1.线程工厂接口

@FunctionalInterface
public interface LzkThreadFactory {/***  返回创建的线程* @param task* @return*/Thread newThread(Runnable task);
}
  1. 默认线程工厂
    (ThreadPool静态内部类,这部分不用复制,ThreadPool类内有实现)
//(ThreadPool静态内部类,这部分不用复制,ThreadPool类内有实现)
public static class LzkDefaultThreadFactory implements LzkThreadFactory {private AtomicInteger i=new AtomicInteger(0);@Overridepublic Thread newThread(Runnable task) {return new Thread(task,"[lzkThreadPool-"+i.incrementAndGet()+"] ");}
}

拒绝策略

  1. 拒绝策略接口
public interface RejectedHandler {void rejectedExecution(Runnable r, ThreadPool executor);
}
  1. 拒绝策略四种实现
    (ThreadPool静态内部类,这部分不用复制,ThreadPool类内有实现)
//(ThreadPool静态内部类,这部分不用复制,ThreadPool类内有实现)
/*** 抛异常*/
public static class AbortPolicy implements RejectedHandler {public AbortPolicy() { }@Overridepublic void rejectedExecution(Runnable r, ThreadPool executor) {throw new RejectedExecutionException(executor.toString()+"中任务队列已满,任务【" + r.toString() +"】无法执行!");}
}/*** 调用者执行线程*/
public static class CallerRunsPolicy implements RejectedHandler {public CallerRunsPolicy() { }@Overridepublic void rejectedExecution(Runnable r, ThreadPool executor) {r.run();}
}
/*** 丢弃任务队列的旧任务,将任务和新任务进行竞争*/
public static class DiscardOldestPolicy implements RejectedHandler {public DiscardOldestPolicy() { }@Overridepublic void rejectedExecution(Runnable r, ThreadPool executor) {//是否停止向线程池添加任务if(!executor.getIsShutDown().get()){//丢弃任务队列的就任务executor.getWorkQueue().poll();//添加任务和新任务进行竞争executor.execute(r);}}
}/*** 直接丢弃任务,不抛异常*/
public static class DiscardPolicy implements RejectedHandler {public DiscardPolicy() { }@Overridepublic void rejectedExecution(Runnable r, ThreadPool executor) {}
}

ThreadPool线程池

public class ThreadPool {// 核心线程数private int corePoolSize;// 最大线程数private int maximumPoolSize;// 救急线程无使用时的最长时间private long keepAliveTime;// 时间单位private TimeUnit unit;// 任务队列private BlockingQueue workQueue;// 线程工厂private LzkThreadFactory threadFactory;// 拒绝策列private RejectedHandler handler;//当前线程数,和源代码不同,这里是只记录线程数量private AtomicInteger ctl=new AtomicInteger(0);//活跃的救急线程private AtomicInteger other=new AtomicInteger(0);//线程组,只有获取锁才能操作private HashSet threadSets=new HashSet();//是否停止向任务队列添加任务private AtomicBoolean isShutDown=new AtomicBoolean(false);//救急线程等待区private ReentrantLock lock=new ReentrantLock();private Condition waitSets=lock.newCondition();//get&setpublic BlockingQueue getWorkQueue() {return workQueue;}public AtomicBoolean getIsShutDown() {return isShutDown;}//构造方法public ThreadPool(int corePoolSize, int maximumPoolSize,long keepAliveTime, TimeUnit unit,BlockingQueue workQueue) {this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.keepAliveTime = keepAliveTime;this.unit = unit;this.workQueue = workQueue;//将线程工厂、拒绝策略选择默认的构造方法this.threadFactory =new LzkDefaultThreadFactory();this.handler = new AbortPolicy();}public ThreadPool(int corePoolSize, int maximumPoolSize,long keepAliveTime, TimeUnit unit,BlockingQueue workQueue,RejectedHandler handler) {this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.keepAliveTime = keepAliveTime;this.unit = unit;this.workQueue = workQueue;this.threadFactory = new LzkDefaultThreadFactory();this.handler = handler;}public ThreadPool(int corePoolSize, int maximumPoolSize,long keepAliveTime, TimeUnit unit,BlockingQueue workQueue,LzkThreadFactory threadFactory) {this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.keepAliveTime = keepAliveTime;this.unit = unit;this.workQueue = workQueue;this.threadFactory = new LzkDefaultThreadFactory();this.handler = new AbortPolicy();}public ThreadPool(int corePoolSize, int maximumPoolSize,long keepAliveTime, TimeUnit unit,BlockingQueue workQueue,LzkThreadFactory threadFactory,RejectedHandler handler) {this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.keepAliveTime = keepAliveTime;this.unit = unit;this.workQueue = workQueue;this.threadFactory = threadFactory;this.handler = handler;}//-------------------任务对象-------------------/*** 线程的任务,先执行第一个任务,之后一直尝试获取任务队列的任务执行*/class Worker implements Runnable {//创建的线程执行的第一个任务private final Runnable firstTask;private long timeOut;private TimeUnit timeUnit;public Worker(Runnable firstTask) {this.firstTask = firstTask;this.timeOut = -1L;}public Worker(Runnable firstTask, long timeOut, TimeUnit timeUnit) {this.firstTask = firstTask;this.timeOut = timeOut;this.timeUnit = timeUnit;}@Overridepublic void run(){//第一个任务Runnable task=firstTask;//阻塞获取任务队列的任务while(true){try{//是否被打断if (Thread.currentThread().isInterrupted()){ctl.decrementAndGet();threadSets.remove(this);return;}if(!Thread.currentThread().isInterrupted()&&(task!=null||(task=timeOut>=0?workQueue.poll(timeOut,timeUnit):workQueue.take())!=null)){task.run();}else {if(isShutDown.get()){//线程池已结束,结束当前救急线程return;}lock.lock();//救急线程阻塞timeOut时间获取不到任务就进入等待区try{waitSets.await();}finally {lock.unlock();}other.decrementAndGet();}//如果线程池状态为结束并且无待执行任务,则结束线程if (isShutDown.get()&&workQueue.size()==0){ctl.decrementAndGet();threadSets.remove(this);return;}}catch (InterruptedException ie) {//非法打断,重新打断Thread.currentThread().interrupt();}catch (Exception e){e.printStackTrace();}finally {task=null;}}}}/*** 提交任务* @param task*/public void execute(Runnable task){//若为空不执行if(task==null){return;}if(isShutDown.get()){//线程池状态已经为shutdown,不向队列添加任务return;}else if(ctl.get()//1、如果当前的线程数小于核心线程数就增加核心线程,并执行任务,执行完就等待任务队列进入的新任务Thread thread = threadFactory.newThread(new Worker(task));synchronized (threadSets){threadSets.add(thread);}thread.start();ctl.incrementAndGet();}else if(workQueue.remainingCapacity()>0){//2、如果任务队列的容量大于0,即是还可以存放任务,无需拒绝,直接进入队列(无界队列容量2^31)workQueue.add(task);}else if(workQueue.remainingCapacity()==0&&ctl.get()//3、创建救急线程(救急线程阻塞keepAliveTime后进入等待区)Thread thread = threadFactory.newThread(new Worker(task,keepAliveTime,unit));synchronized (threadSets){threadSets.add(thread);}thread.start();//记录活跃救急线程和总线程数other.incrementAndGet();ctl.incrementAndGet();}else if(workQueue.remainingCapacity()==0&&ctl.get()==maximumPoolSize&&other.get()//4、唤醒一个救急线程执行任务waitSets.signal();other.incrementAndGet();}else if(workQueue.remainingCapacity()==0&&ctl.get()==maximumPoolSize&&other.get()==maximumPoolSize-corePoolSize){//5、执行拒绝策略handler.rejectedExecution(task,this);}}/*** 停止添加任务,停止线程池*/public void shutdown(){//标志状态this.isShutDown.compareAndSet(false,true);//等待任务被分配到每个线程while(workQueue.size()>0){}//循环判断是否所以线程都结束了while(ctl.get()>0){synchronized (threadSets){for (Thread t:threadSets){if("TERMINATED".equals(t.getState())){ctl.decrementAndGet();threadSets.remove(t);continue;}//waiting:在阻塞获取任务if("WAITING".equals(t.getState().toString())){ctl.decrementAndGet();//打断在阻塞的线程t.interrupt();}}}}}/*** 现在停止线程池*/public void shutdownNow(){//标志状态this.isShutDown.compareAndSet(false,true);//循环打断线程while(ctl.get()>0){synchronized (threadSets){for (Thread t:threadSets){ctl.decrementAndGet();//打断在阻塞的线程t.interrupt();}}}}//默认的线程工厂public static class LzkDefaultThreadFactory implements LzkThreadFactory {private AtomicInteger i=new AtomicInteger(0);@Overridepublic Thread newThread(Runnable task) {return new Thread(task,"[lzkThreadPool-"+i.incrementAndGet()+"] ");}}//----------------------------拒绝策略------------------------------/*** 抛异常*/public static class AbortPolicy implements RejectedHandler {public AbortPolicy() { }@Overridepublic void rejectedExecution(Runnable r, ThreadPool executor) {throw new RejectedExecutionException(executor.toString()+"中任务队列已满,任务【" + r.toString() +"】无法执行!");}}/*** 调用者执行线程*/public static class CallerRunsPolicy implements RejectedHandler {public CallerRunsPolicy() { }@Overridepublic void rejectedExecution(Runnable r, ThreadPool executor) {r.run();}}/*** 丢弃任务队列的旧任务,将任务和新任务进行竞争*/public static class DiscardOldestPolicy implements RejectedHandler {public DiscardOldestPolicy() { }@Overridepublic void rejectedExecution(Runnable r, ThreadPool executor) {//是否停止向线程池添加任务if(!executor.getIsShutDown().get()){//丢弃任务队列的就任务executor.getWorkQueue().poll();//添加任务和新任务进行竞争executor.execute(r);}}}/*** 直接丢弃任务,不抛异常*/public static class DiscardPolicy implements RejectedHandler {public DiscardPolicy() { }@Overridepublic void rejectedExecution(Runnable r, ThreadPool executor) {}}
}

测试

public class Demo {public static void main(String[] args) throws InterruptedException {ThreadPool threadPool=new ThreadPool(2, 4, 3, TimeUnit.SECONDS,new LinkedBlockingQueue<>(10), new LzkThreadFactory() {AtomicInteger tid=new AtomicInteger(0);@Overridepublic Thread newThread(Runnable task) {return new Thread(task,"t"+tid.incrementAndGet());}},new ThreadPool.DiscardPolicy());AtomicInteger atomicInteger=new AtomicInteger(0);CountDownLatch countDownLatch=new CountDownLatch(10);for (int j=0;j<14;j++){threadPool.execute(()->{for (int i=0;i<100;i++){atomicInteger.incrementAndGet();}System.out.println(Thread.currentThread().getName()+" "+atomicInteger.get());countDownLatch.countDown();});}try {countDownLatch.await();for (int j=0;j<4;j++){threadPool.execute(()->{for (int i=0;i<100;i++){atomicInteger.incrementAndGet();}System.out.println(Thread.currentThread().getName()+" "+atomicInteger.get());});}} catch (InterruptedException e) {e.printStackTrace();}threadPool.shutdown();System.out.println("结束");}
}

相关内容

热门资讯

我生病了小学作文【精简6篇】 我生病了小学作文 篇一我生病了前几天,我不知道怎么了,突然感觉身体不舒服。我感到头晕目眩,喉咙痛得像...
新学期新打算小学作文450字... 新学期新打算篇一:我要努力学习新的学期开始了,我制定了新的打算,那就是要努力学习。我相信只有努力学习...
我学会了西红柿炒鸡蛋小学作文... 我学会了西红柿炒鸡蛋小学作文 篇一我学会了西红柿炒鸡蛋上周,我学会了一道简单又美味的菜——西红柿炒鸡...
花朵的小学作文【最新3篇】 花朵的小学作文 篇一花朵的奇妙世界花朵是大自然的美丽礼物,它们以各种各样的颜色和形状装点着我们的环境...
小学生赏花的作文【通用4篇】 小学生赏花的作文 篇一春天是一个充满美丽花朵的季节,我非常喜欢春天。每当春天来临,我就会和家人一起去...
中秋之夜小学生作文【优选3篇... 中秋之夜小学生作文 篇一中秋之夜,月亮圆圆的,像一块白玉挂在天空中。我和爸爸妈妈一起出门,欣赏美丽的...
油面筋塞肉小学作文(推荐3篇... 油面筋塞肉小学作文 篇一我喜欢吃美食,尤其是一些特色的小吃。最近,我发现了一种非常好吃的小吃,那就是...
学游泳的小学作文(实用3篇) 学游泳的小学作文 篇一学游泳的小学作文大家好!我是小明,今天我要给大家分享一下我学游泳的经历。我是一...
小学生作文老师我想对你说【最... 小学生作文老师我想对你说 篇一尊敬的老师:您好!我是您的学生小明。我想借这篇作文向您表达我的感激之情...
一次有趣的实验小学生作文80... 一次有趣的实验篇一昨天,我参加了一次非常有趣的实验。老师让我们小组一起进行,我非常期待这个实验的结果...
春天小学一年级作文300字【... 春天小学一年级作文300字 篇一我的春天春天来了,大地上百花盛开,绿草如茵。我喜欢春天,因为春天是个...
校园的一角的作文【优选6篇】 校园的一角的作文 篇一校园的一角在校园的一角,有一个小花园,是我最喜欢的地方。虽然它不大,但却别有一...
参观科技馆的小学作文400字... 参观科技馆的小学作文400字 篇一:奇妙的科技世界我参观了我们学校附近的科技馆,这里展示了许多令人惊...
值得的学生作文【实用3篇】 值得的学生作文 篇一突破自我,迈向成功作为一名学生,我们应该时刻保持一种积极向上的心态,勇于追求进步...
走进直播间小学作文(最新4篇... 走进直播间小学作文 篇一近年来,随着互联网技术的快速发展,直播已经成为了一种非常流行的媒体形式。除了...
我的学校小学作文350字【精... 我的学校小学作文350字 篇一我所在的学校是一所小学,位于市区的中心地带。学校占地面积较小,但是设施...
春节大扫除小学作文【精选6篇... 春节大扫除小学作文 篇一:春节大扫除的乐趣春节是中国人最重要的传统节日之一,也是一年中家庭团聚最为频...
抓田螺小学作文(最新5篇) 抓田螺小学作文 篇一我和小伙伴们一起去抓田螺今天,天气晴朗,阳光明媚,我和几个好朋友决定一起去抓田螺...
送别的作文【推荐3篇】 送别的作文 篇一送别的作文 篇一人生中,不论是离别还是告别,都是一种成长的过程。无论是与亲人分离,还...
两个“可怜”作文(最新3篇) 两个“可怜”作文 篇一在生活中,我们常常会遇到一些让人心生怜悯的事情。这些事情或许是因为某些原因而导...