JavaSE 进阶(二十)

死锁

多个线程各自占有一些共享资源,并且互相等待别的线程占有的资源才能运行,这就出现了两个或多个线程都在等待对方释放资源,线程都停止执行的情形。

案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package ml.guest997;

public class DeadLock {
public static void main(String[] args) {
new Thread(new Trade(0, "admin ")).start();
new Thread(new Trade(1, "guest ")).start();
}
}

//钱
class Money {

}

//货
class Goods {

}

class Trade implements Runnable {

//用 static 来保证资源只有一份,
static Money money = new Money();
static Goods goods = new Goods();
int choice; //选择
String person; //交易的人

public Trade(int choice, String person) {
this.choice = choice;
this.person = person;
}

@Override
public void run() {
try {
if (choice == 0) {
synchronized (money) {
System.out.println(this.person + "获得钱");
Thread.sleep(1000); //加个线程休眠是怕当前线程一下子就把两个锁都拿走了。
synchronized (goods) {
System.out.println(this.person + "获得货");
}
}
} else {
synchronized (goods) {
System.out.println(this.person + "获得货");
Thread.sleep(2000);
synchronized (money) {
System.out.println(this.person + "获得钱");
}
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/*结果为
admin 获得钱
guest 获得货
*/

运行上面的代码后会发现程序卡住了,两个人都想要获取对方锁住的资源,然而谁都没有想要释放自己的锁,导致两个锁一直无法释放,程序自然就无法继续运行下去了。

改造案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Override
public void run() {
try {
if (choice == 0) {
synchronized (money) {
System.out.println(this.person + "获得钱");
Thread.sleep(1000);
}
synchronized (goods) {
System.out.println(this.person + "获得货");
}
} else {
synchronized (goods) {
System.out.println(this.person + "获得货");
Thread.sleep(2000);
}
synchronized (money) {
System.out.println(this.person + "获得钱");
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/*结果为
admin 获得钱
guest 获得货
admin 获得货
guest 获得钱
*/

只要不让任何一个线程同时拥有两个锁即可,当运行完自己的同步代码块后就会释放锁,这样程序就能继续运行下去了。

避免方法

产生死锁的四个必要条件:

  • 互斥条件:一个资源每次只能被一个进程使用。
  • 请求与保持条件:一个进程因请求资源而阻塞时,对已获得的资源保持不放。
  • 不剥夺条件:进程已获得的资源,在末使用完之前,不能强行剥夺。
  • 循环等待条件:若干进程之间形成一种头尾相接的循环等待资源关系。

只要想办法破坏其中的任意一个或多个条件,就可以避免死锁发生。

Lock 锁

java.util.concurrent.locks.Lock 接口是控制多个线程对共享资源进行访问的工具。锁提供了对共享资源的独占访问,每次只能有一个线程对 Lock 对象加锁,线程开始访问共享资源之前应先获得 Lock 对象。
比较常用的是 Reentrantlock 类,它实现了 Lock 接口,拥有与 synchronized 相同的并发性和内存语义,在实现线程安全的控制中,可以显式加锁和解锁。

买票案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package ml.guest997;

import java.util.concurrent.locks.ReentrantLock;

public class LockTest implements Runnable {
private int tickets = 10;
private final ReentrantLock lock = new ReentrantLock(); //定义 Lock 锁,ReentrantLock 是一个可重入锁。

@Override
public void run() {
try {
while (true) {
lock.lock(); //加锁
if (tickets > 0) {
System.out.println(Thread.currentThread().getName() + "拿到了第" + tickets-- + "张票");
Thread.sleep(1000); //模拟延时能够扩大问题的发生性
lock.unlock(); //解锁
continue;
}
lock.unlock(); //解锁
break;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}

public static void main(String[] args) {
LockTest lockTest = new LockTest();
new Thread(lockTest, "01").start();
new Thread(lockTest, "02").start();
new Thread(lockTest, "03").start();
}

}
/*结果为
01拿到了第10张票
01拿到了第9张票
01拿到了第8张票
01拿到了第7张票
01拿到了第6张票
02拿到了第5张票
02拿到了第4张票
02拿到了第3张票
02拿到了第2张票
03拿到了第1张票
*/

使用 Lock 锁,JVM 将花费较少的时间来调度线程,性能更好,并且具有更好的扩展性(提供更多的子类)。

线程通信(生产者消费者案例)

对于生产者,没有生产产品之前,要通知消费者等待。而生产了产品之后,又需要马上通知消费者消费。
对于消费者,在消费之后,要通知生产者已经结束消费,需要生产新的产品以供消费。

这是一个线程同步问题,生产者和消费者共享同一个资源,并且生产者和消费者之间相互依赖,互为条件。

在生产者消费者问题中,仅有 synchronized 是不够的。synchronized 可阻止并发更新同一个共享资源,实现了同步。
但 synchronized 不能用来实现不同线程之间的消息传递(通信)。

方法

方法名

作用

wait()

表示线程一直等待,直到其它线程通知,与 sleep 不同,会释放锁。

notify()

唤醒一个处于等待状态的线程。

notifyAll()

唤醒同个对象上所有调用 wait 方法的线程,优先级别高的线程优先调度。

管程法

pc

  • 生产者:负责生产数据的模块(可能是方法、对象、线程或进程)
  • 消费者:负责处理数据的模块(可能是方法、对象、线程或进程)
  • 缓存区:消费者不能直接使用生产者的数据,生产者将生产好的数据放入缓存区,消费者从缓存区拿出数据。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package ml.guest997;

public class CommunicationTest01 {
public static void main(String[] args) throws InterruptedException {
CacheArea cacheArea = new CacheArea();
new Thread(new Producer(cacheArea), "生产者").start();
Thread.sleep(3000); //配合下面生产者的循环,模拟缓存区爆满的情况。
new Thread(new Consumer(cacheArea), "消费者").start();
}
}

class Product {
private int id;

Product(int id) {
this.id = id;
}

public int getId() {
return id;
}

public void setId(int id) {
this.id = id;
}
}

class Producer implements Runnable {
private CacheArea cacheArea;

Producer(CacheArea cacheArea) {
this.cacheArea = cacheArea;
}

@Override
public void run() {
for (int i = 1; i <= 11; i++) { //模拟缓存区产品爆满的情况
cacheArea.push(new Product(i));
try {
Thread.sleep(100); //生产延时
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

class Consumer implements Runnable {
private CacheArea cacheArea;

Consumer(CacheArea cacheArea) {
this.cacheArea = cacheArea;
}

@Override
public void run() {
for (int i = 1; i <= 11; i++) { //模拟缓存区产品不足的情况
cacheArea.pop();
try {
Thread.sleep(100); //消费延时
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

class CacheArea {
//容器大小
Product[] products = new Product[10];
//容器计数器
int count = 0;

//生产者放入产品
public synchronized void push(Product product) {
if (count == products.length) { //如果容器满了,需要等待消费者消费。
try {
System.out.println("缓存区已经满了");
this.wait(1000); //当前线程等待超过1秒时则被唤醒继续执行下面的代码
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//缓存区产品爆满时的处理
int flag = count + 1;
if (flag > products.length) {
System.out.println("由于指定时间内始终无消费者进行消费,等待超时,所以停止生产。");
return;
}
//放入产品
products[count] = product;
count++;
System.out.println("生产了第" + product.getId() + "个产品");
this.notifyAll(); //当前线程通知消费者消费
}

//消费者取出产品
public synchronized void pop() {
if (count == 0) { //如果容器空了,需要等待生产者生产。
try {
System.out.println("缓存区已经空了");
this.wait(1000); //当前线程等待超过1秒时则被唤醒继续执行下面的代码
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//缓存区产品不足时的处理
int flag = count - 1;
if (flag < 0) {
System.out.println("由于指定时间内始终无生产者进行生产,等待超时,判断生产者已停止生产,请下次再来。");
return;
}
//取出产品
count--;
Product product = products[count];
System.out.println("消费了第" + product.getId() + "个产品");
this.notifyAll(); //当前线程通知生产者生产
}
}
/*结果为
生产了第1个产品
生产了第2个产品
生产了第3个产品
生产了第4个产品
生产了第5个产品
生产了第6个产品
生产了第7个产品
生产了第8个产品
生产了第9个产品
生产了第10个产品
缓存区已经满了
由于指定时间内始终无消费者进行消费,等待超时,所以停止生产。
消费了第10个产品
消费了第9个产品
消费了第8个产品
消费了第7个产品
消费了第6个产品
消费了第5个产品
消费了第4个产品
消费了第3个产品
消费了第2个产品
消费了第1个产品
缓存区已经空了
由于指定时间内始终无生产者进行生产,等待超时,判断生产者已停止生产,请下次再来。
*/

注意:同步方法要放在缓存区里,否则加的锁在两个线程中没有交点是不会相互唤醒的。

信号灯法

顾名思义,信号灯是用来在生产者与消费者之间传递信号的一个旗帜。当生产者或消费者线程完成自己的工作,等待另一个线程进行时,便会将信号值修改用以告诉另一者:我的事情做完了,该你了。而另一者获取信号的变化后便会做出对应的行为。在这个过程中,信号值一直被反复更改,直到所有线程均执行完毕。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package ml.guest997;

public class CommunicationTest02 {
public static void main(String[] args) throws InterruptedException {
Pc pc = new Pc();
new Thread(new Pd(pc),"生产者").start();
new Thread(new Cs(pc),"消费者").start();
}
}

//产品
class Pc {
boolean flag = true;

public synchronized void push() {
if (!flag) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("产品生产出来了,可消费。");
flag = !flag;
this.notifyAll();
}

public synchronized void pop() {
if (flag) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("产品消费完了,请生产。");
flag = !flag;
this.notifyAll();
}
}

//生产者
class Pd implements Runnable {
private Pc pc;

Pd(Pc pc) {
this.pc = pc;
}

@Override
public void run() {
while (true) {
pc.push();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

//消费者
class Cs implements Runnable {
private Pc pc;

Cs(Pc pc) {
this.pc = pc;
}

@Override
public void run() {
while (true) {
pc.pop();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
/*结果为
产品生产出来了,可消费。
产品消费完了,请生产。
产品生产出来了,可消费。
产品消费完了,请生产。
产品生产出来了,可消费。
产品消费完了,请生产。
产品生产出来了,可消费。
产品消费完了,请生产。
产品生产出来了,可消费。
产品消费完了,请生产。
...
*/

线程池

经常创建或销毀使用量特别大的资源,比如并发情况下的线程,对性能影响很大。解决方法就是提前创建好多个线程,放入线程池中,使用时直接获取,使用完后放回池中。可以避免频繁创建和销毁线程,实现重复利用。

好处:

  • 降低资源的消耗。线程本身是一种资源,创建和销毁线程会有 CPU 开销;创建的线程也会占用一定的内存。
  • 提高任务执行的响应速度。任务执行时,可以不必等到线程创建完之后再执行。
  • 提高线程的可管理性。线程不能无限制地创建,需要进行统一的分配、调优和监控。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package ml.guest997;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PoolTest {
public static void main(String[] args) {
//Executors:线程池的工厂类,ExecutorService:线程池接口
ExecutorService executor = Executors.newFixedThreadPool(5);
for (int i = 0; i < 10; i++) {
//submit() 中的参数为 Runnable 或 Callable 实例对象
executor.submit(() -> {
System.out.println("thread id is: " + Thread.currentThread().getId());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
executor.shutdown(); //关闭连接
}
}