【Java JUC】Java多线程并发编程零基础小白入门(下)

前言

在被面试官疯狂打击后,我决定重新系统学习JUC编程,也就是并发编程。本篇文章作为Java语言多线程并发编程的入门级别,仅从学会使用的角度学习,并不深入涉及有关知识的底层实现原理,比如volatile关键字、AtomicInteger、CountDownLatch、Semaphore、CyclicBarrier、Exchanger等。文章会穿插一些面试会问到的面试题。

上篇文章看这里:【Java JUC】Java多线程并发编程零基础小白入门(中) (imyjs.cn)

5、原子性

概述:所谓的原子性是指在一次操作或者多次操作中,要么所有的操作全部都得到了执行并且不会受到任何因素的干扰而中断,要么所有的操作都不执行。

原子性案例

public class VolatileAtomicThreadDemo {
    public static void main(String[] args) {
        // 创建VolatileAtomicThread对象
        Thread thread = new Thread(new RunnableDemo());

        // 开启100个线程对count进行++操作
        for(int x = 0 ; x < 100 ; x++) {
            new Thread(thread).start();
        }
    }
}

class RunnableDemo implements Runnable {

    // 定义一个int类型的遍历
    private int count = 0;

    @Override
    public void run() {
        // 对该变量进行++操作,100次
        for (int x = 0; x < 100; x++) {
            count++;
            System.out.println("count =========>>>> " + count);
        }
    }
}

// 9999

 

 

执行结果:不保证一定是10000

问题分析

以上问题主要是发生在count++操作上:

count++操作包含3个步骤:

  • 从主内存中读取数据到工作内存

  • 对工作内存中的数据进行++操作

  • 将工作内存中的数据写回到主内存

count++操作不是一个原子性操作,也就是说在某一个时刻对某一个操作的执行,有可能被其他的线程打断。

1)假设此时x的值是100,线程A需要对该变量进行自增1的操作,首先它需要从主内存中读取变量x的值。由于CPU的切换关系,此时CPU的执行权被切换到了B线程。A线程就处于就绪状态,B线程处于运行状态。

2)线程B也需要从主内存中读取x变量的值,由于线程A没有对x值做任何修改因此此时B读取到的数据还是100。

3)线程B工作内存中x执行了+1操作,但是未刷新之主内存中。

4)此时CPU的执行权切换到了A线程上,由于此时线程B没有将工作内存中的数据刷新到主内存,因此A线程工作内存中的变量值还是100,没有失效。A线程对工作内存中的数据进行了+1操作。

5)线程B将101写入到主内存

6)线程A将101写入到主内存

虽然计算了2次,但是只对A进行了1次修改。

volatile原子性测试

// 定义一个int类型的遍历
private volatile int count = 0;

 

 

在多线程环境下,volatile关键字可以保证共享数据的可见性,但是并不能保证对数据操作的原子性(在多线程环境下volatile修饰的变量也是线程不安全的)。

在多线程环境下,要保证数据的安全性,我们还需要使用锁机制。

volatile的使用场景

  • 开关控制

    利用可见性特点,控制某一段代码执行或者关闭。

  • 多个线程操作共享变量,但是是有一个线程对其进行写操作,其他的线程都是读。

volatile无法保证原子性

刚才变量内存可见性的例子中,我们看到,使用volatile和synchronized锁都可以保证共享变量的可见性。相t synchronized而言,volatile可以看作是一个轻量级锁,所以使用volatile的成本更低,因为它不会引起线程上下文的切换和调度。但volatile无法像synchronized一样保证操作的原子性。

在多线程环境下,volatile关键字可以保证共享数据的可见性,但是并不能保证对数据操作的原子性。也就是说,多线程环境下,使用volatilex修饰的变量是线程不安全的。要解决这个问题,我们可以使用锁机制,或者使用原子类(如 Atomiclnteger)。这里特别说一下,对任意单个使用volatile修饰的变量的读/写是具有原子性,但类似于flag = !flag 这种复合操作不具有原子性。简单地说就是,单纯的赋值操作是原子性的。

问题解决

使用锁机制

