一般被问你对多线程了解多少的时候,你可能不仅仅只需要知道线程怎么创建,你可能需要了解线程的几种创建方式,线程的生命周期,线程池相关,并发安全,原子类,锁机制…
总之本文了解一下基础的
并发:指两个或多个事件在同⼀个时间段内发⽣。
并⾏:指两个或多个事件在同⼀时刻发⽣(同时发⽣)。
public class MyThread extends Thread{public MyThread(String name) {super(name);}@Overridepublic void run() {for (int i = 0; i < 200; i++) {System.out.println(getName()+":"+i);}}
}
public class MyRunable implements Runnable{public void run() {String name = Thread.currentThread().getName();System.out.println("Running " + name );try {for(int i = 4; i > 0; i--) {System.out.println("Thread: " + name + ", " + i);// 让线程睡眠一会Thread.sleep(50);}}catch (InterruptedException e) {System.out.println("Thread " + name + " interrupted.");}System.out.println("Thread " + name + " exiting.");}
}
public class MyCallableThread implements Callable {@Override public Integer call() throws Exception { int i = 0; for(;i<100;i++) { System.out.println(Thread.currentThread().getName()+" "+i); } return i; }
}
public class ThreadTest {public static void main(String[] args) {// 创建线程MyThread myThread = new MyThread("自定义线程");// 启动线程myThread.start();Thread runThread = new Thread(new MyRunable(), "runable线程");runThread.start();MyCallableThread myCallableThread = new MyCallableThread();FutureTask ft = new FutureTask<>(myCallableThread);new Thread(ft, "有返回值的线程").start();try {System.out.println("子线程的返回值:" + ft.get());} catch (InterruptedException e) {throw new RuntimeException(e);} catch (ExecutionException e) {throw new RuntimeException(e);}}
}
传参的话一般是是内部类的形式,或者给类定义相关属性,
返回值的话,你也可以使用定义相关属性的方式获取,或者基于 FutureTask
String str = "你好呀";Thread thread = new Thread((Runnable) () -> {System.out.println("str");}, "线程1");thread.start();
public void start()
使该线程开始执行;Java 虚拟机调用该线程的 run 方法。
public void run()
如果该线程是使用独立的 Runnable 运行对象构造的,则调用该 Runnable 对象的 run 方法;否则,该方法不执行任何操作并返回。
public final void setName(String name)
改变线程名称,使之与参数 name 相同。
public final void setPriority(int priority)
更改线程的优先级。
public final void setDaemon(boolean on)
将该线程标记为守护线程或用户线程。
public final void join(long millisec)
等待该线程终止的时间最长为 millis 毫秒。
public void interrupt()
中断线程。
public final boolean isAlive()
测试线程是否处于活动状态。
start是开启一个新的线程,run相当于正常调用方法(在线程池概念中,调用run方法来执行具体任务)
守护线程,用于为其他线程提供服务,一般是虚拟机退出的时候停止,需要在start方法之前设置
用户进程: 默认创建的是用户进程,用户进程在任务执行结束后终止
线程执行的优先级,这个是虚拟机中的概念,Thread.MIN_PRIORITY和Thread.MAX_PRIORITY两个常量定义了其最大和最小值,
当然并不是你设置了优先级更高就一定会先执行,而且还需要实际的系统支持优先级的设定,否则和没设置一样。似乎linux是不支持优先级这个概念的
主线程中创建了线程a,然后执行了线程a.join(),那么主线程会陷入阻塞状态,等线程a的任务执行完成或发生异常终止后才继续执行
这个方法支持毫秒数入参,意思大概是线程a终止或者等待多久后,主线程会继续执行
// 创建线程MyThread myThread = new MyThread("自定义线程");// 启动线程myThread.start();boolean alive1 = myThread.isAlive();myThread.join();System.out.println("等待结束");System.out.println(alive1);
获取当前线程
当前线程沉睡多少秒,会让出cpu的执行权
当前线程会让线程从运行转就绪状态,和sleep不一样的是,不需要陷入阻塞状态,可以和其他线程争夺cpu的执行权
NEW和TERMINATED对于中断操作几乎是屏蔽的,RUNNABLE和BLOCKED类似,对于中断操作只是设置中断标志位并没有强制终止线程,对于线程的终止权利依然在程序手中。WAITING/TIMED_WAITING状态下的线程对于中断操作是敏感的,他们会抛出异常并清空中断标志位。
// 创建线程MyThread myThread = new MyThread("自定义线程");// 启动线程,线程中睡眠10smyThread.start();boolean alive1 = myThread.isAlive();// 主线程睡眠2sThread.sleep(2000);// 设置myThread的中断状态为true,这个时候myThread会抛出java.lang.InterruptedException: sleep interrupted异常// 如果这个异常在myThread被捕获,那myThread将继续执行,否则停止myThread.interrupt();// 返回myThread这个线程的中断状态,由于myThread抛出异常后,中断状态马上会置为false,所以此处获取的还是falseSystem.out.println(myThread.isInterrupted());// 返回当前线程的中断状态静态方法。这里是主线程System.out.println(Thread.interrupted());
当然还有一种是子线程处于运行状态,并且一直在执行任务,然后主线程就可以设置子线程的中断状态,来停止子线程的运行,由于子线程处于运行状态,所以此时不会抛出中断异常
while (!Thread.currentThread().isInterrupted() && count < 1000) {System.out.println("count = " + count++);}System.out.println("线程停止: stop thread");}
当前线程是否持有某个对象的锁
如果需要在线程的外部获取线程抛出的异常
同时设置全局处理器和单个线程的处理器时,只有单个线程的处理器生效
public class ThreadTest5 {static {// 设置全局的梳理线程异常的类Thread.setDefaultUncaughtExceptionHandler(new MyUncaughtExceptionHandler());}public static void main(String[] args) throws InterruptedException {// 创建线程MyThread myThread = new MyThread("自定义线程");myThread.setUncaughtExceptionHandler((t,e) ->{System.out.println("异常的线程名称:"+t.getName());System.out.println(e.getMessage());});// 启动线程,线程中睡眠10smyThread.start();}
}
public class MyUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {@Overridepublic void uncaughtException(Thread t, Throwable e) {System.out.println("进入了全局的线程异常处理");}
}
LockSupport.park()休眠线程,LockSupport.unpark()唤醒线程,两个方法配合使用。
也可以通过LockSupport.parkNanos()指定休眠时间后,自动唤醒。
LockSupport.park()不会释放monitor锁。
线程被中断,LockSupport.park()不会抛出异常,也不会吞噬掉interrupt的状态,调用者可以获取interrupt状态,自行进行判断,线程是由于什么原因被唤醒了。
LockSupport.park()会是线程进入WAITING状态,而LockSupport.parkNanos(long nanos) 会进入TIMED_WAITING状态。
LockSupport.park(Object blocker)和LockSupport.getBlocker(t1)配合使用,可以进行自定义数据传输。
public class LockSupportTest {public static void main(String[] args) throws InterruptedException {Thread t = new Thread(() -> {// 睡眠1秒,避免unpark先执行
// try {
// TimeUnit.SECONDS. sleep(1);
// } catch (InterruptedException e) {
//
// }String a = "1";//使用LockSupport的park()方法阻塞当前线程tLockSupport.park(a);System.out.println("任务结束");});//启动当前线程tt.start();// 让park先执行TimeUnit.SECONDS.sleep(1);// 获取锁对象,用于传递数据,必须在park之后执行,只有park执行后而且unpark未执行才可以获取到,具体看源码Object blocker = LockSupport.getBlocker(t);//唤醒线程t,park方法实际上是获取一个许可证,而unpaark是提供一个许可证,可以比park先执行,先执行之后park就不会阻塞了LockSupport.unpark(t);System.out.println(blocker);System.out.println("释放锁");}
}
新建状态(New)
当线程对象对创建后,即进⼊了新建状态,如: Thread t = new MyThread();
就绪状态(Runnable)
当调⽤线程对象的start()⽅法( t.start(); ),线程即进⼊就绪状态。处于就绪状态的线程,只是说明此线程已经做好了准备,随时等待CPU调度执⾏,并不是说执⾏了 t.start() 此线程⽴即就会执⾏;
运⾏状态(Running)
当CPU开始调度处于就绪状态的线程时,此时线程才得以真正执⾏,即进⼊到运⾏状态。注:就 绪
状态是进⼊到运⾏状态的唯⼀⼊⼝,也就是说,线程要想进⼊运⾏状态执⾏,⾸先必须处于就绪状态
阻塞状态(Blocked)
处于运⾏状态中的线程由于某种原因,暂时放弃对CPU的使⽤权,停⽌执⾏,此时进⼊阻塞状态,直到其进⼊到就绪状态,才 有机会再次被CPU调⽤以进⼊到运⾏状态。根据阻塞产⽣的原因不同,
阻塞状态⼜可以分为三种:
1.等待阻塞:运⾏状态中的线程执⾏wait()⽅法,使本线程进⼊到等待阻塞状态;
2.同步阻塞 – 线程在获取synchronized同步锁失败(因为锁被其它线程所占⽤),它会进⼊同步阻塞状
态;
3.其他阻塞 – 通过调⽤线程的sleep()或join()或发出了I/O请求时,线程会进⼊到阻塞状态。当sleep()
状态超时、join()等待线程终⽌或者超时、或者I/O处理完毕时,线程重新转⼊就绪状态。
死亡状态(Dead)
线程执⾏完了或者因异常退出了run()⽅法,该线程结束⽣命周期。
wait和notify使用在后面介绍
**锁池:**假设线程A已经拥有了某个对象(注意:不是类)的锁,而其它的线程想要调用这个对象的某个synchronized方法(或者synchronized块),由于这些线程在进入对象的synchronized方法之前必须先获得该对象的锁的拥有权,但是该对象的锁目前正被线程A拥有,所以这些线程就进入了该对象的锁池中。
等待池:假设一个线程A调用了某个对象的wait()方法,线程A就会释放该对象的锁(因为wait()方法必须出现在synchronized中,这样自然在执行wait()方法之前线程A就已经拥有了该对象的锁),同时线程A就进入到了该对象的等待池中。如果另外的一个线程调用了相同对象的notifyAll()方法,那么处于该对象的等待池中的线程就会全部进入该对象的锁池中,准备争夺锁的拥有权。如果另外的一个线程调用了相同对象的notify()方法,那么仅仅有一个处于该对象的等待池中的线程(随机)会进入该对象的锁池.
BLOCKED和WAITING,首先他们都是阻塞的一种状态,BLOCKED和WAITING两个状态最大的区别有两个:
一般不允许显示的创建线程,而是通过线程池创建
java.util.concurrent包下包含了线程安全相关的类,也包含了线程池相关的类
线程池真正的接口,常用的实现类有ScheduledThreadPoolExecutor和ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {
第1个参数 :corePoolSize 表示常驻核心线程数。如果等于0,则任务执行完成后,没有任何请求进入时销毁线程池的线程;如果大于0,即使本地任务执行完毕,核心线程也不会被销毁。这个值的设置非常关键,设置过大会浪费资源,设置的过小会导致线程频繁地创建或销毁。
第2个参数:maximumPoolSize 表示线程池能够容纳同时执行的最大线程数。从上方的示例代码中第一处来看,必须大于或等于1。如果待执行的线程数大于此值,需要借助第5个参数的帮助。缓存在队列中。如果maximumPoolSize 与corePoolSize 相等,即是固定大小线程池。
第3个参数: keepAliveTime
表示线程池中的线程空闲时间,当空闲时间达到keepAliveSeconds值时,线程被销毁,直到剩下corePoolSize 个线程为止,避免浪费内存和句柄资源。在默认情况下,当线程池的线程大于corePoolSize 时,keepAliveSeconds才会起作用。但是ThreadPoolExecutor的allowCoreThreadTimeOut 变量设置为ture时,核心线程超时后也会被回收。
TimeUnit : keepAliveSeconds
workQueue 表示缓存队列。如果线程池里的线程数大于corePoolSize ,就会放到缓存队列,缓存队列满了会创建新线程到maximumPoolsize;直到当请求的线程数大于maximumPoolSize时,会执行设定的策略,默认是拒绝创建策略。(注意:当线程池里的线程数大于corePoolSize且小于maximumPoolSize时,这时候再有请求的线程就会放到缓存队列,注意只是放到缓存队列但是不创建新的线程,直到请求的线程存满缓存队列时,才会开始创建新的线程,直到maxmunPoolSize就会拒绝创建或者执行提前设定的策略。
threadFactory: 创建线程的工厂
handler : 拒绝策略处理器
Executors是一个线程池的工厂类,预制以一些我们常用的线程池(就是不同的参数,实现不同功能的线程池),以下是几个常用的线程池,还有很多
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);ScheduledFuture scheduledFuture = executorService.schedule(new Callable() {public String call() {return "call";}}, 10, TimeUnit.SECONDS);try {System.out.println(scheduledFuture.get());} catch (InterruptedException e) {throw new RuntimeException(e);} catch (ExecutionException e) {throw new RuntimeException(e);}
线程工厂一般我们会根据具体业务来指定创建线程的名称
示例,线程创建工厂
public class CustomThreadFactory implements ThreadFactory {private final AtomicInteger index = new AtomicInteger(1);private CustomThreadFactory() {SecurityManager sm = System.getSecurityManager();group = (sm != null) ? sm.getThreadGroup(): Thread.currentThread().getThreadGroup();}@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(group, r, "SocketStreamHandle-"+ index.getAndIncrement());t.setDaemon(true);if (t.getPriority() != Thread.NORM_PRIORITY) {t.setPriority(Thread.NORM_PRIORITY);}return t;}}
来自:https://link.zhihu.com/?target=http%3A//ifeve.com/how-to-calculate-threadpool-size
一般说来,大家认为线程池的大小经验值应该这样设置:(其中N为CPU的个数)
- 如果是CPU密集型应用,则线程池大小设置为N+1
- 如果是IO密集型应用,则线程池大小设置为2N+1
如果一台服务器上只部署这一个应用并且只有这一个线程池,那么这种估算或许合理,具体还需自行测试验证。
但是,IO优化中,这样的估算公式可能更适合:
最佳线程数目 = ((线程等待时间+线程CPU时间)/线程CPU时间 )* CPU数目
因为很显然,线程等待时间所占比例越高,需要越多线程。线程CPU时间所占比例越高,需要越少线程。
下面举个例子:
比如平均每个线程CPU运行时间为0.5s,而线程等待时间(非CPU运行时间,比如IO)为1.5s,CPU核心数为8,那么根据上面这个公式估算得到:((0.5+1.5)/0.5)8=32。这个公式进一步转化为:
最佳线程数目 = (线程等待时间与线程CPU时间之比 + 1) CPU数目刚刚说到的线程池大小的经验值,其实是这种公式的一种估算值。
// 获取cpu核数
Runtime.getRuntime().availableProcessors()
看了上面这段,你大概想要整个项目使用一个线程池,来避免cpu频繁的切换,而实际上一个应用中线程池的数量是不能控制的,比如你使用了很多第三方依赖,这些依赖里面就可能会根据具体的逻辑需要创建线程池,而且为了避免多个任务之间的相互影响,应该是针对每个业务创建一个线程池。而线程池的大小也不一定需要是cpu核数。
如果我们使用spring定时任务的话,可能会创建一个定时任务的线程池,避免一个任务因为等待另一个任务执行而演示
正常使用的话,比如我们接收第三方的数据后,可以效验格式后尽快响应,剩下的操作异步处理,这个时候你大致可以创建一个线程池,交由spring容器进行管理,
然后就是一些第三方jar包,比如rocketMq,不管是消息接收者,还是消息发送者都有设置线程池的大小方法,如果我们需要加快消息的处理,可以设置消息接收者的线程池的最大线程数的大小。
springboot中@EnableAsync和@Async,你需要异步执行某个方法,你也可以通过配置设置异步执行的线程池的大小,具体似乎是通过AsyncConfigurer 来设置线程池和线程异常处理,以下仅做参考
@Configuration
@EnableAsync
public class ExecutorConfig {@Beanpublic Executor asyncServiceExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();//配置核心线程数executor.setCorePoolSize(5);//配置最大线程数executor.setMaxPoolSize(5);//配置队列大小executor.setQueueCapacity(20);// 设置线程活跃时间(秒)executor.setKeepAliveSeconds(60);//配置线程池中的线程的名称前缀executor.setThreadNamePrefix("async-service-");// rejection-policy:当pool已经达到max size的时候,如何处理新任务// CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());// 等待所有任务结束后再关闭线程池executor.setWaitForTasksToCompleteOnShutdown(true);//执行初始化executor.initialize();return executor;}
}
Async("asyncThreadPoolTaskExecutor")
线程池有shutdown方法
如果是springboot的话可以指定销毁方法,感觉似乎没啥必要,一般应该不需要销毁吧
@Bean(destroyMethod = "shutdown")
拒绝策略提供顶级接口 RejectedExecutionHandler ,其中方法 rejectedExecution 即定制具体的拒绝策略的执行逻辑。
jdk默认提供了四种拒绝策略:
可以实现RejectedExecutionHandler,定制一些记录导数据库(空闲再提取出来),打印日志
代码规范中已经不允许使用Executors来创建线程池。
public static void main(String[] args) throws ExecutionException, InterruptedException {ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>());// 无返回值executor.execute(()->{System.out.println("1");});Future future = executor.submit(() -> "3");//Future的默认实现是FutureTask,还是阻塞获取结果,get方法支持配置阻塞的时间String str = future.get();System.out.println(str);List> threadList = new ArrayList<>();for (int i = 0; i < 100; i++) {int a = i+1;threadList.add(()-> "数字:" + a);}// 批量执行,只支持Callable类型的入参List> futures = executor.invokeAll(threadList);for (Future future1 : futures) {System.out.println(future1.get());}}
通过已经创建的线程调用相关类的run方法,而不是start方法
ArrayBlockingQueue是一个有界缓存等待队列,可以指定缓存队列的大小;
LinkedBlockingQueue是一个无界(没有大小限制)缓存等待队列。当前执行的线程数量达到corePoolSize的数量时,剩余的元素会在阻塞队列里等待,在使用此阻塞队列时maximumPoolSizes就相当于无效了。LinkedBlockingQueue也可以设置大小。 示例参考LinkedBlockingQueue
SynchronousQueue没有容量,是无缓冲等待队列,是一个不存储元素的阻塞队列,会直接将任务交给消费者,必须等队列中的添加元素被消费后才能继续添加新的元素。使用SynchronousQueue阻塞队列一般要求maximumPoolSizes为无界,避免线程拒绝执行操作。参考 newCachedThreadPool
线程池工作队列的长度不要设置太长,应该考虑合理设置线程池的参数来保证任务的处理速度,过长的工作队列会导致内存溢出
大致就是存在共享的资源(成员变量)的情况下,多个线程对同一个共享变量进行操作,然后导致线程安全问题
以下是测试代码,我想要将num一直减小到0,但是在多线程的情况下 num距离0还差了很多
ThreadPoolExecutor executor = new ThreadPoolExecutor(100, 100,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>());List> threadList = new ArrayList<>();for (int i = 0; i < 100; i++) {int a = i+1;threadList.add(()-> sub());}// 批量执行,只支持Callable类型的入参List> futures = executor.invokeAll(threadList);for (Future future1 : futures) {System.out.println(future1.get());}}static int num = 100;private static String sub(){num = num - 1;// 让测试效果更明显try {Thread.sleep(1);} catch (InterruptedException e) {}return Thread.currentThread().getName() +":"+ num;}
处理的方式也很简单,直接在sub方法上添加synchronized关键字
总之就是保证多线程下业务代码的原子性,可见性和顺序性
首先你需要了解一下jvm会将我们的字节码转换成指令,会对指令进行重排序,了解一个cpu的多级缓存(怎么读取一个变量的),a ++ 转换成指令其实是两个操作,一个相加,一个赋值。
原子性: 多个指令同时执行成功或同时执行失败。
可见性:线程啊对变量的修改对于其他线程是可见的,其他线程会读取到修改的最新值
顺序性:多个操作按顺序执行
就实际使用而言,加锁,使用线程安全相关的类,都是保证线程安全的方法,当然你需要先了解这些概念,才能正确使用
synchronized是JVM内置锁,基于Monitor机制实现,依赖底层操作系统的互斥原语Mutex(互斥量),它是一个重量级锁,性能较低。当然,JVM内置锁在1.5之后版本做了重大的优化,如锁粗化(Lock Coarsening)、锁消除(Lock Elimination)、轻量级锁(Lightweight
Locking)、偏向锁(Biased Locking)、自适应自旋(Adaptive Spinning)等技术来减少锁操作的开销,内置锁的并发性能已经基本与Lock持平
在讲原理前,我们先讲一下Java对象的构成。在JVM中,对象在内存中分为三块区域:对象头,实例数据和对齐填充。如图所示:
对象头:
下面锁的说明来自:https://zhuanlan.zhihu.com/p/343305760
在说自适应自旋锁之前,先讲自旋锁。上面已经讲过,当线程没有获得monitor对象的所有权时,就会进入阻塞,当持有锁的线程释放了锁,当前线程才可以再去竞争锁,但是如果按照这样的规则,就会浪费大量的性能在阻塞和唤醒的切换上,特别是线程占用锁的时间很短的话。
为了避免阻塞和唤醒的切换,在没有获得锁的时候就不进入阻塞,而是不断地循环检测锁是否被释放,这就是自旋。在占用锁的时间短的情况下,自旋锁表现的性能是很高的。
但是又有问题,由于线程是一直在循环检测锁的状态,就会占用cpu资源,如果线程占用锁的时间比较长,那么自旋的次数就会变多,占用cpu时间变长导致性能变差,当然我们也可以通过参数-XX:PreBlockSpin
设置自旋锁的自旋次数,当自旋一定的次数(时间)后就挂起,但是设置的自旋次数是多少比较合适呢?
如果设置次数少了或者多了都会导致性能受到影响,而且占用锁的时间在业务高峰期和正常时期也有区别,所以在JDK1.6引入了自适应性自旋锁。
自适应性自旋锁的意思是,自旋的次数不是固定的,而是由前一次在同一个锁上的自旋时间及锁的拥有者的状态来决定。
表现是如果此次自旋成功了,很有可能下一次也能成功,于是允许自旋的次数就会更多,反过来说,如果很少有线程能够自旋成功,很有可能下一次也是失败,则自旋次数就更少。这样能最大化利用资源,随着程序运行和性能监控信息的不断完善,虚拟机对锁的状况预测会越来越准确,也就变得越来越智能。
锁消除是一种锁的优化策略,这种优化更加彻底,在JVM编译时,通过对运行上下文的扫描,去除不可能存在共享资源竞争的锁。这种优化策略可以消除没有必要的锁,节省毫无意义的请求锁时间。比如StringBuffer的append()方法,就是使用synchronized进行加锁的。
public synchronized StringBuffer append(String str) {toStringCache = null;super.append(str);return this;
}
如果在实例方法中StringBuffer作为局部变量使用append()方法,StringBuffer是不可能存在共享资源竞争的,因此会自动将其锁消除。例如:
public String add(String s1, String s2) {//sb属于不可能共享的资源,JVM会自动消除内部的锁StringBuffer sb = new StringBuffer();sb.append(s1).append(s2);return sb.toString();
}
如果一系列的连续加锁解锁操作,可能会导致不必要的性能损耗,所以引入锁粗话的概念。意思是将多个连续加锁、解锁的操作连接在一起,扩展成为一个范围更大的锁。
偏向锁是JDK1.6引入的一个重要的概念,JDK的开发人员经过研究发现,在大多数情况下,锁不仅不存在多线程竞争,而且总是由同一线程多次获得。也就是说在很多时候我们是假设有多线程的场景,但是实际上却是单线程的。所以偏向锁是在单线程执行代码块时使用的机制。
原理是什么呢,我们前面提到锁的争夺实际上是Monitor对象的争夺,还有每个对象都有一个对象头,对象头是由Mark Word和Klass pointer 组成的。一旦有线程持有了这个锁对象,标志位修改为1,就进入偏向模式,同时会把这个线程的ID记录在对象的Mark Word中,当同一个线程再次进入时,就不再进行同步操作,这样就省去了大量的锁申请的操作,从而提高了性能。
一旦有多个线程开始竞争锁的话呢?那么偏向锁并不会一下子升级为重量级锁,而是先升级为轻量级锁。
如果获取偏向锁失败,也就是有多个线程竞争锁的话,就会升级为JDK1.6引入的轻量级锁,Mark Word 的结构也变为轻量级锁的结构。
执行同步代码块之前,JVM会在线程的栈帧中创建一个锁记录(Lock Record),并将Mark Word拷贝复制到锁记录中。然后尝试通过CAS操作将Mark Word中的锁记录的指针,指向创建的Lock Record。如果成功表示获取锁状态成功,如果失败,则进入自旋获取锁状态。
自旋锁的原理在上面已经讲过了,如果自旋获取锁也失败了,则升级为重量级锁,也就是把线程阻塞起来,等待唤醒。
重量级锁就是一个悲观锁了,但是其实不是最坏的锁,因为升级到重量级锁,是因为线程占用锁的时间长(自旋获取失败),锁竞争激烈的场景,在这种情况下,让线程进入阻塞状态,进入阻塞队列,能减少cpu消耗。所以说在不同的场景使用最佳的解决方案才是最好的技术。synchronized在不同的场景会自动选择不同的锁,这样一个升级锁的策略就体现出了这点。
偏向锁:适用于单线程执行。
轻量级锁:适用于锁竞争较不激烈的情况。
重量级锁:适用于锁竞争激烈的情况。
我们看一下他们的区别:
个人觉得在锁竞争不是很激烈的场景,使用synchronized,语义清晰,实现简单,JDK1.6后引入了偏向锁,轻量级锁等概念后,性能也能保证。而在锁竞争激烈,复杂的场景下,则使用Lock锁会更灵活一点,性能也较稳定。
总之就是synchronized在大量线程争抢锁的情况下,按照其原理,可能会有大量线程陷入重量级锁的状况,然后就会有用户台和内核态的切换,所以大量的并发情况下使用ReentrantLock,在并发量不大的情况下,使用synchronized更简单
主要是分为类锁和对象锁,其中类锁的话,
public static void main(String[] args) throws ExecutionException, InterruptedException {ThreadPoolExecutor executor = new ThreadPoolExecutor(100, 100,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>());List> threadList = new ArrayList<>();for (int i = 0; i < 100; i++) {int a = i+1;threadList.add(()-> sub());}// 批量执行,只支持Callable类型的入参List> futures = executor.invokeAll(threadList);for (Future future1 : futures) {System.out.println(future1.get());}}static int num = 100;private synchronized static String sub(){num = num - 1;// 让测试效果更明显try {Thread.sleep(1);} catch (InterruptedException e) {}return Thread.currentThread().getName() +":"+ num;}
效果等同
private static String sub(){synchronized (safeTest2.class){num = num - 1;// 让测试效果更明显try {Thread.sleep(1);} catch (InterruptedException e) {}return Thread.currentThread().getName() +":"+ num;}}
public class safeTest3 {public static void main(String[] args) throws ExecutionException, InterruptedException {ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>());// 这个对象必须放在循环的外部,否则无效果safeTest3 safeTest3 = new safeTest3();for (int i = 0; i < 100; i++) {int a = i + 1;executor.execute(()->safeTest3.sub());}}int num = 100;private synchronized void sub() {num = num - 1;// 让测试效果更明显try {Thread.sleep(1);} catch (InterruptedException e) {}System.out.println(Thread.currentThread().getName() + ":" + num);}
}
效果等同
private void sub() {synchronized(this){num = num - 1;// 让测试效果更明显try {Thread.sleep(1);} catch (InterruptedException e) {}System.out.println(Thread.currentThread().getName() + ":" + num);}}
字符串在堆中是有一个统一的常量池,同一个字符串是堆中的同一个对象,所以我们对同一个字符串加锁,所有线程共享这把锁
private void sub() {synchronized("你好"){num = num - 1;// 让测试效果更明显try {Thread.sleep(1);} catch (InterruptedException e) {}System.out.println(Thread.currentThread().getName() + ":" + num);}}
这个不是看是不是调用同一个方法,而是判定是不是获取的同一个锁对象,
1、锁对象中类对象和实例对象是不一样的,类对象在堆中只有一个,字符串在堆中也只有一个,所以只要是类锁或者字符串包裹的代码,都只有一个线程可以进入执行
2、而实例对象锁,由于一个类可以创建多个实例对象,你懂得。。。。。(抱歉,解释不清了)
wait和notify都是Object的方法,使用的对象必须持有锁,上面线程声明周期中的等待池和锁池的概念
public static void main(String[] args) throws ExecutionException, InterruptedException {ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>());// 这个对象必须放在循环的外部,否则无效果safeTest5 safeTest3 = new safeTest5();for (int i = 0; i < 5; i++) {int a = i + 1;executor.execute(()->safeTest3.sub());}executor.execute(()->{synchronized ("你好呀"){"你好呀".notifyAll();}});}int num = 100;private void sub() {synchronized("你好呀"){try {"你好呀".wait();} catch (InterruptedException e) {e.printStackTrace();}num = num - 1;// 让测试效果更明显try {Thread.sleep(1);} catch (InterruptedException e) {}System.out.println(Thread.currentThread().getName() + ":" + num);}}
大致是两个线程独获取到了对方需要的锁,导致这两个线程都处在阻塞状态
public class DeadLock implements Runnable {public int flag = 1; //静态对象是类的所有对象共享的 private static Object o1 = new Object(), o2 = new Object(); @Override public void run() { System.out.println("flag=" + flag); if (flag == 1) { synchronized (o1) { try { Thread.sleep(500); } catch (Exception e) { e.printStackTrace(); } synchronized (o2) { System.out.println("1"); } } } if (flag == 0) { synchronized (o2) { try { Thread.sleep(500); } catch (Exception e) { e.printStackTrace(); } synchronized (o1) { System.out.println("0"); } } } } public static void main(String[] args) { DeadLock td1 = new DeadLock(); DeadLock td2 = new DeadLock(); td1.flag = 1; td2.flag = 0; //td1,td2都处于可执行状态,但JVM线程调度先执行哪个线程是不确定的。 //td2的run()可能在td1的run()之前运行 new Thread(td1).start(); new Thread(td2).start(); }
}
这部分内容来自:https://blog.csdn.net/NullpointerExcep/article/details/127352527
可重入锁,也叫做递归锁,指的是同一线程外层函数获得锁之后,内层递归函数仍然有获取该锁的代码,但不受影响。
在JAVA环境下ReentrantLock和synchronized都是可重入锁。
synchronized是一个可重入锁。在一个类中,如果synchronized方法1调用了synchronized方法2,方法2是可以正常执行的,这说明synchronized是可重入锁。否则,在执行方法2想获取锁的时候,该锁已经在执行方法1时获取了,那么方法2将永远得不到执行。
可重入锁在什么场景使用呢?
可重入锁主要用在线程需要多次进入临界区代码时,需要使用可重入锁。具体的例子,比如上文中提到的一个synchronized方法需要调用另一个synchronized方法时。
可重入锁的实现原理是怎么样的?
加锁时,需要判断锁是否已经被获取。如果已经被获取,则判断获取锁的线程是否是当前线程。如果是当前线程,则给获取次数加1。如果不是当前线程,则需要等待。
释放锁时,需要给锁的获取次数减1,然后判断,次数是否为0了。如果次数为0了,则需要调用锁的唤醒方法,让锁上阻塞的其他线程得到执行的机会。
让多个线程对同一变量的修改,对其他线程可见。
1、可以实现更新操作的原子性,这里特别说明i++ 是+1和赋值两个操作,所以不能用volatile来保证原子性
2、volatile是CAS机制的基础。
深入一点大致就是CPU存在多级缓存,同一个变量的值在主内存的数据和cpu高速缓存中的数据可能不一致,然后存在一个对比修改的过程。
- 程序以及数据被加载到主内存
- 指令和数据被加载到CPU的高速缓存
- CPU执行指令,把结果写到高速缓存
- 高速缓存中的数据写回主内存
java中CAS似乎都是调用sun.misc.Unsafe的相关方法来实现,这个方法很多地方都用了,比容原子类,AQS相关实现等
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
看了上面的原理部分知道第三个参数是旧址,第四个参数是新值
然后第一个和第二个参数用于定位你需要修改的数据的内存地址,
var1:要修改的对象起始地址 如:0x00000111
var2:需要修改的具体内存地址 如100 。0x0000011+100 = 0x0000111就是要修改的值的地址
注意没有var3
var4:期望内存中的值,拿这个值和0x0000111内存中的中值比较,如果为true,则修改,返回ture,否则返回false,等待下次修改。
var5:如果上一步比较为ture,则把var5更新到0x0000111其实的内存中。
原子操作,直接操作内存。
1、JDK8的HashMap是数组+链表+红黑树实现的
2、通过hashcode & (table.length - 1)计算在数组中的位置
1.与&:遇0则0
2.或 |:遇1则1
所以保证每一位都是1,按位与的计算结果为0到数组大小,当然我们设置大小的时候似乎不用太在意,里面有相关的转换来帮我们达到特定大小,比如我们初始化的是31或者31,那么初始化的数组大小是64,具体里面的位运算太多,头晕不看了
数据结构:ReentrantLock+Segment+HashEntry,一个Segment中包含一个HashEntry数组,每个
HashEntry又是一个链表结构
元素查询:二次hash,第一次Hash定位到Segment,第二次Hash定位到元素所在的链表的头部
锁:Segment分段锁 Segment继承了ReentrantLock,锁定操作的Segment,其他的Segment不受影
响,并发度为segment个数,可以通过构造函数指定,数组扩容不会影响其他的segment
get方法无需加锁,volatile保证
jdk8:
数据结构:synchronized+CAS+Node+红黑树,Node的val和next都用volatile修饰,保证可见性
查找,替换,赋值操作都使用CAS
锁:锁链表的head节点,不影响其他元素的读写,锁粒度更细,效率更高,扩容时,阻塞所有的读写
操作、并发扩容
读操作无锁:
Node的val和next使用volatile修饰,读写线程对该变量互相可见
数组用volatile修饰,保证扩容时被读线程感知
final V putVal(K key, V value, boolean onlyIfAbsent) {if (key == null || value == null) throw new NullPointerException();int hash = spread(key.hashCode());int binCount = 0;// 1、遍历node数组,node是key-value结构for (Node[] tab = table;;) {Node f; int n, i, fh;if (tab == null || (n = tab.length) == 0)、// 初始化node数组tab = initTable();else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {// 判断如果数组当前位置为null,就使用cas机制设置数组这个位置的值,设置失败结束当前循环(说明其他线程设置了)if (casTabAt(tab, i, null,new Node(hash, key, value, null)))break; // no lock when adding to empty bin}else if ((fh = f.hash) == MOVED)// 帮助扩容的逻辑tab = helpTransfer(tab, f);else {V oldVal = null;// 对于单个node的操作进行加锁synchronized (f) {if (tabAt(tab, i) == f) {if (fh >= 0) {binCount = 1;for (Node e = f;; ++binCount) {K ek;if (e.hash == hash &&((ek = e.key) == key ||(ek != null && key.equals(ek)))) {oldVal = e.val;if (!onlyIfAbsent)e.val = value;break;}Node pred = e;if ((e = e.next) == null) {pred.next = new Node(hash, key,value, null);break;}}}else if (f instanceof TreeBin) {Node p;binCount = 2;if ((p = ((TreeBin)f).putTreeVal(hash, key,value)) != null) {oldVal = p.val;if (!onlyIfAbsent)p.val = value;}}}}if (binCount != 0) {if (binCount >= TREEIFY_THRESHOLD)treeifyBin(tab, i);if (oldVal != null)return oldVal;break;}}}addCount(1L, binCount);return null;}
定时任务计算时存储一下每次计算都需要用到的数据。
建立websoket连接时用于保存客户端id
@Slf4j
@Component
@ServerEndpoint("/websocket/incident")
public class WebSocket {/*** 与某个客户端的连接对话,需要通过它来给客户端发送消息*/private Session session;/*** 标识当前连接客户端的用户名*/private String uuid;/*** 用于存所有的连接服务的客户端,这个对象存储是安全的*/private static ConcurrentHashMap webSocketSet = new ConcurrentHashMap<>();@OnOpenpublic void OnOpen(Session session){this.session = session;String id = IdGenerator.randomUUID();this.uuid = id;// name是用来表示唯一客户端,如果需要指定发送,需要指定发送通过name来区分webSocketSet.put(id,this);log.info("[WebSocket] 连接成功,当前连接人数为:={}",webSocketSet.size());}@OnClosepublic void OnClose(){webSocketSet.remove(this.uuid);log.info("[WebSocket] 退出成功,当前连接人数为:={}",webSocketSet.size());}@OnMessagepublic void OnMessage(String message){log.info("[WebSocket] 收到消息:{}",message);}/**** @param session* @param error*/@OnErrorpublic void onError(Session session, Throwable error) {log.error("websocket客户端连接异常:",error);}/*** 群发* @param message*/public void GroupSending(String message){for (WebSocket webSocket : webSocketSet.values()){try {webSocket.session.getBasicRemote().sendText(message);}catch (Exception e){e.printStackTrace();}}}}
读不加锁,写加锁
关键属性 array 使用 volatile修饰,其修改添加方法中都先拷贝一个 array副本,然后对副本进行操作,操作完成后将副本赋值给 array属性。
private transient volatile Object[] array; public boolean add(E e) {final ReentrantLock lock = this.lock;lock.lock();try {Object[] elements = getArray();int len = elements.length;Object[] newElements = Arrays.copyOf(elements, len + 1);newElements[len] = e;setArray(newElements);return true;} finally {lock.unlock();}}
AQS:AbstractQuenedSynchronizer抽象的队列式同步器。是除了java自带的synchronized关键字之外的锁机制。
AQS的全称为(AbstractQueuedSynchronizer),这个类在java.util.concurrent.locks包
AQS的核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并将共享资源设置为锁定状态,如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制AQS是用CLH队列锁实现的,即将暂时获取不到锁的线程加入到队列中。
CLH(Craig,Landin,and Hagersten)队列是一个虚拟的双向队列,虚拟的双向队列即不存在队列实例,仅存在节点之间的关联关系
AQS具备的特性: 阻塞等待队列 、共享/独占、 公平/非公平 、可重入 、允许中断
AbstractQuenedSynchronizer中的Node内部类,Node中的属性prev、next等用来指向上一个Node和下一个Node, AbstractQuenedSynchronizer中又定义了head和tail用于获取一整个Node队列的头部和尾部。
+------+ prev +-----+ +-----+head | | <---- | | <---- | | tail+------+ +-----+ +-----+
在获取锁失败后,会进入队列的尾部,释放锁的时候会设置头部的下一个节点为新的头部,这个看我下面文档中的ReentrantLock中非公平锁加锁和解锁代码流程
五种状态:
1.初始状态 EXCLUSIVE
2.CANCELLED = 1:说明节点已经 取消获取 lock 了(一般是由于 interrupt 或 timeout 导致的)很多时候是在cancelAcquire 里面进行设置这个标识
3.SIGNAL = -1:表示当前节点的后继节点需要唤醒
4.CONDITION = -2:当前节点在 Condition Queue 里面
5.PROPAGATE = -3:当前节点获取到 lock 或进行 release lock 时, 共享模式的最终状态是 PROPAGATE(PS: 有可能共享模式的节点变成 PROPAGATE 之前就被其后继节点抢占 head 节点, 而从Sync Queue中被踢出掉)
static final Node EXCLUSIVE = null;/** waitStatus value to indicate thread has cancelled */static final int CANCELLED = 1;/** waitStatus value to indicate successor's thread needs unparking */static final int SIGNAL = -1;/** waitStatus value to indicate thread is waiting on condition */static final int CONDITION = -2;/*** waitStatus value to indicate the next acquireShared should* unconditionally propagate*/static final int PROPAGATE = -3;
AbstractQueuedSynchronizer.ConditionObject,这个待定吧,不太了解
State三种访问方式: getState() setState() compareAndSetState()
AQS定义两种资源共享方式
不同的自定义同步器竞争共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:
条件等待队列相关方法
似乎
AQS
public class ReentrantLockTest {private static final ReentrantLock lock = new ReentrantLock();public static void main(String[] args) throws ExecutionException, InterruptedException {ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>());for (int i = 0; i < 100; i++) {int a = i + 1;executor.execute(()->sub());}}static int num = 100;private static void sub() {lock.lock();try {num = num - 1;// 让测试效果更明显Thread.sleep(1);System.out.println(Thread.currentThread().getName() + ":" + num);} catch (Exception e) {e.printStackTrace();} finally {// 必须手动解锁lock.unlock();}}
}
更多示例可以参考AbstractQueuedSynchronizer.ConditionObject的调用位置
@Slf4j
public class ConditionTest {public static void main(String[] args) {Lock lock = new ReentrantLock();Condition condition = lock.newCondition();new Thread(() -> {lock.lock();try {log.debug(Thread.currentThread().getName() + " 开始处理任务1");// 释放当前持有的锁,并且阻塞当前线程,同时向Condition队列尾部添加一个节点,持有锁的对象就是condition这个实例对象condition.await();log.debug(Thread.currentThread().getName() + " 结束处理任务1");} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}}).start();new Thread(() -> {lock.lock();try {log.debug(Thread.currentThread().getName() + " 开始处理任务2");Thread.sleep(2000);// 唤醒因调用Condition#await方法而阻塞的线程,必须持有锁才能调用condition.signal();log.debug(Thread.currentThread().getName() + " 结束处理任务2");} catch (Exception e) {e.printStackTrace();} finally {lock.unlock();}}).start();}}
21:13:12.384 [Thread-0] DEBUG cn.sry1201.thread.test2.ConditionTest - Thread-0 开始处理任务1
21:13:12.386 [Thread-1] DEBUG cn.sry1201.thread.test2.ConditionTest - Thread-1 开始处理任务2
21:13:14.394 [Thread-1] DEBUG cn.sry1201.thread.test2.ConditionTest - Thread-1 结束处理任务2
21:13:14.394 [Thread-0] DEBUG cn.sry1201.thread.test2.ConditionTest - Thread-0 结束处理任务1
建议先了解一下CAS
1、包含公平锁和非公平锁两个内部类,都继承自AbstractQuenedSynchronizer
// 默认是非公平锁
public ReentrantLock() {sync = new NonfairSync();}
2、非公平锁加锁方法
final void lock() {// 尝试将AbstractQueuedSynchronizer#state设置为已经加锁,方法里面的stateOffset用于定位state的内存地址if (compareAndSetState(0, 1))//AbstractOwnableSynchronizer#exclusiveOwnerThread 锁的拥有者设置为当前线程setExclusiveOwnerThread(Thread.currentThread());else// 没有获取到锁进行下一步acquire(1);}
public final void acquire(int arg) {// 尝试加锁,加锁失败为trueif (!tryAcquire(arg) &&// addwaiter将当前程组装成node对象,然后添加到队列的尾部,之前的尾部向前移动,并且返回一个node(可能是尾部的node)// acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;// 这里就实现了每一个节点只判断上一个节点的状态,来确定自身是不是可以获取锁了for (;;) {//获取传入节点的上一个节点final Node p = node.predecessor();// 判断节点是不是头节点,是的话尝试加锁if (p == head && tryAcquire(arg)) {// 头结点加锁成功,设置下一个节点为头结点setHead(node);//去除头节点和下一个节点的关联关系p.next = null; // help GCfailed = false;return interrupted;}// 将节点循环向前移动, 判断节点状态waitStatus来确定是否需要跳过,新加入的节点会被设置成SIGNALif (shouldParkAfterFailedAcquire(p, node) &&// waitStatus是默认状态才进入这里,里面会调用LockSupport.park将当前线程设置成阻塞状态parkAndCheckInterrupt())// 返回为true后,当前线程会被设置成中断状态interrupted = true;}} finally {if (failed)cancelAcquire(node);}}
非公平锁的tryLock方法最终调用到这,lock方法的获取锁的逻辑也调用到这里,
final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}
// 将新的节点置为最后一个节点,并且设置新节点和之前的最后一个节点的关联关系
private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failureNode pred = tail;if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}enq(node);return node;}
private Node enq(final Node node) {for (;;) {Node t = tail;if (t == null) { // Must initializeif (compareAndSetHead(new Node()))// 最后一个节点是null,那么就将队列头部设置成尾部,tail = head;} else {node.prev = t;// 如果最新的节点加入和尾部的节点不一致,那么将新节点设置成尾部节点的下一个节点if (compareAndSetTail(t, node)) {t.next = node;return t;}}}}
// 传入的是节点的上一个节点 和 节点
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws = pred.waitStatus;if (ws == Node.SIGNAL)// SIGNAL表示这个节点的后续节点需要暂停return true;if (ws > 0) {// 表示当前节点的上一个节点是cancelled,可以越过这个节点do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {// 将节点置为SIGNAL,表示等待上一个节点被唤醒compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;}
public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;}
private void unparkSuccessor(Node node) {/** If status is negative (i.e., possibly needing signal) try* to clear in anticipation of signalling. It is OK if this* fails or if status is changed by waiting thread.*/int ws = node.waitStatus;if (ws < 0)compareAndSetWaitStatus(node, ws, 0);/** Thread to unpark is held in successor, which is normally* just the next node. But if cancelled or apparently null,* traverse backwards from tail to find the actual* non-cancelled successor.*/Node s = node.next;if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)LockSupport.unpark(s.thread);}
// 释放锁会将AbstractOwnableSynchronizer#exclusiveOwnerThread 锁的拥有者设置为null
protected final boolean tryRelease(int releases) {// 由于锁的可重入,这里知道减到0才表示释放成功int c = getState() - releases;if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;if (c == 0) {free = true;setExclusiveOwnerThread(null);}// 设置锁的状态 setState(c);return free;}
看完了非公平锁的源码,公平锁似乎就是 去除了 线程一进来就获取锁,获取锁失败才加入CLH队列的逻辑。也就是说非公平锁已经在同步队列中的还是按照先后顺序执行
现实中有这样一种场景:对共享资源有读和写的操作,且写操作没有读操作那么频繁(读多写少)。在没有写操作的时候,多个线程同时读一个资源没有任何问题,所以应该允许多个线程同时读取共享资源(读读可以并发);但是如果一个线程想去写这些共享资源,就不应该允许其他线程对该资源进行读和写操作了(读写,写读,写写互斥)。在读多于写的情况下,读写锁能够提供比排它锁更好的并发性和吞吐量。
针对这种场景,JAVA的并发包提供了读写锁ReentrantReadWriteLock,它内部,维护了一对相关的锁,一个用于只读操作,称为读锁;一个用于写入操作,称为写锁,描述如下:
线程进入读锁的前提条件:
线程进入写锁的前提条件:
而读写锁有以下三个重要的特性:
public class ReadWriteLockTest {static Map map = new HashMap<>();private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();private Lock r = readWriteLock.readLock();private Lock w = readWriteLock.writeLock();// 读操作上读锁public String get(String key) {r.lock();try {System.out.println("进入读锁");TimeUnit.SECONDS.sleep(5);// TODO 业务逻辑return map.get(key);} catch (InterruptedException e) {e.printStackTrace();} finally {r.unlock();}return null;}// 写操作上写锁public void put(String key, String value) {w.lock();try {System.out.println("进入写锁");TimeUnit.SECONDS.sleep(10);// TODO 业务逻辑map.put(key,value);} catch (InterruptedException e) {} finally {w.unlock();}}public static void main(String[] args) {ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>());ReadWriteLockTest readWriteLockTest = new ReadWriteLockTest();// 验证写写互斥,读写互斥,读和读不影响, 主要是看日志的打印时间executor.execute(()-> readWriteLockTest.put("1","2"));executor.execute(()-> System.out.println(readWriteLockTest.get("1")));executor.execute(()-> System.out.println(readWriteLockTest.get("1")));executor.execute(()-> readWriteLockTest.put("1","3"));executor.execute(()-> readWriteLockTest.put("1","4"));}
}
。。。。。
介绍
Semaphore,俗称信号量,它是操作系统中PV操作的原语在java的实现,它也是基于AbstractQueuedSynchronizer实现的。
Semaphore的功能非常强大,大小为1的信号量就类似于互斥锁,通过同时只能有一个线程获取信号量实现。大小为n(n>0)的信号量可以实现限流的功能,它可以实现只能有n个线程同时获取信号量。
默认非公平锁
public Semaphore(int permits) {sync = new NonfairSync(permits);}
使用的是AQS的共享锁,加锁时用了tryAcquireShared,其中代表锁状态的字段AbstractQueuedSynchronizer#state在构造方法中设置为令牌的数量,获取锁成功则减一,然后小于0时就不能再获取锁了
final int nonfairTryAcquireShared(int acquires) {for (;;) {int available = getState();int remaining = available - acquires;if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}}
其他线程阻塞和添加到同步队列中的代码和ReentrantLock中一致,反正都是调用到了AbstractQueuedSynchronizer中继承的方法
// 定义一个只能允许5个线程同时执行的 信号量private static final Semaphore semaphore = new Semaphore(5);public static void main(String[] args) throws ExecutionException, InterruptedException {ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>());for (int i = 0; i < 100; i++) {int a = i + 1;executor.execute(()-> {try {// 获取到令牌才能往后执行,semaphore支持获取多个令牌,默认是非公平锁,具体查看详细的构造方法semaphore.acquire();// 测试结果应该是每隔1s打印出 5行System.out.println("这是第" + a + "个执行的线程");Thread.sleep(1000);} catch (InterruptedException e) {}finally {// 将令牌返还,应该获取几个令牌就返还几个令牌semaphore.release();}});}}
比如客服是有限的,所以只有当有客服处于空闲状态,才能接通电话。。。。。
CountDownLatch(闭锁)是一个同步协助类,允许一个或多个线程等待,直到其他线程完成操作集。
初始化多个令牌,countDown获取领取令牌,一直到令牌的数量为0,countDownLatch.await()获取到令牌为0后就会继续执行。需要说明的是令牌不能重复使用,所以一个CountDownLatch实例只能使用一次。
public class CountDownLatchTest2 {public static void main(String[] args) throws Exception {CountDownLatch countDownLatch = new CountDownLatch(5);for (int i = 0; i < 5; i++) {final int index = i;new Thread(() -> {try {Thread.sleep(1000 + ThreadLocalRandom.current().nextInt(1000));System.out.println(Thread.currentThread().getName() + " finish task" + index);countDownLatch.countDown();} catch (InterruptedException e) {e.printStackTrace();}}).start();}// 主线程在阻塞,当计数器==0,就唤醒主线程往下执行。countDownLatch.await();System.out.println("主线程:在所有任务运行完成后,进行结果汇总");}
}
CountDownLatch实现原理
底层基于 AbstractQueuedSynchronizer 实现,CountDownLatch 构造函数中指定的count直接赋给AQS的state;每次countDown()则都是release(1)减1,最后减到0时unpark阻塞线程;这一步是由最后一个执行countdown方法的线程执行的。
而调用await()方法时,当前线程就会判断state属性是否为0,如果为0,则继续往下执行,如果不为0,则使当前线程进入等待状态,直到某个线程将state属性置为0,其就会唤醒在await()方法中等待的线程。
更多:https://www.jianshu.com/p/043ac5689002
字面意思回环栅栏,通过它可以实现让一组线程等待至某个状态(屏障点)之后再全部同时执行。叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier可以被重用。
// parties表示屏障拦截的线程数量,每个线程调用 await 方法告诉 CyclicBarrier 我已经到达了屏障,然后当前线程被阻塞。public CyclicBarrier(int parties)// 用于在线程到达屏障时,优先执行 barrierAction,方便处理更复杂的业务场景(该线程的执行时机是在到达屏障之后再执行)
//屏障 指定数量的线程全部调用await()方法时,这些线程不再阻塞
// BrokenBarrierException 表示栅栏已经被破坏,破坏的原因可能是其中一个线程 await() 时被中断或者超时
public int await() throws InterruptedException, BrokenBarrierException
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException
//循环 通过reset()方法可以进行重置
public class CyclicBarrierTest3 {private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(6);public static void main(String[] args) throws InterruptedException {AtomicInteger counter = new AtomicInteger();ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(12, 12, 0, TimeUnit.SECONDS,new ArrayBlockingQueue<>(100),(r) -> new Thread(r, counter.addAndGet(1) + " 号 "),new ThreadPoolExecutor.AbortPolicy());for (int i = 0; i < 12; i++) {if(i == 5){// 让日志效果更明显TimeUnit.SECONDS.sleep(20);}threadPoolExecutor.submit(() ->{int sleepMills = ThreadLocalRandom.current().nextInt(1000);try {Thread.sleep(sleepMills);System.out.println(Thread.currentThread().getName() + " 选手已就位, 准备共用时: " + sleepMills + "ms" + cyclicBarrier.getNumberWaiting());// 线程到达屏障后等待其他选后就位cyclicBarrier.await();System.out.println("开始了"); } catch (InterruptedException e) {e.printStackTrace();//其他线程调用CyclicBarrier.reset(),会触发此异常} catch (BrokenBarrierException e) {e.printStackTrace();}});}}
}
基于CAS机制,最终还是使用了sun.misc.Unsafe中相关方法
在java.util.concurrent.atomic包里提供了一组原子操作类:
**基本类型:**AtomicInteger、AtomicLong、AtomicBoolean;
**引用类型:**AtomicReference、AtomicStampedRerence、AtomicMarkableReference;
**数组类型:**AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray
对象属性原子修改器:AtomicIntegerFieldUpdater、AtomicLongFieldUpdater、AtomicReferenceFieldUpdater
原子类型累加器(jdk1.8增加的类):DoubleAccumulator、DoubleAdder、LongAccumulator、LongAdder、Striped64
基于CAS机制,重要属性是value,其内所有的cas都是操作这个value的内存地址。一般每个独立的业务都需要一个AtomicInteger实例对象。
重要方法
//以原子的方式将实例中的原值加1,返回的是自增前的旧值;
public final int getAndIncrement() {return unsafe.getAndAddInt(this, valueOffset, 1);
}//getAndSet(int newValue):将实例中的值更新为新值,并返回旧值;
public final boolean getAndSet(boolean newValue) {boolean prev;do {prev = get();} while (!compareAndSet(prev, newValue));return prev;
}//incrementAndGet() :以原子的方式将实例中的原值进行加1操作,并返回最终相加后的结果;
public final int incrementAndGet() {return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}//addAndGet(int delta) :以原子方式将输入的数值与实例中原本的值相加,并返回最后的结果;
public final int addAndGet(int delta) {return unsafe.getAndAddInt(this, valueOffset, delta) + delta;
自加一的方法演示,多线程安全
public static void main(String[] args) throws ExecutionException, InterruptedException {ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>());AtomicInteger atomicInteger = new AtomicInteger();CountDownLatch countDownLatch = new CountDownLatch(100);for (int i = 0; i < 100; i++) {int a = i + 1;executor.execute(() -> {countDownLatch.countDown();atomicInteger.getAndIncrement();System.out.println(atomicInteger);});}countDownLatch.await();System.out.println("最终结果:" + atomicInteger.get());}
还是基于CAS,会动态计算数组中某个元素的内存地址,然后进行操作
重要方法
//addAndGet(int i, int delta):以原子更新的方式将数组中索引为i的元素与输入值相加;
public final int addAndGet(int i, int delta) {return getAndAdd(i, delta) + delta;
}//getAndIncrement(int i):以原子更新的方式将数组中索引为i的元素自增加1;
public final int getAndIncrement(int i) {return getAndAdd(i, 1);
}//compareAndSet(int i, int expect, int update):将数组中索引为i的位置的元素进行更新
public final boolean compareAndSet(int i, int expect, int update) {return compareAndSetRaw(checkedByteOffset(i), expect, update);
似乎没啥好说的,就是cas替换的对象变成的引用类型
public class AtomicReferenceTest {public static void main(String[] args) {User user1 = new User("张三", 23);User user2 = new User("李四", 25);User user3 = new User("王五", 20);//初始化为 user1AtomicReference atomicReference = new AtomicReference<>();atomicReference.set(user1);//把 user2 赋给 atomicReferenceatomicReference.compareAndSet(user1, user2);System.out.println(atomicReference.get());//把 user3 赋给 atomicReferenceatomicReference.compareAndSet(user1, user3);System.out.println(atomicReference.get());}}
public class AtomicIntegerFieldUpdaterTest {public static class Candidate {volatile int score = 0;AtomicInteger score2 = new AtomicInteger();}public static final AtomicIntegerFieldUpdater scoreUpdater =AtomicIntegerFieldUpdater.newUpdater(Candidate.class, "score");public static AtomicInteger realScore = new AtomicInteger(0);public static void main(String[] args) throws InterruptedException {final Candidate candidate = new Candidate();Thread[] t = new Thread[10000];for (int i = 0; i < 10000; i++) {t[i] = new Thread(new Runnable() {@Overridepublic void run() {if (Math.random() > 0.4) {candidate.score2.incrementAndGet();scoreUpdater.incrementAndGet(candidate);realScore.incrementAndGet();}}});t[i].start();}for (int i = 0; i < 10000; i++) {t[i].join();}System.out.println("AtomicIntegerFieldUpdater Score=" + candidate.score);System.out.println("AtomicInteger Score=" + candidate.score2.get());System.out.println("realScore=" + realScore.get());}
}
对于AtomicIntegerFieldUpdater 的使用稍微有一些限制和约束,约束如下:
(1)字段必须是volatile类型的,在线程之间共享变量时保证立即可见.eg:volatile int value = 3
(2)字段的描述类型(修饰符public/protected/default/private)与调用者与操作对象字段的关系一致。也就是说调用者能够直接操作对象字段,那么就可以反射进行原子操作。但是对于父类的字段,子类是不能直接操作的,尽管子类可以访问父类的字段。
(3)只能是实例变量,不能是类变量,也就是说不能加static关键字。
(4)只能是可修改变量,不能使final变量,因为final的语义就是不可修改。实际上final的语义和volatile是有冲突的,这两个关键字不能同时存在。
(5)对于AtomicIntegerFieldUpdater和AtomicLongFieldUpdater只能修改int/long类型的字段,不能修改其包装类型(Integer/Long)。如果要修改包装类型就需要使用AtomicReferenceFieldUpdater。
AtomicInteger等基于cas的操作在高并发场景下,大量线程处于自旋状态,占用cpu资源,
public final long getAndAddLong(Object var1, long var2, long var4) {long var6;do {var6 = this.getLongVolatile(var1, var2);} while(!this.compareAndSwapLong(var1, var2, var6, var6 + var4));return var6;}
所以可以使用LongAdder,
AtomicLong中有个内部变量value保存着实际的long值,所有的操作都是针对该变量进行。也就是说,高并发环境下,value变量其实是一个热点,也就是N个线程竞争一个热点。LongAdder的基本思路就是分散热点,将value值分散到一个数组中,不同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个值进行CAS操作,这样热点就被分散了,冲突的概率就小很多。如果要获取真正的long值,只要将各个槽中的变量值累加返回。
public void add(long x) {Cell[] as; long b, v; int m; Cell a;// cells数组已经不为null或者cas失败if ((as = cells) != null || !casBase(b = base, b + x)) {boolean uncontended = true;if (as == null || (m = as.length - 1) < 0 ||// getProbe()是当前线程获取一个int值, & m是达到求模一样的效果,是线程可能落在数组的任何一个索引上(a = as[getProbe() & m]) == null ||!(uncontended = a.cas(v = a.value, v + x)))longAccumulate(x, null, uncontended);}}public long sum() {Cell[] as = cells; Cell a;long sum = base;if (as != null) {for (int i = 0; i < as.length; ++i) {if ((a = as[i]) != null)sum += a.value;}}return sum;}
LongAccumulator是LongAdder的增强版。LongAdder只能针对数值的进行加减运算,而LongAccumulator提供了自定义的函数操作。具体再说吧。。。。。。。。
队列结构,可以从头部和尾部添加和移除元素
public interface Queue extends Collection {//添加一个元素,添加成功返回true, 如果队列满了,就会抛出异常boolean add(E e);//添加一个元素,添加成功返回true, 如果队列满了,返回falseboolean offer(E e);//返回并删除队首元素,队列为空则抛出异常E remove();//返回并删除队首元素,队列为空则返回nullE poll();//返回队首元素,但不移除,队列为空则抛出异常E element();//获取队首元素,但不移除,队列为空则返回nullE peek();
BlockingQueue 继承了 Queue 接口,是队列的一种。Queue 和 BlockingQueue 都是在 Java 5 中加入的。阻塞队列(BlockingQueue)是一个在队列基础上又支持了两个附加操作的队列,常用解耦。两个附加操作:
BlockingQueue的容量都是固定的,否则无法形成队列满了阻塞的操作
BlockingQueue和JDK集合包中的Queue接口兼容,同时在其基础上增加了阻塞功能。
入队:
(1)offer(E e):如果队列没满,返回true,如果队列已满,返回false(不阻塞)
(2)offer(E e, long timeout, TimeUnit unit):可以设置阻塞时间,如果队列已满,则进行阻塞。超过阻塞时间,则返回false
(3)put(E e):队列没满的时候是正常的插入,如果队列已满,则阻塞,直至队列空出位置
出队:
(1)poll():如果有数据,出队,如果没有数据,返回null (不阻塞)
(2)poll(long timeout, TimeUnit unit):可以设置阻塞时间,如果没有数据,则阻塞,超过阻塞时间,则返回null
(3)take():队列里有数据会正常取出数据并删除;但是如果队列里无数据,则阻塞,直到队列里有数据
BlockingQueue常用方法示例
当队列满了无法添加元素,或者是队列空了无法移除元素时:
方法 | 抛出异常 | 返回特定值 | 阻塞 | 阻塞特定时间 |
---|---|---|---|---|
入队 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
出队 | remove() | poll() | take() | poll(time, unit) |
获取队首元素 | element() | peek() | 不支持 | 不支持 |
阻塞队列还有一个非常重要的属性,那就是容量的大小,分为有界和无界两种。无界队列意味着里面可以容纳非常多的元素,例如 LinkedBlockingQueue 的上限是 Integer.MAX_VALUE,是非常大的一个数,可以近似认为是无限容量,因为我们几乎无法把这个容量装满。但是有的阻塞队列是有界的,例如 ArrayBlockingQueue 如果容量满了,也不会扩容,所以一旦满了就无法再往里放数据了。
BlockingQueue 接口的实现类都被放在了 juc 包中,它们的区别主要体现在存储结构上或对元素操作上的不同,但是对于take与put操作的原理,却是类似的。
队列 | 描述 |
---|---|
ArrayBlockingQueue | 基于数组结构实现的一个有界阻塞队列 |
LinkedBlockingQueue | 基于链表结构实现的一个有界阻塞队列 |
PriorityBlockingQueue | 支持按优先级排序的无界阻塞队列 |
DelayQueue | 基于优先级队列(PriorityBlockingQueue)实现的无界阻塞队列 |
SynchronousQueue | 不存储元素的阻塞队列 |
LinkedTransferQueue | 基于链表结构实现的一个无界阻塞队列 |
LinkedBlockingDeque | 基于链表结构实现的一个双端阻塞队列 |
LinkedBlockingQueue
LinkedBlockingQueue是一个基于链表实现的阻塞队列,默认情况下,该阻塞队列的大小为Integer.MAX_VALUE,由于这个数值特别大,所以 LinkedBlockingQueue 也被称作无界队列,代表它几乎没有界限,队列可以随着元素的添加而动态增长,但是如果没有剩余内存,则队列将抛出OOM错误。所以为了避免队列过大造成机器负载或者内存爆满的情况出现,我们在使用的时候建议手动传一个队列的大小。
LinkedBlockingQueue内部由单链表实现,只能从head取元素,从tail添加元素。LinkedBlockingQueue采用两把锁的锁分离技术实现入队出队互不阻塞,添加元素和获取元素都有独立的锁,也就是说LinkedBlockingQueue是读写分离的,读写操作可以并行执行。
ArrayBlockingQueue
ArrayBlockingQueue是最典型的有界阻塞队列,其内部是用数组存储元素的,初始化时需要指定容量大小,利用 ReentrantLock 实现线程安全。
在生产者-消费者模型中使用时,如果生产速度和消费速度基本匹配的情况下,使用ArrayBlockingQueue是个不错选择;当如果生产速度远远大于消费速度,则会导致队列填满,大量生产线程被阻塞。
使用独占锁ReentrantLock实现线程安全,入队和出队操作使用同一个锁对象,也就是只能有一个线程可以进行入队或者出队操作;这也就意味着生产者和消费者无法并行操作,在高并发场景下会成为性能瓶颈。
java8中似乎也是使用CAs机制实现,反正我没看lock锁
其他好像也没啥说的,算了
一般多线程都不允许设置为null的元素,比如ConcurrentLinkedQueue不允许添加null,ConcurrentHashMap似乎key和value都不允许为null
假定ConcurrentHashMap也可以存放value为null的值。那不管是HashMap还是ConcurrentHashMap调用map.get(key)的时候,如果返回了null,那么这个null都有两重含义:
至于为啥不支持key为null,这个待定
上文中介绍Callable和线程池的时候使用了Future获取线程任务的执行结果
**Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。**必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。
Callable+Future 可以实现多个task并行执行,但是如果遇到前面的task执行较慢时需要阻塞等待前面的task执行完后面task才能取得结果。而CompletionService的主要功能就是一边生成任务,一边获取任务的返回值。让两件事分开执行,任务之间不会互相阻塞,可以实现先执行完的先取结果,不再依赖任务顺序了。
内部通过阻塞队列+FutureTask,实现了任务先完成可优先获取到,即结果按照完成先后顺序排序,内部有一个先进先出的阻塞队列,用于保存已经执行完成的Future,通过调用它的take方法或poll方法可以获取到一个已经执行完成的Future,进而通过调用Future接口实现类的get方法获取最终的结果
//创建线程池
ExecutorService executor = Executors.newFixedThreadPool(10);
//创建CompletionService
CompletionService cs = new ExecutorCompletionService<>(executor);
//异步向电商S1询价
cs.submit(() -> getPriceByS1());
//异步向电商S2询价
cs.submit(() -> getPriceByS2());
//异步向电商S3询价
cs.submit(() -> getPriceByS3());
//将询价结果异步保存到数据库
for (int i = 0; i < 3; i++) {Integer r = cs.take().get();executor.execute(() -> save(r));}
简单的任务,用Future获取结果还好,但我们并行提交的多个异步任务,往往并不是独立的,很多时候业务逻辑处理存在串行[依赖]、并行、聚合的关系。如果要我们手动用 Fueture 实现,是非常麻烦的。
CompletableFuture是Future接口的扩展和增强。CompletableFuture实现了Future接口,并在此基础上进行了丰富地扩展,完美地弥补了Future上述的种种问题。更为重要的是,CompletableFuture实现了对任务的编排能力。借助这项能力,我们可以轻松地组织不同任务的运行顺序、规则以及方式。从某种程度上说,这项能力是它的核心能力。而在以往,虽然通过CountDownLatch等工具类也可以实现任务的编排,但需要复杂的逻辑处理,不仅耗费精力且难以维护。
描述依赖关系:
描述and聚合关系:
描述or聚合关系:
并行执行:
CompletableFuture类自己也提供了anyOf()和allOf()用于支持多个CompletableFuture并行执行
CompletableFuture 提供了四个静态方法来创建一个异步操作:
public static CompletableFuture runAsync(Runnable runnable)
public static CompletableFuture runAsync(Runnable runnable, Executor executor)
public static CompletableFuture supplyAsync(Supplier supplier)
public static CompletableFuture supplyAsync(Supplier supplier, Executor executor)
这四个方法区别在于:
runAsync&supplyAsync
public static void main(String[] args) throws ExecutionException, InterruptedException {Runnable runnable = () -> System.out.println("执行无返回结果的异步任务");CompletableFuture.runAsync(runnable);CompletableFuture future = CompletableFuture.supplyAsync(() -> {System.out.println("执行有返回值的异步任务");try {Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}return "Hello World";});// 阻塞等待返回String result = future.get();System.out.println(result);}
join&get
join()和get()方法都是用来获取CompletableFuture异步之后的返回值。join()方法抛出的是uncheck异常(即未经检查的异常),不会强制开发者抛出。get()方法抛出的是经过检查的异常,ExecutionException, InterruptedException 需要用户手动处理(抛出或者 try catch)
当CompletableFuture的计算结果完成,或者抛出异常的时候,我们可以执行特定的 Action。主要是下面的方法:
public CompletableFuture whenComplete(BiConsumer super T,? super Throwable> action)
public CompletableFuture whenCompleteAsync(BiConsumer super T,? super Throwable> action)
public CompletableFuture whenCompleteAsync(BiConsumer super T,? super Throwable> action, Executor executor)
public class FutureTaskTest2 {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture future = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}// 测试异常情况,正常情况测试注释这一行int i = 12 / 0;System.out.println("执行结束!");return "test";});// 获取任务的执行劫夺,这里应该可以对结果进行修改future.whenComplete(new BiConsumer() {@Overridepublic void accept(String t, Throwable action) {if(action != null){System.out.println("发生异常");}else {System.out.println(t + " 执行完成!");}}});// 任务抛出异常后处理future.exceptionally(new Function() {@Overridepublic String apply(Throwable t) {System.out.println("执行失败:" + t.getMessage());return "异常xxxx";}});// 获取结果必须写在结果处理和异常处理的后面,否则结果处理和异常处理不会执行System.out.println(future.get());}
}
所谓结果转换,就是将上一段任务的执行结果作为下一阶段任务的入参参与重新计算,产生新的结果。
thenApply
thenApply 接收一个函数作为参数,使用该函数处理上一个CompletableFuture 调用的结果,并返回一个具有处理结果的Future对象。
public class FutureTaskTest3 {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture future = CompletableFuture.supplyAsync(() -> {int result = 100;System.out.println("一阶段:" + result);return result;}).thenApply(number -> {int result = number * 3;System.out.println("二阶段:" + result);return result;});// 阻塞等待返回,获取的是二阶段的返回结果Integer result = future.get();System.out.println(result);}
}
thenCompose
thenCompose 的参数为一个返回 CompletableFuture 实例的函数,该函数的参数是先前计算步骤的结果。
public CompletableFuture thenCompose(Function super T, ? extends CompletionStage> fn);
public CompletableFuture thenComposeAsync(Function super T, ? extends CompletionStage> fn) ;
public class FutureTaskTest4 {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture future = CompletableFuture.supplyAsync(new Supplier() {@Overridepublic Integer get() {int number = new Random().nextInt(30);System.out.println("第一阶段:" + number);return number;}}).thenCompose(new Function>() {@Overridepublic CompletionStage apply(Integer param) {return CompletableFuture.supplyAsync(new Supplier() {@Overridepublic Integer get() {int number = param * 2;System.out.println("第二阶段:" + number);return number;}});}});System.out.println(future.get());}
}
与结果处理和结果转换系列函数返回一个新的 CompletableFuture 不同,结果消费系列函数只对结果执行Action,而不返回新的计算值。
根据对结果的处理方式,结果消费函数又分为:
thenAccept
通过观察该系列函数的参数类型可知,它们是函数式接口Consumer,这个接口只有输入,没有返回值。
CompletableFuture future = CompletableFuture.supplyAsync(() -> {int number = new Random().nextInt(10);System.out.println("第一阶段:" + number);return number;}).thenAccept(number ->System.out.println("第二阶段:" + number * 5));
thenAcceptBoth
thenAcceptBoth 函数的作用是,当两个 CompletionStage 都正常完成计算的时候,就会执行提供的action消费两个异步的结果
CompletableFuture futrue1 = CompletableFuture.supplyAsync(new Supplier() {@Overridepublic Integer get() {int number = new Random().nextInt(3) + 1;try {TimeUnit.SECONDS.sleep(number);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第一阶段:" + number);return number;}
});CompletableFuture future2 = CompletableFuture.supplyAsync(new Supplier() {@Overridepublic Integer get() {int number = new Random().nextInt(3) + 1;try {TimeUnit.SECONDS.sleep(number);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第二阶段:" + number);return number;}
});futrue1.thenAcceptBoth(future2, new BiConsumer() {@Overridepublic void accept(Integer x, Integer y) {System.out.println("最终结果:" + (x + y));}
}
thenRun
thenRun 也是对线程任务结果的一种消费函数,与thenAccept不同的是,thenRun 会在上一阶段 CompletableFuture 计算完成的时候执行一个Runnable,Runnable并不使用该 CompletableFuture 计算的结果。
public CompletionStage thenRun(Runnable action);
public CompletionStage thenRunAsync(Runnable action);
CompletableFuture future = CompletableFuture.supplyAsync(() -> {int number = new Random().nextInt(10);System.out.println("第一阶段:" + number);return number;
}).thenRun(() ->System.out.println("thenRun 执行"));
thenCombine
thenCombine 方法,合并两个线程任务的结果,并进一步处理。
CompletableFuture future1 = CompletableFuture.supplyAsync(new Supplier() {@Overridepublic Integer get() {int number = new Random().nextInt(10);System.out.println("第一阶段:" + number);return number;}});
CompletableFuture future2 = CompletableFuture.supplyAsync(new Supplier() {@Overridepublic Integer get() {int number = new Random().nextInt(10);System.out.println("第二阶段:" + number);return number;}});
CompletableFuture result = future1.thenCombine(future2, new BiFunction() {@Overridepublic Integer apply(Integer x, Integer y) {return x + y;}});
所谓线程交互,是指将两个线程任务获取结果的速度相比较,按一定的规则进行下一步处理。
applyToEither
两个线程任务相比较,先获得执行结果的,就对该结果进行下一步的转化操作。
.....future1.applyToEither(future2, new Function() {@Overridepublic Integer apply(Integer number) {System.out.println("最快结果:" + number);return number * 2;}
acceptEither
两个线程任务相比较,先获得执行结果的,就对该结果进行下一步的消费操作。
future1.acceptEither(future2, new Consumer() {@Overridepublic void accept(Integer number) {System.out.println("最快结果:" + number);}
runAfterEither
两个线程任务相比较,有任何一个执行完成,就进行下一步操作,不关心运行结果。
future1.runAfterEither(future2, new Runnable() {@Overridepublic void run() {System.out.println("已经有一个任务完成了");}
}).join();
runAfterBoth
两个线程任务相比较,两个全部执行完成,才进行下一步操作,不关心运行结果。
anyOf
anyOf 方法的参数是多个给定的 CompletableFuture,当其中的任何一个完成时,方法返回这个 CompletableFuture。
allOf
allOf方法用来实现多 CompletableFuture 的同时返回。
需要使用自定义线程池:https://www.cnblogs.com/blackmlik/p/16098938.html
使用案例:实现最优的“烧水泡茶”程序
著名数学家华罗庚先生在《统筹方法》这篇文章里介绍了一个烧水泡茶的例子,文中提到最优的工序应该是下面这样:
对于烧水泡茶这个程序,一种最优的分工方案:用两个线程 T1 和 T2 来完成烧水泡茶程序,T1 负责洗水壶、烧开水、泡茶这三道工序,T2 负责洗茶壶、洗茶杯、拿茶叶三道工序,其中 T1 在执行泡茶这道工序时需要等待 T2 完成拿茶叶的工序。
public class CompletableFutureDemo2 {public static void main(String[] args) {//任务1:洗水壶->烧开水CompletableFuture f1 = CompletableFuture.runAsync(() -> {System.out.println("T1:洗水壶...");sleep(1, TimeUnit.SECONDS);System.out.println("T1:烧开水...");sleep(15, TimeUnit.SECONDS);});//任务2:洗茶壶->洗茶杯->拿茶叶CompletableFuture f2 = CompletableFuture.supplyAsync(() -> {System.out.println("T2:洗茶壶...");sleep(1, TimeUnit.SECONDS);System.out.println("T2:洗茶杯...");sleep(2, TimeUnit.SECONDS);System.out.println("T2:拿茶叶...");sleep(1, TimeUnit.SECONDS);return "龙井";});//任务3:任务1和任务2完成后执行:泡茶CompletableFuture f3 = f1.thenCombine(f2, (__, tf) -> {System.out.println("T1:拿到茶叶:" + tf);System.out.println("T1:泡茶...");return "上茶:" + tf;});//等待任务3执行结果System.out.println(f3.join());}static void sleep(int t, TimeUnit u) {try {u.sleep(t);} catch (InterruptedException e) {}}
}
Disruptor
ForkJoinPool和ForkJoinTask
-,待定
什么是分布式锁?
分布式锁其实就是,控制分布式系统不同进程共同访问共享资源的一种锁的实现。如果不同的系统或同一个系统的不同主机之间共享了某个临界资源,往往需要互斥来防止彼此干扰,以保证一致性。
比如定时任务执行,如果部署了多个实例,需要保证只有一台机器在运行这个定时任务
实现方案:
你可使用redis自行实现
开源框架~Redisson 分布式锁的相关实现:https://github.com/redisson/redisson/wiki/8.-%E5%88%86%E5%B8%83%E5%BC%8F%E9%94%81%E5%92%8C%E5%90%8C%E6%AD%A5%E5%99%A8
上一篇:华为云Nginx配置
下一篇:设计模式-day03