概念:
当队列是空的,从队列中获取(Take)元素的操作将会被阻塞
当队列是满的,从队列中添加(Put)元素的操作将会被阻塞
试图中空的队列中获取元素的线程将会被阻塞,直到其他线程往空的队列插⼊新的元素
试图向已满的队列中添加新元素的线程将会被阻塞,直到其他线程从队列中移除⼀个或多个元素 或者完全清空,使队列变得空闲起来后并后续新增
好处:阻塞队列不⽤⼿动控制什么时候该被阻塞,什么时候该被唤醒,简化了操作。
体系: Collection→ Queue→ BlockingQueue→七个阻塞队列实现类。
粗体标记的三个⽤得⽐较多,许多消息中间件底层就是⽤它们实现的。
需要注意的是 LinkedBlockingQueue 虽然是有界的,但有个巨坑,其默认⼤⼩是 Integer.MAX_VALUE ,⾼达21亿,⼀般情况下内存早爆了(在线程池的 ThreadPoolExecutor 有体现)。
API:
抛出异常是指: 当队列满时,再次插⼊会抛出异常;
返回布尔是指:当队列满时,再次插⼊会返回false;
阻塞是指:当队列满时,再次插⼊会被阻塞,直到队列取出⼀个元素,才能插⼊。
超时是指:当⼀个时限过后,才会插⼊或者取出。
代码
public class BlockingQueueDemo {public static void main(String[] args) throws InterruptedException {//定义容量为3的阻塞队列BlockingQueue blockingQueue = new ArrayBlockingQueue(3);System.out.println(blockingQueue.add("a"));System.out.println(blockingQueue.add("b"));System.out.println(blockingQueue.add("c"));System.out.println(blockingQueue.add("e"));System.out.println(blockingQueue.element());System.out.println(blockingQueue.remove());System.out.println(blockingQueue.remove());System.out.println(blockingQueue.remove());System.out.println(blockingQueue.remove());}
结果分析
IllegalStateException: Queue full
NoSuchElementException
代码
public static void main(String[] args) throws InterruptedException {//定义容量为3的阻塞队列BlockingQueue blockingQueue = new ArrayBlockingQueue(3);System.out.println(blockingQueue.offer("a"));System.out.println(blockingQueue.offer("b"));System.out.println(blockingQueue.offer("c"));System.out.println(blockingQueue.offer("e"));System.out.println(blockingQueue.peek());System.out.println(blockingQueue.poll());System.out.println(blockingQueue.poll());System.out.println(blockingQueue.poll());System.out.println(blockingQueue.poll());}
结果
代码
public static void main(String[] args) throws InterruptedException {//定义容量为3的阻塞队列BlockingQueue blockingQueue = new ArrayBlockingQueue(3);blockingQueue.put("a");blockingQueue.put("b");blockingQueue.put("c");blockingQueue.put("d");System.out.println(blockingQueue.take());System.out.println(blockingQueue.take());System.out.println(blockingQueue.take());System.out.println(blockingQueue.take());}
结果
代码
public static void main(String[] args) throws InterruptedException {//定义容量为3的阻塞队列BlockingQueue blockingQueue = new ArrayBlockingQueue(3);System.out.println(blockingQueue.offer("a",2L, TimeUnit.SECONDS));System.out.println(blockingQueue.offer("a",2L, TimeUnit.SECONDS));System.out.println(blockingQueue.offer("a",2L, TimeUnit.SECONDS));System.out.println(blockingQueue.offer("a",2L, TimeUnit.SECONDS));}
结果
队列只有⼀个元素,如果想插⼊多个,阻塞到队列元素取出后,才能插⼊,只能有⼀个“坑位”,⽤⼀个
插⼀个,详⻅SynchronousQueueDemo。
代码
public class SynchronousQueueDemo {public static void main(String[] args) {BlockingQueue blockingQueue=new SynchronousQueue();new Thread(()->{try {System.out.println(Thread.currentThread().getName()+"\t put 1");blockingQueue.put("1");System.out.println(Thread.currentThread().getName()+"\t put 2");blockingQueue.put("2");System.out.println(Thread.currentThread().getName()+"\t put 3");blockingQueue.put("3");} catch (InterruptedException e) {e.printStackTrace();}},"AAA").start();new Thread(()->{try {try{ TimeUnit.SECONDS.sleep(5); }catch (InterruptedException e){ e.printStackTrace(); }System.out.println(Thread.currentThread().getName()+"\t take "+blockingQueue.take());try{ TimeUnit.SECONDS.sleep(5); }catch (InterruptedException e){ e.printStackTrace(); }System.out.println(Thread.currentThread().getName()+"\t take "+blockingQueue.take());try{ TimeUnit.SECONDS.sleep(5); }catch (InterruptedException e){ e.printStackTrace(); }System.out.println(Thread.currentThread().getName()+"\t take"+blockingQueue.take());} catch (Exception e) {e.printStackTrace();}},"BBB").start();}
}
结果
传统模式使⽤ Synchronized来进⾏操作。
/*** 题目:现在两个线程,可以操作初始值为零的一个变量,* 实现一个线程对该变量加1,一个线程对该变量-1,* 实现交替,来10轮,变量初始值为0.* 1.高内聚低耦合前提下,线程操作资源类* 2.判断/干活/通知* 3.防止虚假唤醒(判断只能用while,不能用if)* 知识小总结:多线程编程套路+while判断+新版写法*/
public class ProdConsumerDemo {public static void main(String[] args) {Aircondition aircondition = new Aircondition();new Thread(() -> {for (int i = 1; i <= 10; i++) {try {aircondition.increment();} catch (Exception e) {e.printStackTrace();}}}, "A").start();new Thread(() -> {for (int i = 1; i <= 5; i++) {try {aircondition.decrement();} catch (Exception e) {e.printStackTrace();}}}, "B").start();}
}
class Aircondition {private int number = 0;//⽼版写法public synchronized void increment() throws Exception {//1.判断if (number != 0) {this.wait();}//2.⼲活number++;System.out.println(Thread.currentThread().getName() + "\t" + number);//3通知this.notifyAll();}public synchronized void decrement() throws Exception {//1.判断if (number == 0) {this.wait();}//2.⼲活number--;System.out.println(Thread.currentThread().getName() + "\t" + number);//3通知this.notifyAll();}
}
上述我们只用了一个线程作为生产者,一个线程作为消费者,我们用多个来进行测试。防止虚假唤醒(判断只能用while,不能用if)
while循环与if判断
while是循环语句,当满足条件时执行语句,执行完循环以后再回来判断是否满足,满足继续执行,然后继续判断,不满足直接执行下面的语句
if是判断语句,满足条件就行,执行完以后继续执行下面的语句,不会再回来判断执行。
A线程生产包子,B线程消费包子,C线程生产包子,D线程消费包子
public class ProdConsumerDemo {public static void main(String[] args) {Aircondition aircondition = new Aircondition();new Thread(() -> {for (int i = 1; i <= 10; i++) {try {aircondition.increment();} catch (Exception e) {e.printStackTrace();}}}, "A").start();new Thread(() -> {for (int i = 1; i <= 5; i++) {try {aircondition.decrement();} catch (Exception e) {e.printStackTrace();}}}, "B").start();new Thread(() -> {for (int i = 1; i <= 10; i++) {try {aircondition.increment();} catch (Exception e) {e.printStackTrace();}}}, "C").start();new Thread(() -> {for (int i = 1; i <= 10; i++) {try {aircondition.decrement();} catch (Exception e) {e.printStackTrace();}}}, "D").start();}
}
Aircondition
class Aircondition {private int number = 0;//⽼版写法public synchronized void increment() throws Exception {//1.判断while (number != 0) {this.wait();}//2.⼲活number++;System.out.println(Thread.currentThread().getName() + "\t" + number);//3通知this.notifyAll();}public synchronized void decrement() throws Exception {//1.判断while (number == 0) {this.wait();}//2.⼲活number--;System.out.println(Thread.currentThread().getName() + "\t" + number);//3通知this.notifyAll();}
}
Synchronized用在多线程中太重了,在高并发场景使用lock方式更加合适。所以我们使用lock来加锁和解锁。对应的等待和唤醒线程方法也换成java.util.concurrent.locks下面的newCondition方法。
class Aircondition{private int number = 0;private Lock lock = new ReentrantLock();private Condition condition = lock.newCondition();//新版写法public void increment() throws Exception{lock.lock();try{//1.判断while (number != 0){condition.await();}//2.干活number++;System.out.println(Thread.currentThread().getName()+"\t"+number);//3通知condition.signalAll();} catch (Exception e) {e.printStackTrace();} finally {lock.unlock();}}public void decrement() throws Exception{lock.lock();try{//1.判断while (number == 0){condition.await();}//2.干活number--;System.out.println(Thread.currentThread().getName()+"\t"+number);//3通知condition.signalAll();} catch (Exception e) {e.printStackTrace();} finally {lock.unlock();}}
}public class ProdConsumerDemo {public static void main(String[] args) {Aircondition aircondition = new Aircondition();new Thread(() -> {for (int i = 1; i <= 10; i++) {try {aircondition.increment();} catch (Exception e) {e.printStackTrace();}}}, "A").start();new Thread(() -> {for (int i = 1; i <= 5; i++) {try {aircondition.decrement();} catch (Exception e) {e.printStackTrace();}}}, "B").start();new Thread(() -> {for (int i = 1; i <= 10; i++) {try {aircondition.increment();} catch (Exception e) {e.printStackTrace();}}}, "C").start();new Thread(() -> {for (int i = 1; i <= 10; i++) {try {aircondition.decrement();} catch (Exception e) {e.printStackTrace();}}}, "D").start();}
}
现在有这样一个需求:
备注:多线程之间按顺序调用,实现A->B->C
三个线程启动,要求如下:
A打印5次,B打印10次,C打印15次
接着
A打印5次,B打印10次,C打印15次
来10轮
1.高内聚低耦合前提下,线程操作资源类
2.判断/干活/通知
3.多线程交互中,防止虚假唤醒(判断只能用while,不能用if)
4.标志位
class ShareData{private int number = 1;//A:1,B:2,C:3private Lock lock = new ReentrantLock();private Condition c1 = lock.newCondition();private Condition c2 = lock.newCondition();private Condition c3 = lock.newCondition();public void printc1(){lock.lock();try {//1.判断while (number != 1){c1.await();}//2.干活for (int i = 1; i <= 5; i++) {System.out.println(Thread.currentThread().getName()+"\t"+i);}//3.通知number = 2;//通知第2个c2.signal();} catch (Exception e) {e.printStackTrace();} finally {lock.unlock();}}public void printc2(){lock.lock();try {//1.判断while (number != 2){c2.await();}//2.干活for (int i = 1; i <= 10; i++) {System.out.println(Thread.currentThread().getName()+"\t"+i);}//3.通知number = 3;//如何通知第3个c3.signal();} catch (Exception e) {e.printStackTrace();} finally {lock.unlock();}}public void printc3(){lock.lock();try {//1.判断while (number != 3){c3.await();}//2.干活for (int i = 1; i <= 15; i++) {System.out.println(Thread.currentThread().getName()+"\t"+i);}//3.通知number = 1;//如何通知第1个c1.signal();} catch (Exception e) {e.printStackTrace();} finally {lock.unlock();}}
}/*** 备注:多线程之间按顺序调用,实现A->B->C* 三个线程启动,要求如下:* A打印5次,B打印10次,C打印15次* 接着* A打印5次,B打印10次,C打印15次* 来10轮* 1.高内聚低耦合前提下,线程操作资源类* 2.判断/干活/通知* 3.多线程交互中,防止虚假唤醒(判断只能用while,不能用if)* 4.标志位*/
public class ConditionDemo {public static void main(String[] args) {ShareData shareData = new ShareData();new Thread(()->{for (int i = 1; i <= 10; i++) {shareData.printc1();}},"A").start();new Thread(()->{for (int i = 1; i <= 10; i++) {shareData.printc2();}},"B").start();new Thread(()->{for (int i = 1; i <= 10; i++) {shareData.printc3();}},"C").start();}
}
结果
结果分析
synchronized
关键字和 java.util.concurrent.locks.Lock
都能加锁,两者有什么区别呢?
为什么需要BlockingQueue?
好处是我们不需要关⼼什么时候需要阻塞线程,什么时候需要唤醒线程,因为这⼀切BlockingQueue都
给你⼀⼿包办好了,使⽤阻塞队列 后就不需要⼿动加锁了。
在Concurrent包发布以前,在多线程环境下,我们每个程序员都必须去⾃⼰控制这些细节,尤其还要兼
顾效率和线程安全,⽽这会给我们的程序带来不⼩的复杂度。
public class ProdConsBlockQueueDemo {public static void main(String[] args) {MyResource myResource = new MyResource(new ArrayBlockingQueue<>(5));new Thread(() -> {System.out.println(Thread.currentThread().getName() + "\t生产线程启动");try {myResource.myProd();} catch (Exception e) {e.printStackTrace();}}, "prod").start();new Thread(() -> {System.out.println(Thread.currentThread().getName() + "\t生产线程启动");try {myResource.myProd();} catch (Exception e) {e.printStackTrace();}}, "prod-2").start();new Thread(() -> {System.out.println(Thread.currentThread().getName() + "\t消费线程启动");try {myResource.myCons();} catch (Exception e) {e.printStackTrace();}}, "cons").start();new Thread(() -> {System.out.println(Thread.currentThread().getName() + "\t消费线程启动");try {myResource.myCons();} catch (Exception e) {e.printStackTrace();}}, "cons-2").start();try {TimeUnit.SECONDS.sleep(5);} catch (Exception e) {e.printStackTrace();}System.out.println("5秒钟后,叫停");myResource.stop();}
}class MyResource {// 定义成volatile类型,线程间可见 默认开启,进行生产+消费private volatile boolean FLAG = true;//定义原子Integer类型保证原子性private AtomicInteger atomicInteger = new AtomicInteger();private BlockingQueue blockingQueue;public MyResource(BlockingQueue blockingQueue) {this.blockingQueue = blockingQueue;System.out.println(blockingQueue.getClass().getName());}public void myProd() throws Exception {String data = null;boolean retValue;while (FLAG) {data = atomicInteger.incrementAndGet() + "";//++iretValue = blockingQueue.offer(data, 2L, TimeUnit.SECONDS);if (retValue) {System.out.println(Thread.currentThread().getName() + "\t" + "插入队列" + data + "成功");} else {System.out.println(Thread.currentThread().getName() + "\t" + "插入队列" + data + "失败");}TimeUnit.SECONDS.sleep(1);}System.out.println(Thread.currentThread().getName() + "\t老板叫停了,FLAG已更新为false,停止生产");}public void myCons() throws Exception {String res;while (FLAG) {res = blockingQueue.poll(2L, TimeUnit.SECONDS);if (null == res || "".equals(res)) {// FLAG = false;System.out.println(Thread.currentThread().getName() + "\t超过2秒钟没有消费,退出消费");return;}System.out.println(Thread.currentThread().getName() + "\t\t消费队列" + res + "成功");}}public void stop() {this.FLAG = false;}
}
结果分析