我们可以给count++操作添加锁,那么count++操作就是临界区的代码,临界区只能有一个线程去执行,所以count++就变成了原子操作。

synchronized (this){
    count++;
}

 

 

但是synchronized比较重,性能比较慢,代码写起来也复杂。 由此Java引入了原子类型包,原子类底层是CAS,无需手动加锁即可实现锁的效果,相当于乐观锁。

原子类

概述:java从JDK1.5开始提供了java.util.concurrent.atomic包(简称Atomic包),这个包中的原子操作类提供了一种用法简单,性能高效,线程安全地更新一个变量的方式。

AtomicInteger

原子型Integer,可以实现原子更新操作

public AtomicInteger():	   				初始化一个默认值为0的原子型Integer
public AtomicInteger(int initialValue): 初始化一个指定值的原子型Integer

int get():   			 				 获取值
int getAndIncrement():      			 以原子方式将当前值加1,注意,这里返回的是自增前的值。
int incrementAndGet():     				 以原子方式将当前值加1,注意,这里返回的是自增后的值。
int addAndGet(int data):				 以原子方式将输入的数值与实例中的值(AtomicInteger里的value)相加,并返回结果。
int getAndSet(int value):   			 以原子方式设置为newValue的值,并返回旧值。

 

 

演示基本使用:

无需声明volatile变量

无需对方法加synchronized

class RunnableDemo implements Runnable {

    // 定义一个int类型的遍历
    // private volatile int count = 0;
    private AtomicInteger atomicInteger = new AtomicInteger() ;

    @Override
    public void run() {
        // 对该变量进行++操作,100次
        for (int x = 0; x < 100; x++) {
            System.out.println("count =========>>>> " + atomicInteger.incrementAndGet());
        }
    }
}

 

 

原子类CAS机制

CAS的全成是: Compare And Swap(比较再交换); 是现代CPU广泛支持的一种对内存中的共享数据进行操作的一种特殊指令。CAS可以将read-modify-check-write转换为原子操作,这个原子操作直接由处理器保证。

CAS机制当中使用了3个基本操作数:内存地址V,旧的预期值A,要修改的新值B。

举例:

  1. 在内存地址V当中,存储着值为10的变量。

  2. 此时线程1想要把变量的值增加1。对线程1来说,旧的预期值A=10,要修改的新值B=11。

  3. 在线程1要提交更新之前,另一个线程2抢先一步,把内存地址V中的变量值率先更新成了11。

  4. 线程1开始提交更新,首先进行A和地址V的实际值比较(Compare),发现A不等于V的实际值,提交失败。

  5. 线程1重新获取内存地址V的当前值,并重新计算想要修改的新值。此时对线程1来说,A=11,B=12。这个重新尝试的过程被称为自旋。

  6. 这一次比较幸运,没有其他线程改变地址V的值。线程1进行Compare,发现A和地址V的实际值是相等的。

  7. 线程1进行SWAP,把地址V的值替换为B,也就是12。

CAS与Synchronized

CAS和Synchronized都可以保证多线程环境下共享数据的安全性。那么他们两者有什么区别?在说他们的区别前,先来看看什么是悲观锁和乐观锁。

  • 悲观锁 对于悲观锁来说,它总是认为每次访问共享资源时会发生冲突,也就是会有其他线程来与自己竞争,所以必须对每次数据操作加上锁,以保证临界区的程序同一时间只能有一个线程在执行。

  • 乐观锁 乐观锁又称为“无锁”,顾名思义,它是乐观派。乐观锁总是假设对共享资源的访问没有冲突,线程可以不停地执行,无需加锁也无需等待。而一旦多个线程发生冲突,乐观锁通常是使用一种称为CAS的技术来保证线程执行的安全性。由于无锁操作中没有锁的存在,因此不可能出现死锁的情况,也就是说乐观锁天生免疫死锁。

乐观锁多用于“读多写少“的环境,避免频繁加锁影响性能;而悲观锁多用于"写多读少"的环境,避免频繁失败和重试影响性能。

Synchronized是从悲观的角度出发(悲观锁)

