JUC并发编程
JUC并发编程
线程和进程
进程(英语:process),是指计算机中已运行的程序。进程为曾经是分时系统的基本运作单位。在面向进程设计的系统(如早期的UNIX,Linux 2.4及更早的版本)中,进程是程序的基本执行实体;
线程(英语:thread)是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。
– 维基百科
不同编程语言的线程环境会不一样,Java语言在很早就支持了多线程接口。(Java程序在Java虚拟机中运行,虚拟机通常还会包含自己特有的线程,例如垃圾回收线程。)。而对于JavaScript这样的语言来说,它就没有多线程的概念。
当我们只有一个处理器时,所有的进程或线程会分时占用这个处理器。但如果系统中存在多个处理器时,则就可能有多个任务并行的运行在不同的处理器上。
线程
那么什么是线程呢?
线程是一个执行上下文,它包含诸多状态数据:每个线程有自己的执行流、调用栈、错误码、信号掩码、私有数据。Linux内核用任务(Task)表示一个执行流。
Java中开启线程的方式
- Thread
- Runnable
- Callable
为什么Java不能真正的开启线程?
new Thread().start();
start():
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
public synchronized void start() { if (threadStatus != 0) throw new IllegalThreadStateException(); group.add(this); boolean started = false; try { start0(); started = true; } finally { try { if (!started) { group.threadStartFailed(this); } } catch (Throwable ignore) { /* do nothing. If start0 threw a Throwable then it will be passed up the call stack */ } } }
start本质是调用native本地方法,底层c++
start0()
1 2 3 4 5 6 7
private native void start0(); @Override public void run() { if (target != null) { target.run(); } }
并发和并行
- 并发
- CPU一个核心模拟多条线程,多线程操作一个资源
- 并行
- CPU多核,多个线程同时执行;线程池
System.out.println(Runtime.getRuntime().availableProcessors());
获得CPU核数
线程的状态
- 新生:NEW
- 运行:RUNNABLE
- 阻塞:BLOCKED
- 等待:WATTING
- 超时等待:TIMED_WAITTING
- 终止:TERMINATED
wait和sleep区别
|
|
-
来自不同类
- wait=>Object
- sleep=>Thread
-
锁的释放
- wait会释放锁
- sleep留着锁sleep
-
使用范围不同
- wait必须在同步代码块中(必须有需要的等待的同步资源才要wait)
- sleep可以在任何地方
-
是否捕获异常
- wait 需要捕获异常
InterruptedException
- sleep 需要捕获异常
InterruptedException
- wait 需要捕获异常
-
是否需要唤醒
- wait需要被唤醒
- sleep不需要手动唤醒
Lock锁
线程开发方式
-
类继承Runnable
@FunctionalInterface
函数式接口 -
资源类(属性,方法),将资源类放入线程。耦合性低
-
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
public class Test1 { public static void main(String[] args) { Ticket ticket = new Ticket(); new Thread(ticket::sale,"A"); new Thread(ticket::sale,"B"); new Thread(ticket::sale,"C"); } } class Ticket{ private int number=50; public void sale(){ if(number>0){ System.out.println(Thread.currentThread().getName()+"卖出了"+(number--)+"票,剩余:"+number); } } }
-
什么是锁
传统synchronized
锁
|
|
- 锁对象
- 锁Class
Lock接口
void lock()
boolean tryLock()
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
void lockInterruptibly()
void unlock()
Condition newCondition()
有3个实现类:
- ReentrantLock 可重入锁(常用)
- ReentrantReadWriteLock.ReadLock 读锁
- ReentrantReadWriteLock.WriteLock 写锁
ReentrantLock
|
|
- 默认构造是非公平锁
|
|
- 如果fair==true 是公平锁
公平锁:先来后到
非公平锁:可以插队(避免长时间占用)
Ticket类用ReentrantLock实现加锁
|
|
Synchronized和Lock的区别
- synchronized java内置关键字,Lock是java类
- synchronized 无法判断锁的状态,Lock可以判断是否获取到了锁
- synchronized 会自动释放锁,Lock必须手动释放锁(不释放会 死锁)
- synchronized 线程会一直等待,Lock可以使用
tryLock()
,不会一直等待下去; - synchronized 可重入锁,不可以中断,非公平;Lock,可重入锁,可以判断锁,自己设置是否公平
- synchronized 适合锁少量的代码同步问题,Lock适合锁大量同步代码(灵活性高)
生产者和消费者问题
面试常谈:
- 单例模式
- 排序算法
- 生产者和消费者
- 死锁
线程之间的通信问题:生产者和消费者问题。
- 等待唤醒
- 通知唤醒
线程A和线程B 操作同一个变量,需要A和B通信决定谁来操作资源。
|
|
虚假唤醒
因为线程可能会被 虚假唤醒,而不会被通知、中断或超时。
所以应该让线程通过被唤醒的条件来防范,条件不满足则继续等待。
等待应该出现在循环中:while(<condition does not hold>)
- 上面的方法中的if应该换成while。
使用JUC解决生产者消费者问题
|
|
|
|
- JUC使用Condition接口,实现了
await()
等待,signal
通知
使用Lock实现:
|
|
condition.await()
等待condition.signalAll()
通知全部
JUC的Condition可以精准通知、等待。灵活性比传统的synchronized。
我想实现A执行完通知B,B执行完通知C。要如何实现精确通知?
|
|
|
|
使用不同的Condition
和条件变量可以控制唤醒不同的线程。
锁的现象
synchronized 锁的几个问题
-
synchronized 锁的对象是方法的调用者,锁的是一个对象。
- 谁先拿到谁先执行
-
synchronized 锁在类任何一个方法中声明。而没有synchronized锁声明的普通方法,不受锁的影响。
- 普通方法不受锁的影响
-
不同对象的锁不一样,互不影响
-
static静态同步方法的,Class一加载就有了。所有实例对象都受影响。
- 方法加个static,锁的是Class类模板与实例对象不同
-
static静态方法和普通同步方法,谁先调用谁先执行。
- 两个的锁不是同一个锁,互相不受影响
new this 具体的一个对象
static Class 唯一的一个模板
集合类不安全
并发下ArrayList
会抛出异常java.util.ConcurrentModificationException
并发修改异常,是不安全的。
List不安全
解决方案:
Vector
是并发安全的。- Vector的函数带有
synchronized
关键字
- Vector的函数带有
Collections.synchronizedList(new ArrayList<>())
- 使用Collections工具包,将
ArrayList
转换为可并发的
- 使用Collections工具包,将
List<String> list = new CopyOnWriteArrayList<String>()
- 使用JUC包下的
CopyOnWriteArrayList
写入时复制 - 比之
Vector
,有synchronized
效率较低。CopyOnWriteArrayList
是用Lock锁 - 源码解读:
- 读是无锁并发的
- 写是复制一份到新容器,不影响老容器。写完后将数据指定到新容器。
- 写和读在不同容器上,读效率更高。
- 内存容量问题
- 数据不一致问题(无法实时)
- 使用JUC包下的
COW: 写入时复制(快照写)
多个线程调用读取时,读取到的是固定的;写入时,先复制一份,写完调用后再放回去。
在写入时避免覆盖,造成数据问题。
读写分离
Set不安全
解决方案:
Collections.synchronizedSet()
new CopyOnWriteArraySet()
⚠️没有Vector
这样的代替类
HashSet底层原理:HashSet底层就是HashMap
HashSet add本质上就是map key是无法重复的
1 2 3
public boolean add(E e) { return map.put(e, PRESENT)==null; }
Map不安全
多线程环境下,使用
Hashmap
进行put操作会引起死循环,导致CPU利用率接近100%,所以在并发情况下不能使用HashMap。
HashTable
- 对于Hashtable而言,synchronized是针对整张Hash表的,即每次锁住整张表让线程独占。相当于所有线程进行读写时都去竞争一把锁,导致效率非常低下。
HashMap底层
|
|
- 初始容量:1«4 (16)
- 最大容量
- 默认加载因子
解决方案:
-
Collections.synchronizedMap()
-
ConcurrentHashMap
HashTable
容器在竞争激烈的并发环境下表现出效率低下的原因,是因为所有访问HashTable
的线程都必须竞争同一把锁。那假如容器里有多把锁,每一把锁用于锁容器其中一部分数据,那么当多线程访问容器里不同数据段的数据时,线程间就不会存在锁竞争,从而可以有效的提高并发访问效率- 使用的锁分段技术,首先将数据分成一段一段的存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问
- 可以做到读取数据不加锁,并且其内部的结构可以让其在进行写操作的时候能够将锁的粒度保持地尽量地小,不用对整个
ConcurrentHashMap
加锁 ConcurrentHashMap
是由Segment数组结构和HashEntry
数组结构组成。Segment是一种可重入锁ReentrantLock
,在ConcurrentHashMap
里扮演锁的角色,HashEntry
则用于存储键值对数据。一个ConcurrentHashMap
里包含一个Segment
数组,Segment
的结构和HashMap类似,是一种数组和链表结构, 一个Segment
里包含一个HashEntry
数组,每个HashEntry
是一个链表结构的元素, 每个Segment
守护着一个HashEntry
数组里的元素,当对HashEntry
数组的数据进行修改时,必须首先获得它对应的Segment
锁。
Callable
Callable
接口类似于Runnable
,因为它们都是为其实例可能由另一个线程执行的类设计的。 然而,Runnable
不返回结果,也不能抛出被检查的异常。
Callable
有返回值- 可以抛出异常
- 方法不同
Runnalbe
是run()
Callable
是call()
class MyThread implements Callable<String>
是泛型,指定一个返回值类型作为泛型参数。
⚠️问题是,Thread(Runnable)
只接收Runnable
,如何让Callable
在Thread
启动?
- 使用
FutureTask<V>(Callable)
FutureTask<V>
是Runnable
的一个实现类Callable
通过FutrueTask<V>
适配类在Thread
启动。
|
|
FutureTask<String> futureTask=new FutureTask<>(thread);
获得对应的Callable
实现类的FutureTask
。FutureTask<V>
泛型参数是Callable<V>
的返回类型
futureTask.get()
,获得FutureTask
的返回。- 会阻塞,要等待运行完成。途中如果被中断会抛中断异常,别的异常都会以
ExecutionException
执行异常的形式抛出 - 或者用异步通信来处理。
- 会阻塞,要等待运行完成。途中如果被中断会抛中断异常,别的异常都会以
|
|
|
|
多次调用线程,可以看到
- JVM再次调用的FutureTask所持有的线程,会直接返回,避免重复的查询
FutureTask的run()仅执行一次的原因: 1. state != NEW表示任务正在被执行或已经完成, 直接return 2. 若state==NEW, 则尝试CAS将当前线程 设置为执行run()的线程,如果失败,说明已经有其他线程 先行一步执行了run(),则当前线程return退出.
常用的辅助类
CountDownLatch
- 允许一个或多个线程等待直到在其他线程中执行的一组操作完成的同步辅助
- 是一个减数计数器
|
|
countDownLatch.countDown()
计数器数量-1countDownLatch.await()
等待计数器归零然后向下执行。(而不是所有子线程结束)
CyclicBarrier
- 允许一组线程全部等待彼此达到共同屏障点的同步辅助(都到达指定的数量后,执行提前设定的新线程)
- 加法计数器
|
|
CyclicBarrier cyclicBarrier = new CyclicBarrier(7,()->{})
实例化一个,并且指定屏障值和达到后开启的线程函数。cyclicBarrier.await()
阻塞等待,直到达到 屏障点数,并且执行完了CyclicBarrier
指定的线程函数后再继续
Semaphore 信号量
-
信号量维持一组许可证。 如果有必要,每个[
acquire()
都会阻塞,直到许可证可用,然后才能使用它。 每个release()
添加许可证,潜在地释放阻塞获取方。 -
计数信号量
-
就是轮流等待”停车位“,直到其他线程发出信号“让出位置”才能进去。
|
|
- semaphore.acquire()获得信号量,如果已经满了无法获取就阻塞等待直到释放为止(-1)
- semaphore.release()释放当前信号量(+1),唤醒等待的线程。
ReadWriteLock 读写锁
-
ReadWriteLock
维护一对关联[locks
,一个用于只读操作,一个用于写入。read lock
可以由多个阅读器线程同时进行,只要没有作者。write lock
是独家的。- 可以被多线程同时读,但写只能被一个线程写。
-
ReadWriteLock
是接口,只有一个实现类ReadWriteLock
-
比Lock有更加细粒度的操作。
- 读锁
lock.readLock()
- lock()
- unlock()
- 写锁
lock.writeLock()
- lock()
- unlock()
|
|
BlockingQueue 阻塞队列
-
接口
-
FIFO,先进先出
-
写入:如果队列满了阻塞等待
-
读取:队列为空阻塞等待
什么时候用到?
多线程:A调用B,A要等B
线程池:队列维护内部大小
常用实现类:
LinkedBlockingDeque
ArrayBlockingQueue
四组API:
方式 | 抛出异常 | 有返回值不抛出异常 | 阻塞等待 | 超市等待 |
---|---|---|---|---|
添加 | add() | offer() | put() | offer(..,..,) |
移除 | remove() 现在已经不抛出异常了 |
poll() | take() | poll(..,..,) |
检测队首元素 | element | peek() | - | - |
SynchronousQueue 同步队列
- 没有容量
- 进去一个元素,必须等待取出来之后,才能再往里面放一个元素
- * put了一个元素,必须从里面先take取出来,否则不能在put进去值
- put,take
线程池
还有线程池、连接池、内存池、对象池
池化技术:
- 事先准备好一些资源,有人要用直接拿,用完之后还回来
线程池的好处:
- 降低资源的消耗
- 提高响应的速度
- 方便管理。
- 线程复用、可以控制最大并发数、管理线程
❗线程池3大方法
【强制】线程池不允许使用
Executors
去创建,而是通过ThreadPoolExecutor
的方式,这样ThreadPoolExecutor
的处理方式让同学更加明确线程池的运行规则
FixedThreadPool
和SingleThreadPool
允许的请求队列长度为 Integer.MAX_VALUE(约为21亿),会堆积大量请求,导致OOM(JVM)。CachedThreadPool
和ScheduledThreadPool
允许的创建线程数量为Interger.MAX_VALUE,会创建大量线程,从而导致OOM。
Executors
-
工具类
-
创建线程池方法
ExecutorService threadPool = Executors.newSingleThreadExecutor();// 单个线程
ExecutorService threadPool = Executors.newFixedThreadPool(5); // 创建一个固定的线程池的大小
ExecutorService threadPool = Executors.newCachedThreadPool(); // 可伸缩的,遇强则强,遇弱则弱
-
启动线程方法
1 2 3
threadPool.execute(()->{ System.out.println(Thread.currentThread().getName()+" ok"); });
-
关闭线程池
1
threadPool.shutdown();
源码分析:
-
newSingleThreadExecutor()
1 2 3 4 5 6
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
-
newFixedThreadPool()
1 2 3 4 5
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
-
newCachedThreadPool()
1 2 3 4 5
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
❗可以看出,Executors
本质上调用的是ThreadPoolExecutor
|
|
7个参数
-
corePoolSize
(核心池大小):- 核心线程池的大小。即使这些线程处于空闲状态,它们也不会被回收,除非
allowCoreThreadTimeOut
设置为true
。在任务开始之前,线程池会根据需要创建新线程,直到达到corePoolSize
的大小。
- 核心线程池的大小。即使这些线程处于空闲状态,它们也不会被回收,除非
-
maximumPoolSize
(最大池大小):- 线程池能够容纳的最大线程数。当队列满了之后,线程池如果还要执行新的任务,就会创建新的线程,直到总线程数不超过
maximumPoolSize
。
- 线程池能够容纳的最大线程数。当队列满了之后,线程池如果还要执行新的任务,就会创建新的线程,直到总线程数不超过
-
keepAliveTime
(线程存活时间):- 当线程数超过
corePoolSize
时,空闲线程在终止前等待新任务的最长时间。如果设置了allowCoreThreadTimeOut
,那么这个参数也会应用于核心线程。
- 当线程数超过
-
unit
(时间单位):keepAliveTime
参数的时间单位。可以是TimeUnit
枚举中的任意一个值,如TimeUnit.SECONDS
、TimeUnit.MILLISECONDS
等。
-
workQueue
(工作队列):- 用来保存等待执行的任务的队列。可以选择不同类型的阻塞队列,如
LinkedBlockingQueue
、SynchronousQueue
等。这决定了任务的排队策略。
- 用来保存等待执行的任务的队列。可以选择不同类型的阻塞队列,如
-
threadFactory
(线程工厂):- 用于创建新线程的工厂。通过提供自定义的
ThreadFactory
,可以定制线程的创建过程,比如设置线程名、线程优先级、是否是守护线程等。 - 一般用默认的
Executors.defaultThreadFactory()
- 用于创建新线程的工厂。通过提供自定义的
-
handler
(拒绝执行处理器):- 当线程池无法执行新的任务时,执行的处理策略。可以选择不同的处理策略
ThreadPoolExecutor.AbortPolicy
(抛出RejectedExecutionException
)ThreadPoolExecutor.DiscardPolicy
(丢弃任务但不抛出异常)ThreadPoolExecutor.CallerRunsPolicy
(由调用线程执行该任务。这种策略提供了一种退化机制,可以缓解资源饱和的情况。)ThreadPoolExecutor.DiscardOldestPolicy
(丢弃队列中最旧的未处理任务,然后重新提交被拒绝的任务)
- 当线程池无法执行新的任务时,执行的处理策略。可以选择不同的处理策略
所以使用
ThreadPoolExecutor
会更加灵活的定义线程池的参数
|
|
最大线程如何定义?
- cpu密集型:多少核就设定多少线程。
- IO密集型:IO占用,多少个任务就设定(2倍任务个数++)
CompletableFuture异步回调
没有返回值的异步回调
- 发起一个请求
CompletableFuture<Void> completableFuture=CompletableFuture.runAsync(()->{})
- 获取阻塞执行结果
completableFuture.get()
有返回值的异步回调
-
发起一个supply供应请求
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(()->{})
-
正常返回信息和错误信息
-
1 2 3 4 5 6 7 8
System.out.println(completableFuture.whenComplete((t, u) -> { System.out.println("t=>" + t); // 正常的返回结果 System.out.println("u=>" + u); // 错误信息: // java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero }).exceptionally((e) -> { System.out.println(e.getMessage()); return 233; // 可以获取到错误的返回结果 }).get());
completableFuture.whenComplete((t, u) ->{})
完成时获得返回结果- t是正常的返回结果
- u是错误信息
.exceptionally((e) -> {}).get()
,出错后可以返回一个特定的值
-