Java并发编程
概述
JUC是java.util.concurrent工具包的简称,是处理多线程的工具包
wait方法和sleep方法区别
- wait是Object的方法,任何对象实例都可以调用。sleep是Thread的静态方法
- sleep不会释放锁,它也不需要占用锁。wait会释放锁,但调用它的前提是当前线程占有锁(即代码要在synchronized中)
- 它们都会被interrupted方法中断
管程
Monitor监视器,是一种同步机制,保证在同一时间,只有一个线程访问被保护的数据或者代码
jvm同步基于进入和退出,使用管程对象实现的
用户线程和守护线程
- 用户线程:自定义线程,主线程结束了,用户线程还存活,jvm存活
- 守护线程:比如垃圾回收,没有用户线程了,都是守护线程,jvm结束
1 2 3 4 5 6 7 8 9 10 11
| Thread thread=new Thread(()->{ System.out.println(Thread.currentThread().getName()+":::"+Thread.currentThread().isDaemon()); while(true){
} });
thread.setDaemon(true); thread.start(); System.out.println(Thread.currentThread().getName()+"::::主线程结束");
|
Lock接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| class Ticket { private int number = 30;
public synchronized void sale() { if (number > 0) { System.out.println(Thread.currentThread().getName() + "买到了::" + (number--) + "票,剩余:" + number); } } }
class LTicket { private int number = 30;
public final ReentrantLock lock = new ReentrantLock();
public void sale() { lock.lock(); try { if (number > 0) { System.out.println(Thread.currentThread().getName() + "买到了::" + (number--) + " 剩余:" + number); } } finally { lock.unlock(); } } }
|
线程间通信
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
| class T { private int number = 0;
public synchronized void incr() throws InterruptedException { while (number != 0) { this.wait(); } number++; System.out.println(Thread.currentThread().getName() + " :: " + number); this.notifyAll(); }
public `synchronized`
void decr() throws InterruptedException { while (number != 1) { this.wait(); } number--; System.out.println(Thread.currentThread().getName() + " :: " + number); this.notifyAll(); } }
class T2 { private int number = 0;
private final Lock lock = new ReentrantLock(); private final Condition condition = lock.newCondition();
public void incr() throws InterruptedException { lock.lock(); try { while (number != 0) { condition.await(); } number++; System.out.println(Thread.currentThread().getName() + " :: " + number); condition.signalAll(); } finally { lock.unlock(); } }
public void decr() throws InterruptedException { lock.lock(); try { while (number != 1) { condition.await(); } number--; System.out.println(Thread.currentThread().getName() + " :: " + number); condition.signalAll(); } finally { lock.unlock(); } } }
|
线程间定制化通信
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91
| class Ticket { private int flag = 1;
private final Lock lock = new ReentrantLock(); private final Condition c1 = lock.newCondition(); private final Condition c2 = lock.newCondition(); private final Condition c3 = lock.newCondition();
public void print5(int loop) throws InterruptedException { lock.lock(); try { while (flag != 1) { c1.await(); } for (int i = 1; i <= 5; i++) { System.out.println(Thread.currentThread().getName() + " :: " + i + " : 轮数:" + loop); } flag = 2; c2.signalAll(); } finally { lock.unlock(); } }
public void print10(int loop) throws InterruptedException { lock.lock(); try { while (flag != 2) { c2.await(); } for (int i = 1; i <= 10; i++) { System.out.println(Thread.currentThread().getName() + " :: " + i + " : 轮数:" + loop); } flag = 3; c3.signalAll(); } finally { lock.unlock(); } }
public void print15(int loop) throws InterruptedException { lock.lock(); try { while (flag != 3) { c3.await(); } for (int i = 1; i <= 15; i++) { System.out.println(Thread.currentThread().getName() + " :: " + i + " : 轮数:" + loop); } flag = 1; c1.signalAll(); } finally { lock.unlock(); } } }
public class t1 { public static void main(String[] args) { Ticket ticket = new Ticket(); new Thread(() -> { for (int i = 1; i <= 10; i++) { try { ticket.print5(i); } catch (InterruptedException e) { e.printStackTrace(); } } }, "AA").start(); new Thread(() -> { for (int i = 1; i <= 10; i++) { try { ticket.print10(i); } catch (InterruptedException e) { e.printStackTrace(); } } }, "BB").start();
new Thread(() -> { for (int i = 1; i <= 10; i++) { try { ticket.print15(i); } catch (InterruptedException e) { e.printStackTrace(); } } }, "CC").start(); } }
|
集合的线程安全
ArrayList和CopyOnWriteArrayList
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
|
List<String> list=new CopyOnWriteArrayList<>();
for(int i=0;i< 30;i++){ new Thread(()->{ list.add(UUID.randomUUID().toString().substring(0,8)); System.out.println(list); },String.valueOf(i)).start(); }
|
HashSet和CopyOnWriteArraySet
1 2 3 4 5 6 7 8 9 10 11 12
|
Set<String> set=new CopyOnWriteArraySet<>();
for(int i=0;i< 30;i++){ new Thread(()->{ set.add(UUID.randomUUID().toString().substring(0,8)); System.out.println(set); },String.valueOf(i)).start(); }
|
HashMap和ConcurrentHashMap
1 2 3 4 5 6 7 8 9 10
|
Map<String, Object> map=new ConcurrentHashMap<>(); for(int i=0;i< 30;i++){ new Thread(()->{ map.put(UUID.randomUUID().toString().substring(0,8),UUID.randomUUID().toString().substring(0,8)); System.out.println(map); },String.valueOf(i)).start(); }
|
多线程锁
synchronized
synchronized实现同步的基础:Java中的每一个对象都可以作为锁
具体表现为:
- 对于普通方法,锁的是
当前实例对象
- 对于静态同步方法,锁的是
当前类的Class对象
- 对于同步方法块,锁的是
synchronized括号里配置的对象
公平锁和非公平锁
- 公平锁效率高,
会出现一个线程执行完所有代码
- 非公平锁效率相对低,
每个线程都有执行代码的机会
1 2 3 4
| private final Lock lock=new ReentrantLock(true);
private final Lock lock=new ReentrantLock();
|
可重入锁
获取到最外层的锁,内层所有代码都可以执行
synchronized
1 2 3 4 5 6 7 8 9 10 11 12 13
| Object o = new Object(); new Thread(() -> { synchronized (o) { System.out.println("外层"); synchronized (o) { System.out.println("中层"); synchronized (o) { System.out.println("内层"); } } } }).start();
|
1 2 3 4 5 6 7
| public synchronized void add(){ add(); }
new Ticket().add();
|
lock
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| Lock lock = new ReentrantLock(); new Thread(() -> { try { lock.lock(); System.out.println("外层"); try { lock.lock(); System.out.println("内层"); } finally { lock.unlock(); } } finally { lock.unlock(); } }).start();
|
死锁
两个或者两个以上进程在执行过程中,因为争夺资源而造成一种互相等待的现象,如果没有外力干涉,他们无法再执行下去
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| Object a = new Object(); Object b = new Object();
new Thread(()->{ synchronized (a){ System.out.println("获得锁a"); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (b){ System.out.println("获得锁b"); } } },"A").start();
new Thread(()->{ synchronized (b){ System.out.println("获得锁b"); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (a){ System.out.println("获得锁a"); } } },"A").start(); }
|
产生死锁的原因
- 系统资源不足
- 进程运行推送顺序不合适
- 资源分配不当
Callable接口
创建的线程可以有返回值
Runnable接口和Callable接口
- 是否有返回值
- 是否抛出异常
- 实现方法名称不同,一个是run方法,一个是call方法
1 2 3 4 5 6
| FutureTask<Integer> task = new FutureTask<>(() -> { return 10; }); new Thread(task,"AA");
System.out.println(task.get());
|
JUC辅助类
CountDownLatch
CountDownLatch类可以设置一个计数器,然后通过countDown方法来进行减1的操作,使用await方法等待计数器不大于0,然后继续执行await方法之后的语句
1 2 3 4 5 6 7 8 9 10 11 12 13
| CountDownLatch countDownLatch = new CountDownLatch(6);
for (int i = 0; i < 6; i++) { new Thread(() -> { System.out.println(Thread.currentThread().getName() + " 执行完成"); countDownLatch.countDown(); }, String.valueOf(i)).start(); }
countDownLatch.await(); System.out.println("全部执行完成");
|
CyclicBarrier
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| CyclicBarrier cyclicBarrier = new CyclicBarrier(6,()->{ System.out.println("当值达到6的时候执行"); });
for (int i = 0; i < 6; i++) { new Thread(() -> { try { System.out.println(Thread.currentThread().getName() + " 执行完成"); cyclicBarrier.await(); } catch (Exception e) { e.printStackTrace(); } }, String.valueOf(i)).start(); }
|
Semaphore
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
|
Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < 6; i++) { new Thread(() -> { try { semaphore.acquire(); System.out.println(Thread.currentThread().getName() + " 抢到车位"); TimeUnit.SECONDS.sleep(new Random().nextInt(5)); System.out.println(Thread.currentThread().getName() + " =======离开车位"); } catch (Exception e) { e.printStackTrace(); }finally { semaphore.release(); } }, String.valueOf(i)).start(); }
|
读写锁
一个资源可以被多个读线程访问,或者可以被一个写线程访问,但是不能同时存在读写线程,读写互斥,读读共享
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| public volatile Map<String, Object> map = new HashMap<>();
public ReadWriteLock rwLock = new ReentrantReadWriteLock();
public void put(String key, Object v) throws InterruptedException { rwLock.writeLock().lock(); try { System.out.println(Thread.currentThread().getName() + " 正在写 " + key); TimeUnit.MILLISECONDS.sleep(300); map.put(key, v); System.out.println(Thread.currentThread().getName() + " 写完了 " + key); } finally { rwLock.writeLock().unlock(); } }
public void get(String key) throws InterruptedException { rwLock.readLock().lock(); try { System.out.println(Thread.currentThread().getName() + " 正在读 " + key); TimeUnit.MILLISECONDS.sleep(300); map.get(key); System.out.println(Thread.currentThread().getName() + " 读完了 " + key); } finally { rwLock.readLock().unlock(); } }
|
JUC阻塞队列
可以使得数据由队列的一端输入,从另一端输出
ArrayBlockingQueue
(常用):由数组结构组成的有界阻塞队列,基于数组的阻塞队列实现,在ArrayBlockingQueue内部维护了一个定长的数组,以便缓存队列中的数据对象,除了一个定长数组外,ArrayBlockingQueue内部保存着两个整形变量,分别标识着队列的头部和尾部在数组中的位置
LinkedBlockingQueue
(常用):由链表结构组成的有界(但大小默认值为integer.MAX_VALUE)阻塞队列
DelayQueue
:使用优先级队列实现的延迟无界阻塞队列
PriorityBlockingQueue
:支持优先级排序的无界阻塞队列
SynchronousQueue
:不存储元素的阻塞队列,也即单个元素的队列
LinkedTransferQueue
:由链表组成的无界阻塞队列
LinkedBlockingDeque
:由链表组成的双向阻塞队列
方法类型 |
抛出异常 |
特殊值 |
阻塞 |
超时 |
插入 |
add(e) |
offer(e) |
put(e) |
offer(e,time,unit) |
移除 |
remove() |
poll() |
take() |
poll(time,unit) |
检查 |
element() |
peek() |
不可用 |
不可用 |
ThreadPool线程池
- 使用Executors
- 使用new ThreadPoolExecutor
corePoolSize
:核心线程数(一直存在);线程池创建好以后就准备就绪的线程数量,用来等待接受异步任务去执行
maximumPoolSize
:最大线程数量
keepAliveTime
:存活时间。如果当前线程数量大于最大数量,当线程空闲了存活时间后释放空闲的线程
unit
:时间单位
BlockingQueue<Runnable> workQueue
:阻塞队列。如果任务有很多,就会将目前多的任务放在队列里面。只要有线程空闲,就会去队列里面取出新的任务继续执行
ThreadFactory threadFactory
:线程的创建工厂
RejectedExecutionHandler handler
:如果队列满了,按照指定的策略拒绝执行任务
1 2 3 4 5 6 7 8 9 10
| Executors.newFixedThreadPool(10); Executors.newCachedThreadPool(); Executors.newScheduledThreadPool(10); Executors.newSingleThreadExecutor();
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder() .setNameFormat("consumer-queue-thread-%d").build(); ExecutorService service=new ThreadPoolExecutor(10, 100, 25L, TimeUnit.SECONDS, new LinkedBlockingDeque<>(), namedThreadFactory);
|
CompletableFuture
基本用法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| CompletableFuture.runAsync(()->{ },service);
CompletableFuture.supplyAsync(()->{ return 100; },service);
CompletableFuture.supplyAsync(()->100) .whenComplete((res,exc)->{ System.out.println("结果是:"+res); System.out.println("异常是:"+exc); }) .exceptionally(t->{ return 10; });
CompletableFuture.supplyAsync(()->100) .handle((res,exc)->{ System.out.println("结果是:"+res); System.out.println("异常是:"+exc); if(res!=null){ return res; } if(exc!=null){ return 0; } return 10; });
|
线程串行化
thenApply()和thenApplyAsync()
方法:当一个线程依赖另一个线程时,获取上一个任务返回的结果,并返回当前任务的返回值
thenAccept()和thenAcceptAsync()
方法:消费处理结果。接收任务的处理结果,并消费处理,无返回结果
thenRun()和thenRunAsync()
方法:带有Async默认是异步执行的,不接收上一个任务返回值
1 2 3 4 5 6 7 8 9 10 11 12
| ExecutorService service=Executors.newSingleThreadExecutor(); CompletableFuture.supplyAsync(()->100) .thenAccept(res->{ System.out.println("上一个任务的返回值:"+res); }) .thenApplyAsync(res->{ System.out.println("上一个任务的返回值:"+res); return 100; },service) .thenRunAsync(()->{ System.out.println("无法接收返回值"); },service);
|
两任务组合-都要完成
thenCombine()和thenCombineAsync()
:组合两个future,获取两个future的返回结果,并返回当前任务的返回值
thenAcceptBoth()和thenAcceptBothAsync()
:组合两个future,获取两个future的返回结果,没有返回值
runAfterBoth()和runAfterBothAsync()
:组合两个future,不需要获取future的结果,只需两个future处理完任务后,处理该任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| CompletableFuture<Integer> future01=CompletableFuture.supplyAsync(()->{ int i=10; return i/2; },service); CompletableFuture<Integer> future02=CompletableFuture.supplyAsync(()->{ int i=20; return i/2; },service);
future01.runAfterBothAsync(future02,()->{ System.out.println("任务1和任务2执行后执行"); },service);
future01.thenAcceptBothAsync(future02,(res01,res02)->{ System.out.println("任务1返回结果:"+res01); System.out.println("任务2返回结果:"+res02); },service);
future01.thenCombineAsync(future02,(res01,res02)->{ System.out.println("任务1返回结果:"+res01); System.out.println("任务2返回结果:"+res02); return"任务3自己的返回值"; },service);
|
两任务组合-一个完成
applyToEither()和applyToEitherAsync()
:两个任务有一个执行完成,获取它的返回值,处理任务并有新的返回值
acceptEither()和acceptEitherAsync()
:两个任务有一个执行完成,获取它的返回值,处理任务,没有新的返回值
runAfterEither()和runAfterEitherAsync()
:两个任务有一个执行完成,不需要获取future的结果,处理任务,没有返回值
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| CompletableFuture<Integer> future01=CompletableFuture.supplyAsync(()->{ int i=10; return i/2; },service); CompletableFuture<Integer> future02=CompletableFuture.supplyAsync(()->{ int i=20; return i/2; },service);
future01.runAfterEitherAsync(future02,()->{
},service);
future01.acceptEitherAsync(future02,res->{
System.out.println("任务返回结果:"+res); },service);
future01.applyToEitherAsync(future02,res->{
System.out.println("任务返回结果:"+res); return res+1; },service);
|
多任务组合
allOf()
:等待所有任务完成
anyOf()
:只要有一个任务完成
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| CompletableFuture<Integer> future01=CompletableFuture.supplyAsync(()->{ int i=10; return i/2; },service); CompletableFuture<Integer> future02=CompletableFuture.supplyAsync(()->{ int i=20; return i/2; },service);
CompletableFuture<Void> allOf=CompletableFuture.allOf(future01,future02);
allOf.get();
CompletableFuture<Object> anyOf=CompletableFuture.anyOf(future01,future02);
anyOf.get();
|
相关文章
JAVA基础
JDBC
JAVA多线程