总是假设最坏的情况,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会阻塞直到它拿到锁(共享资源每次只给一个线程使用,其它线程阻塞,用完后再把资源转让给其它线程)。因此Synchronized我们也将其称之为悲观锁。jdk中的ReentrantLock也是一种悲观锁。性能较差!!

CAS是从乐观的角度出发:

总是假设最好的情况,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据。

6、并发包

ConcurrentHashMap

为什么要使用ConcurrentHashMap:

  1. HashMap线程不安全,会导致数据错乱

  2. 使用线程安全的Hashtable效率低下

基于以上两个原因,便有了ConcurrentHashMap的登场机会。

HashMap线程不安全演示

public class HashMap_ {
    public static HashMap<String, Integer> map = new HashMap<>();

    public static void main(String[] args) {
        HashMapThread thread = new HashMapThread("线程一");
        HashMapThread thread1 = new HashMapThread("线程二");
        thread1.start();
        thread.start();
        try {
            thread.join();
            thread1.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Map元素个数:" + map.size());
    }
}

class HashMapThread extends Thread{

    HashMapThread(String name){
        super(name);
    }
    @Override
    public void run() {
        for (int i = 0; i < 5000; i++) {
            HashMap_.map.put(getName() + i, i+1);
        }
    }
}

 

说明:两个线程分别向同一个map中写入5000个键值对,最后map的size应为:10000,但多运行几次会发现有以下几种错误:

  • 数据不正确

  • 异常Exception in thread "线程一" java.lang.ClassCastException: java.util.HashMap$Node cannot be cast to java.util.HashMap$TreeNode

  • 其他

总之线程不安全.....

为了保证线程安全,可以使用Hashtable。

public static Hashtable<String, Integer> map = new Hashtable();

HashTable虽然能够保证线程安全,但是效率低下。效率低下原因:

public synchronized V put(K key, V value) 
public synchronized V get(Object key)

HashTable容器使用synchronized来保证线程安全,但在线程竞争激烈的情况下HashTable的效率非常低下。因为当一个线程访问HashTable的同步方法,其他线程也访问HashTable的同步方法时,会进入阻塞状态。如线程1使用put进行元素添加,线程2不但不能使用put方法添加元素,也不能使用get方法来获取元素,所以竞争越激烈效率越低。

在多线程并发环境下,推荐使用ConcurrentHashMap,既安全又高效。

public static ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap();

面试题:ConcurrentHashMap 和 Hashtable 的区别

ConcurrentHashMapHashtable 的区别主要体现在实现线程安全的方式上不同。

  • 底层数据结构: JDK1.7 的 ConcurrentHashMap 底层采用 分段的数组+链表 实现,JDK1.8 采用的数据结构跟 HashMap1.8 的结构一样,数组+链表/红黑二叉树。Hashtable 和 JDK1.8 之前的 HashMap 的底层数据结构类似都是采用 数组+链表 的形式,数组是 HashMap 的主体,链表则是主要为了解决哈希冲突而存在的;

  • 实现线程安全的方式:

    • 在 JDK1.7 的时候,ConcurrentHashMap(分段锁) 对整个桶数组进行了分割分段(Segment),每一把锁只锁容器其中一部分数据,多线程访问容器里不同数据段的数据,就不会存在锁竞争,提高并发访问率。 到了 JDK1.8 的时候已经摒弃了 Segment 的概念,而是直接用 Node 数组+链表+红黑树的数据结构来实现,并发控制使用 synchronized 和 CAS 来操作。(JDK1.6 以后 对 synchronized 锁做了很多优化) 整个看起来就像是优化过且线程安全的 HashMap,虽然在 JDK1.8 中还能看到 Segment 的数据结构,但是已经简化了属性,只是为了兼容旧版本;

    • Hashtable(同一把锁) :使用 synchronized 来保证线程安全,效率非常低下。当一个线程访问同步方法时,其他线程也访问同步方法,可能会进入阻塞或轮询状态,如使用 put 添加元素,另一个线程不能使用 put 添加元素,也不能使用 get,竞争会越来越激烈效率越低。

面试题:ConcurrentHashMap 线程安全的具体实现方式/底层具体实现

JDK1.7

首先将数据分为一段一段的存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据时,其他段的数据也能被其他线程访问。

ConcurrentHashMap 是由 Segment 数组结构和 HashEntry 数组结构组成

Segment 实现了 ReentrantLock,所以 Segment 是一种可重入锁,扮演锁的角色。HashEntry 用于存储键值对数据。

