重温生产者和消费者模型

生产者和消费者模型,主要解决的是数据的同步问题,生产者将数据放置一个存储区域,然后消费者过来取数据。这种模式类似于一个中间件,可以使得生产者不需要关心消费者什么时候来拿数据,同时在这种模式下,还可以控制两边的处理速率,避免数据的丢失。

下面以 Java 为例,来写一个生产者和消费者模型。

当队列满了的时候,生产者自己进行阻塞。而当消费者发现队列为空,则将自己阻塞。

所以要实现这个生产者消费者模型,首先必须有以下条件:

  1. 生产者或者消费者必须支持可阻塞
  2. 在多线程的情况下,必须保证并发安全(即插入不能产生数据错误),取数据不可以重复取

阻塞队列

Java 中,常用的阻塞队列有 LinkedBlockingQueue 或者 ArrayBlockingQueue,这两个阻塞队列的实现都是基于 ReentrantLock ,通过可重入锁来控制并发情况下的插入操作。

所以,如下便是在 Java 中的生产者和消费者模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
public class BlockModel {

static ThreadPoolExecutor product = new ThreadPoolExecutor(5,10,1000, TimeUnit.MICROSECONDS,new LinkedBlockingQueue<Runnable>(100));

static ThreadPoolExecutor consumer = new ThreadPoolExecutor(5,10,1000, TimeUnit.MICROSECONDS,new LinkedBlockingQueue<Runnable>(100));


public static void main(String[] args) throws InterruptedException {
LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue(10);
for(int i =0 ;i< 5 ;i++){
product.submit(new Producer(100,linkedBlockingQueue,i));
consumer.submit(new Consumer(linkedBlockingQueue));
}
}
}
class Consumer implements Runnable{
LinkedBlockingQueue linkedBlockingQueue;

public Consumer(LinkedBlockingQueue linkedBlockingQueue) {
this.linkedBlockingQueue = linkedBlockingQueue;
}

public void run() {
try {
consumerStart(linkedBlockingQueue);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

private void consumerStart(LinkedBlockingQueue<Integer> queue) throws InterruptedException {
while(true){
System.out.println("消费者-----"+queue.take());
Random random = new Random(47);
Thread.currentThread().sleep(random.nextInt(1000));
}
}
}


class Producer implements Runnable{

int total;
LinkedBlockingQueue linkedBlockingQueue;
int number;

public Producer(int total, LinkedBlockingQueue linkedBlockingQueue, int number) {
this.total = total;
this.linkedBlockingQueue = linkedBlockingQueue;
this.number = number;
}

public void run() {
try {
productStart(linkedBlockingQueue);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

private void productStart(LinkedBlockingQueue<Integer> queue) throws InterruptedException {
while(true){
System.out.println(number + "生产者----"+queue.size());
queue.put(number);
}
}
}

非阻塞队列

如果考虑在 Java 中使用 LinkedList 来实现阻塞队列,那么第一点,需要实现入队和出队的原子性,因为 LinkedList 是基于双向链表来实现的,所以在这里必须保证其原子性的操作。

Java 中如果要实现对于链表的原子性操作,首先是加锁,考虑到加锁和释放锁导致的性能开销,决定使用可重入锁

有两种锁,一个是 synchronized,一个则是 ReentrantLock

锁的选型

synchronized 的加锁逻辑依赖于 JVM ,同时也是支持可重入。并且JDK1.6 以后对其做了大量的优化,所以一般情况下可以直接用synchronized

ReentrantLockJava 语言自带的一种可重入锁,相较于 synchronized ,它含有公平锁和非公平锁两种模式,并且支持 Condition

在这里由于LinkedBlockingQueue 采用的 ReentrantLock ,所以在这里也是采用 ReentrantLock

阻塞以及唤醒

当没有数据的时候,需要消费者阻塞,同时队列已经满了的情况下,需要生产者进行阻塞,而配合这些操作的就是将这些线程阻塞,在java 中可以通过 waitnotify 方法来进行阻塞和唤醒,

如果使用的是 ReentrantLock,也可以使用自己的 Condition

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
public class NonBlockModel {

static ReentrantLock reentrantLock = new ReentrantLock();

static Condition produceCondition = reentrantLock.newCondition();
static Condition consumerCondition = reentrantLock.newCondition();

static ThreadPoolExecutor product = new ThreadPoolExecutor(5,10,1000, TimeUnit.MICROSECONDS,new LinkedBlockingQueue<Runnable>(100));

static ThreadPoolExecutor consumer = new ThreadPoolExecutor(5,10,1000, TimeUnit.MICROSECONDS,new LinkedBlockingQueue<Runnable>(100));

static int total =10;

public static void main(String[] args) throws InterruptedException {
LinkedList linkedList = new LinkedList();
for(int i =0;i< 5 ;i++){
product.submit(new NonBlockProducer(reentrantLock,linkedList,i,produceCondition,consumerCondition));
consumer.submit(new NonBlockConsumer(reentrantLock,linkedList,produceCondition,consumerCondition));
}
Thread.currentThread().sleep(1000000);
}

}
class NonBlockProducer implements Runnable{
ReentrantLock produceReentrantLock;
LinkedList<Integer> linkedList;
int number;
Condition produceCondition;
Condition consumerCondition;

public NonBlockProducer(ReentrantLock produceReentrantLock, LinkedList<Integer> linkedList, int number, Condition produceCondition, Condition consumerCondition) {
this.produceReentrantLock = produceReentrantLock;
this.linkedList = linkedList;
this.number = number;
this.produceCondition = produceCondition;
this.consumerCondition = consumerCondition;
}

@Override
public void run() {
while (true) {
produceReentrantLock.lock();
try {
if (10 == linkedList.size()) {
System.out.println("队列已满,生产者被阻塞" + number + "--" + Thread.currentThread().getName());
produceCondition.await();
}
if(linkedList.size() +1 < 10){
linkedList.push(number);
System.out.println("添加元素" + linkedList.size());
}
consumerCondition.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
produceReentrantLock.unlock();
}
}
}
}
class NonBlockConsumer implements Runnable{
ReentrantLock consumerReentrantLock;
LinkedList<Integer> linkedList;
Condition produceCondition;
Condition consumerCondition;

public NonBlockConsumer(ReentrantLock consumerReentrantLock, LinkedList<Integer> linkedList, Condition produceCondition, Condition consumerCondition) {
this.consumerReentrantLock = consumerReentrantLock;
this.linkedList = linkedList;
this.produceCondition = produceCondition;
this.consumerCondition = consumerCondition;
}

@Override
public void run() {
while (true) {
consumerReentrantLock.lock();
try {
if (0 == linkedList.size()) {
System.out.println("队列已空,消费者已阻塞" + Thread.currentThread().getName());
consumerCondition.await();
}
linkedList.pollLast();
System.out.println("移除元素" + linkedList.size());
if(linkedList.size() > 1){
consumerCondition.signal();
}
produceCondition.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
consumerReentrantLock.unlock();
}
}
}
}
作者

Somersames

发布于

2020-07-26

更新于

2021-12-05

许可协议

评论