 static class Segment<K,V> extends ReentrantLock implements Serializable {
 }

一个 ConcurrentHashMap 里包含一个 Segment 数组。Segment 的结构和 HashMap 类似,是一种数组和链表结构,一个 Segment 包含一个 HashEntry 数组,每个 HashEntry 是一个链表结构的元素,每个 Segment 守护着一个 HashEntry 数组里的元素,当对 HashEntry 数组的数据进行修改时,必须首先获得对应的 Segment 的锁。

JDK1.8

ConcurrentHashMap 取消了 Segment 分段锁,采用 CAS 和 synchronized 来保证并发安全。数据结构跟 HashMap1.8 的结构类似,数组+链表/红黑二叉树。Java 8 在链表长度超过一定阈值(8)时将链表(寻址时间复杂度为 O(N))转换为红黑树(寻址时间复杂度为 O(log(N)))

synchronized 只锁定当前链表或红黑二叉树的首节点,这样只要 hash 不冲突,就不会产生并发,效率又提升 N 倍。

CountDownLatch

CountDownLatch允许一个或多个线程等待其他线程完成操作,再执行自己。

CountDownLatch构造方法:

public CountDownLatch(int count)// 初始化一个指定计数器的CountDownLatch对象

CountDownLatch重要方法:

public void await() throws InterruptedException// 让当前线程等待
public void countDown()	// 计数器进行减1

案例

线程1要执行打印:A和C,线程2要执行打印:B,但线程1在打印A后,要线程2打印B之后才能打印C,所以:线程1在打印A后,必须等待线程2打印完B之后才能继续执行。

代码实现

public class CountDownLatch_ {
    public static void main(String[] args) {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        new ThreadA("线程A", countDownLatch).start();
        new ThreadB("线程B", countDownLatch).start();
    }
}

class ThreadA extends Thread{

    private CountDownLatch countDownLatch;
    public ThreadA(String name, CountDownLatch countDownLatch){
        super(name);
        this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
        System.out.println(getName() + "--->A");
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(getName() + "--->C");
    }
}

class ThreadB extends Thread{

    private CountDownLatch countDownLatch;
    public ThreadB(String name, CountDownLatch countDownLatch){
        super(name);
        this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
        System.out.println(getName() + "--->B");
        countDownLatch.countDown();
    }
}

 

执行结果: 会保证按:A B C的顺序打印。

CountDownLatch中count down是倒数的意思,latch则是门闩的含义。整体含义可以理解为倒数的门栓,似乎有一点“三二一,芝麻开门”的感觉。

CountDownLatch是通过一个计数器来实现的,每当一个线程完成了自己的任务后,可以调用countDown()方法让计数器-1,当计数器到达0时,调用CountDownLatch。

await()方法的线程阻塞状态解除,继续执行。

CyclicBarrier

概述

CyclicBarrier的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。

CyclicBarrier构造方法:

public CyclicBarrier(int parties, Runnable barrierAction)
// 用于在线程到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景

CyclicBarrier重要方法:

public int await()// 每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞

案例

公司召集5名员工开会,等5名员工都到了,会议开始。

我们创建5个员工线程,1个开会线程,几乎同时启动,使用CyclicBarrier保证5名员工线程全部执行后,再执行开会线程。

实现代码:

public class CyclicBarrierDemo {
    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new MeetingThread());
        new Thread(new EmpRunnable(cyclicBarrier)).start();
        new Thread(new EmpRunnable(cyclicBarrier)).start();
        new Thread(new EmpRunnable(cyclicBarrier)).start();
        new Thread(new EmpRunnable(cyclicBarrier)).start();
        new Thread(new EmpRunnable(cyclicBarrier)).start();
    }

}

class EmpRunnable implements Runnable{

    private CyclicBarrier cbRef;
    public EmpRunnable(CyclicBarrier cbRef) {
        this.cbRef = cbRef;
    }

    @Override
    public void run() {
        try {
            TimeUnit.SECONDS.sleep(1);
            System.out.println(Thread.currentThread().getName() + "来参加会议.....");
            cbRef.await();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

class MeetingThread extends Thread{
    @Override
    public void run() {
        System.out.println("开始开会.....");
    }
}

执行结果:

Thread-3来参加会议..... Thread-5来参加会议..... Thread-2来参加会议..... Thread-4来参加会议..... Thread-1来参加会议..... 开始开会.....

使用场景

使用场景:CyclicBarrier可以用于多线程计算数据,最后合并计算结果的场景。

需求:使用两个线程读取2个文件中的数据,当两个文件中的数据都读取完毕以后,进行数据的汇总操作。

Semaphore

Semaphore也叫信号量,在JDK1.5被引入,可以用来控制同时访问特定资源的线程数量,通过协调各个线程,以保证合理的使用资源。

synchronized可以起到"锁"的作用,但某个时间段内,只能有一个线程允许执行。Semaphore可以设置同时允许几个线程执行。

Semaphore内部维护了一组虚拟的许可,许可的数量可以通过构造函数的参数指定。

  • 访问特定资源前,必须使用acquire方法获得许可,如果许可数量为0,该线程则一直阻塞,直到有可用许可。

  • 访问资源后,使用release释放许可。

Semaphore和ReentrantLock类似,获取许可有公平策略和非公平许可策略,默认情况下使用非公平策略。

Semaphore可以用来做流量分流,特别是对公共资源有限的场景,比如数据库连接。 假设有这个的需求,读取几万个文件的数据到数据库中,由于文件读取是IO密集型任务,可以启动几十个线程并发读取,但是数据库连接数只有10个,这时就必须控制最多只有10个线程能够拿到数据库连接进行操作。这个时候,就可以使用Semaphore做流量控制。

Semaphore构造方法:

public Semaphore(int permits)						permits 表示许可线程的数量
public Semaphore(int permits, boolean fair)			fair 表示公平性,如果这个设为 true 的话,下次执行的线程会是等待最久的线程

Semaphore重要方法:

public void acquire() throws InterruptedException	表示获取许可
public void release()								表示释放许可

示例一:同时允许1个线程执行

public class Service {
    //1表示许可的意思,表示最多允许1个线程执行acquire()和release()之间的内容
    private final Semaphore semaphore = new Semaphore(1);

    public void testMethod()  {
        try {
            semaphore.acquire();
            System.out.println(Thread.currentThread().getName() + "进入时间:" + System.currentTimeMillis());
            System.out.println(Thread.currentThread().getName() + "正在执行Service....");
            TimeUnit.SECONDS.sleep(2);
            System.out.println(Thread.currentThread().getName() + "退出时间:" + System.currentTimeMillis());
            semaphore.release();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }
}

class TestThread extends Thread{
    Service service;

    public TestThread(String name, Service service){
        super(name);
        this.service = service;
    }

    @Override
    public void run() {
        service.testMethod();
    }
}

class Mian{
    public static void main(String[] args) {
        Service service = new Service();
        for (int i = 1; i < 6; i++) {
            new TestThread("线程" + i, service).start();
        }
    }
}

结果:

 线程2进入时间:1662816776168
 线程2正在执行Service....
 线程2退出时间:1662816778179
 线程3进入时间:1662816778179
 线程3正在执行Service....
 线程3退出时间:1662816780185
 线程4进入时间:1662816780185
 线程4正在执行Service....
 线程4退出时间:1662816782186
 线程1进入时间:1662816782186
 线程1正在执行Service....
 线程1退出时间:1662816784201
 线程5进入时间:1662816784201
 线程5正在执行Service....
 线程5退出时间:1662816786211
 ​
 Process finished with exit code 0

示例二:同时允许2个线程同时执行

修改Service类,将new Semaphore(1)改为2即可:

public class Service {
    private Semaphore semaphore = new Semaphore(2);//2表示许可的意思,表示最多允许2个线程执行acquire()和release()之间的内容
    public void testMethod() {
        try {
            semaphore.acquire();
            System.out.println(Thread.currentThread().getName()
                    + " 进入 时间=" + System.currentTimeMillis());
            Thread.sleep(5000);
            System.out.println(Thread.currentThread().getName()
                    + "   结束 时间=" + System.currentTimeMillis());
            semaphore.release();
			//acquire()和release()方法之间的代码为"同步代码"
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

再次执行结果:

线程1进入时间:1662816870101
线程2进入时间:1662816870101
线程2正在执行Service....
线程1正在执行Service....
线程1退出时间:1662816872103
线程2退出时间:1662816872103
线程3进入时间:1662816872103
线程4进入时间:1662816872103
线程4正在执行Service....
线程3正在执行Service....
线程4退出时间:1662816874108
线程3退出时间:1662816874108
线程5进入时间:1662816874108
线程5正在执行Service....
线程5退出时间:1662816876114

Process finished with exit code 0

Exchanger

概述

Exchanger(交换者)是一个用于线程间协作的工具类。Exchanger用于进行线程间的数据交换。

这两个线程通过exchange方法交换数据,如果第一个线程先执行exchange()方法,它会一直等待第二个线程也执行exchange方法,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方。

Exchanger构造方法:

 public Exchanger()

Exchanger重要方法:

 public V exchange(V x)

示例一:exchange方法的阻塞特性

public class ThreadA extends Thread{
    private Exchanger<String> exchanger;
    public ThreadA(Exchanger<String> exchanger){
        this.exchanger = exchanger;
    }
    @Override
    public void run() {
        try {
            System.out.println("线程A欲传递值'礼物A'给线程B,并等待线程B的值...");
            System.out.println("在线程A中得到线程B的值=" + exchanger.exchange("礼物A"));

        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

class Test {
    public static void main(String[] args) {
        Exchanger<String> exchanger = new Exchanger<String>();
        ThreadA a = new ThreadA(exchanger);
        a.start();
    }
}


执行结果:
// 线程A欲传递值'礼物A'给线程B,并等待线程B的值...
// [线程A一直等待.....进入阻塞] 

示例二:exchange方法执行交换

public class ThreadA extends Thread{
    private Exchanger<String> exchanger;
    public ThreadA(Exchanger<String> exchanger){
        this.exchanger = exchanger;
    }
    @Override
    public void run() {
        try {
            System.out.println("线程A欲传递值'礼物A'给线程B,并等待线程B的值...");
            System.out.println("在线程A中得到线程B的值=" + exchanger.exchange("礼物A"));

        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}


class ThreadB extends Thread{
    private Exchanger<String> exchanger;
    public ThreadB(Exchanger<String> exchanger){
        this.exchanger = exchanger;
    }
    @Override
    public void run() {
        try {
            System.out.println("线程B欲传递值'礼物B'给线程A,并等待线程A的值...");
            System.out.println("在线程B中得到线程A的值=" + exchanger.exchange("礼物B"));

        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

class Test {
    public static void main(String[] args) {
        Exchanger<String> exchanger = new Exchanger<String>();
        ThreadA a = new ThreadA(exchanger);
        a.start();

        ThreadB b = new ThreadB(exchanger);
        b.start();
    }
}

执行结果:
// 线程B欲传递值'礼物B'给线程A,并等待线程A的值...
// 线程A欲传递值'礼物A'给线程B,并等待线程B的值...
// 在线程A中得到线程B的值=礼物B
// 在线程B中得到线程A的值=礼物A

示例三:exchange方法的超时

public class ThreadA extends Thread {
	private Exchanger<String> exchanger;
	public ThreadA(Exchanger<String> exchanger) {
		super();
		this.exchanger = exchanger;
	}
	@Override
	public void run() {
		try {
			System.out.println("线程A欲传递值'礼物A'给线程B,并等待线程B的值,只等5秒...");
			System.out.println("在线程A中得到线程B的值 =" + exchanger.exchange("礼物A",5, TimeUnit.SECONDS));
			System.out.println("线程A结束!");
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (TimeoutException e) {
			System.out.println("5秒钟没等到线程B的值,线程A结束!");
		}
	}
}

使用场景

使用场景:可以做数据校对工作

需求:比如我们需要将纸制银行流水通过人工的方式录入成电子银行流水。为了避免错误,采用AB岗两人进行录入,录入到两个文件中,系统需要加载这两个文件,并对两个文件数据进行校对,看看是否录入一致。

7、死锁

死锁的定义

多线程以及多进程改善了系统资源的利用率并提高了系统 的处理能力。然而,并发执行也带来了新的问题——死锁

所谓死锁是指多个线程因竞争资源而造成的一种僵局(互相等待),若无外力作用,这些进程都将无法向前推进

死锁产生的原因

1、系统资源的竞争

通常系统中拥有的不可剥夺资源,其数量不足以满足多个进程运行的需要,使得进程在运行过程中,会因争夺资源而陷入僵局,如磁带机、打印机等。只有对不可剥夺资源的竞争才可能产生死锁,对可剥夺资源的竞争是不会引起死锁的。

2、进程推进顺序非法

进程在运行过程中,请求和释放资源的顺序不当,也同样会导致死锁。例如,并发进程 P1、P2分别保持了资源R1、R2,而进程P1申请资源R2,进程P2申请资源R1时,两者都会因为所需资源被占用而阻塞。

Java中死锁最简单的情况是,一个线程T1持有锁L1并且申请获得锁L2,而另一个线程T2持有锁L2并且申请获得锁L1,因为默认的锁申请操作都是阻塞的,所以线程T1和T2永远被阻塞了。导致了死锁。这是最容易理解也是最简单的死锁的形式。

但是实际环境中的死锁往往比这个复杂的多。可能会有多个线程形成了一个死锁的环路,比如:线程T1持有锁L1并且申请获得锁L2,而线程T2持有锁L2并且申请获得锁L3,而线程T3持有锁L3并且申请获得锁L1,这样导致了一个锁依赖的环路:T1依赖T2的锁L2,T2依赖T3的锁L3,而T3依赖T1的锁L1。从而导致了死锁。

从上面两个例子中,我们可以得出结论,产生死锁可能性的最根本原因是:线程在获得一个锁L1的情况下再去申请另外一个锁L2,也就是说在获得了锁L1,并且没有释放锁L1的情况下,又去申请获得锁L2,这个是产生死锁的最根本原因。另一个原因是默认的锁申请操作是阻塞的

死锁产生的必要条件

产生死锁必须同时满足以下四个条件,只要其中任一条件不成立,死锁就不会发生。

(1)互斥条件:进程要求对所分配的资源(如打印机)进行排他性控制,即在一段时间内某资源仅为一个进程所占有。此时若有其他进程请求该资源,则请求进程只能等待。

(2)不剥夺条件:进程所获得的资源在未使用完毕之前,不能被其他进程强行夺走,即只能由获得该资源的进程自己来释放(只能是主动释放)。

(3)请求和保持条件:进程已经保持了至少一个资源,但又提出了新的资源请求,而该资源已被其他进程占有,此时请求进程被阻塞,但对自己已获得的资源保持不放。

(4)循环等待条件:存在一种进程资源的循环等待链,链中每一个进程已获得的资源同时被链中下一个进程所请求。即存在一个处于等待状态的进程集合{Pl, P2, ..., pn},其中Pi等 待的资源被P(i+1)占有(i=0, 1, ..., n-1),Pn等待的资源被P0占有。

死锁案例

public class Deadlock {
    public static void main(String[] args) {
        MyRunnable mr = new MyRunnable();

        new Thread(mr).start();
        new Thread(mr).start();
    }
}

class MyRunnable implements Runnable {
    Object lockA = new Object();
    Object lockB = new Object();

    @Override
    public void run() {
        synchronized (lockA) {
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "获得了lockA,需要lockB");
            synchronized (lockB) {
                System.out.println(Thread.currentThread().getName() + "获得了lockB,执行完成");
            }
        }

        synchronized (lockB) {
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "获得了lockB,需要lockA");
            synchronized (lockA) {
                System.out.println(Thread.currentThread().getName() + "获得了lockA,执行完成");
            }
        }
    }
}

 

如何避免死锁

在有些情况下死锁是可以避免的。下面介绍三种用于避免死锁的技术:

  • 加锁顺序(线程按照一定的顺序加锁)

    当多个线程需要相同的一些锁,但是按照不同的顺序加锁,死锁就很容易发生。如果能确保所有的线程都是按照相同的顺序获得锁,那么死锁就不会发生。

  • 加锁时限(线程尝试获取锁的时候加上一定的时限,超过时限则放弃对该锁的请求,并释放自己占有的锁)

    在尝试获取锁的时候加一个超时时间,这也就意味着在尝试获取锁的过程中若超过了这个时限该线程则放弃对该锁请求。若一个线程没有在给定的时限内成功获得所有需要的锁,则会进行回退并释放所有已经获得的锁,然后等待一段随机的时间再重试。这段随机的等待时间让其它线程有机会尝试获取相同的这些锁,并且让该应用在没有获得锁的时候可以继续运行(加锁超时后可以先继续运行干点其它事情,再回头来重复之前加锁的逻辑)。

  • 死锁检测

    每当一个线程获得了锁,会在线程和锁相关的数据结构中(map、graph等等)将其记下。除此之外,每当有线程请求锁,也需要记录在这个数据结构中。当一个线程请求锁失败时,这个线程可以遍历锁的关系图看看是否有死锁发生。例如,线程A请求锁7,但是锁7这个时候被线程B持有,这时线程A就可以检查一下线程B是否已经请求了线程A当前所持有的锁。如果线程B确实有这样的请求,那么就是发生了死锁(线程A拥有锁1,请求锁7;线程B拥有锁7,请求锁1)。

    那么当检测出死锁时,这些线程该做些什么呢?

    一个可行的做法是释放所有锁,回退,并且等待一段随机的时间后重试。这个和简单的加锁超时类似,不一样的是只有死锁已经发生了才回退,而不会是因为加锁的请求超时了。虽然有回退和等待,但是如果有大量的线程竞争同一批锁,它们还是会重复地死锁(原因同超时类似,不能从根本上减轻竞争)。

    一个更好的方案是给这些线程设置优先级,让一个(或几个)线程回退,剩下的线程就像没发生死锁一样继续保持着它们需要的锁。如果赋予这些线程的优先级是固定不变的,同一批线程总是会拥有更高的优先级。为避免这个问题,可以在死锁发生的时候设置随机的优先级。

面试题:简单说一下同步、异步、阻塞、非阻塞的概念

同步:当一个同步调用发出后,调用者要一直等待返回结果。通知后,才能进行后续的执行。

异步:当一个异步过程调用发出后,调用者不能立刻得到返回结果。实际处理这个调用的部件在完成后,通过状态、通知和回调来通知调用者。

阻塞:是指调用结果返回前,当前线程会被挂起,即阻塞。

非阻塞:是指即使调用结果没返回,也不会阻塞当前线程。

面试题:线程死锁是如何产生的,如何避免?

所谓死锁就是由于两个或两个以上的线程相互竞争对方的资源,而同时不释放自己的资源,导致所有线程同时被阻塞。 死锁产生的条件:

  • 互斥条件:一个资源在同一时刻只由一个线程占用。也就是多个线程在同一时刻访问的同一资源必须是互斥资源。

  • 请求与保持条件:一个线程在请求被占资源时发生阻塞,并对已获得的资源保持不放。

  • 循环等待条件:发生死锁时,所有的线程会形成一个死循环,一直阻塞。

  • 不剥夺条件:线程已获得的资源在未使用完不能被其他线程剥夺,只能由自己使用完释放资源。

避免死锁的方法主要是破坏死锁产生的条件。

  • 破坏互斥条件:这个条件无法进行破坏,锁的作用就是使他们互斥。

  • 破坏请求与保持条件:一次性申请所有的资源。

  • 破坏循环等待条件:按顺序来申请资源。

  • 破坏不剥夺条件:线程在申请不到所需资源时,主动放弃所持有的资源。

微信关注

                   编程那点事儿

阅读剩余
THE END