生产者和消费者模型,主要解决的是数据的同步问题,生产者将数据放置一个存储区域,然后消费者过来取数据。这种模式类似于一个中间件,可以使得生产者不需要关心消费者什么时候来拿数据,同时在这种模式下,还可以控制两边的处理速率,避免数据的丢失。
下面以 Java 为例,来写一个生产者和消费者模型。
当队列满了的时候,生产者自己进行阻塞。而当消费者发现队列为空,则将自己阻塞。
所以要实现这个生产者消费者模型,首先必须有以下条件:
生产者或者消费者必须支持可阻塞
在多线程的情况下,必须保证并发安全(即插入不能产生数据错误),取数据不可以重复取
阻塞队列 在Java
中,常用的阻塞队列有 LinkedBlockingQueue
或者 ArrayBlockingQueue
,这两个阻塞队列的实现都是基于 ReentrantLock
,通过可重入锁来控制并发情况下的插入操作。
所以,如下便是在 Java 中的生产者和消费者模型
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 public class BlockModel { static ThreadPoolExecutor product = new ThreadPoolExecutor(5 ,10 ,1000 , TimeUnit.MICROSECONDS,new LinkedBlockingQueue<Runnable>(100 )); static ThreadPoolExecutor consumer = new ThreadPoolExecutor(5 ,10 ,1000 , TimeUnit.MICROSECONDS,new LinkedBlockingQueue<Runnable>(100 )); public static void main (String[] args) throws InterruptedException { LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue(10 ); for (int i =0 ;i< 5 ;i++){ product.submit(new Producer(100 ,linkedBlockingQueue,i)); consumer.submit(new Consumer(linkedBlockingQueue)); } } } class Consumer implements Runnable { LinkedBlockingQueue linkedBlockingQueue; public Consumer (LinkedBlockingQueue linkedBlockingQueue) { this .linkedBlockingQueue = linkedBlockingQueue; } public void run () { try { consumerStart(linkedBlockingQueue); } catch (InterruptedException e) { e.printStackTrace(); } } private void consumerStart (LinkedBlockingQueue<Integer> queue) throws InterruptedException { while (true ){ System.out.println("消费者-----" +queue.take()); Random random = new Random(47 ); Thread.currentThread().sleep(random.nextInt(1000 )); } } } class Producer implements Runnable { int total; LinkedBlockingQueue linkedBlockingQueue; int number; public Producer (int total, LinkedBlockingQueue linkedBlockingQueue, int number) { this .total = total; this .linkedBlockingQueue = linkedBlockingQueue; this .number = number; } public void run () { try { productStart(linkedBlockingQueue); } catch (InterruptedException e) { e.printStackTrace(); } } private void productStart (LinkedBlockingQueue<Integer> queue) throws InterruptedException { while (true ){ System.out.println(number + "生产者----" +queue.size()); queue.put(number); } } }
非阻塞队列 如果考虑在 Java 中使用 LinkedList
来实现阻塞队列,那么第一点,需要实现入队和出队的原子性,因为 LinkedList
是基于双向链表来实现的,所以在这里必须保证其原子性的操作。
在Java
中如果要实现对于链表的原子性操作,首先是加锁,考虑到加锁和释放锁导致的性能开销,决定使用可重入锁
有两种锁,一个是 synchronized
,一个则是 ReentrantLock
锁的选型 synchronized
的加锁逻辑依赖于 JVM
,同时也是支持可重入。并且JDK1.6
以后对其做了大量的优化,所以一般情况下可以直接用synchronized
。
ReentrantLock
是 Java
语言自带的一种可重入锁,相较于 synchronized
,它含有公平锁和非公平锁两种模式,并且支持 Condition
。
在这里由于LinkedBlockingQueue
采用的 ReentrantLock
,所以在这里也是采用 ReentrantLock
阻塞以及唤醒 当没有数据的时候,需要消费者阻塞,同时队列已经满了的情况下,需要生产者进行阻塞,而配合这些操作的就是将这些线程阻塞,在java
中可以通过 wait
和 notify
方法来进行阻塞和唤醒,
如果使用的是 ReentrantLock
,也可以使用自己的 Condition
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 public class NonBlockModel { static ReentrantLock reentrantLock = new ReentrantLock(); static Condition produceCondition = reentrantLock.newCondition(); static Condition consumerCondition = reentrantLock.newCondition(); static ThreadPoolExecutor product = new ThreadPoolExecutor(5 ,10 ,1000 , TimeUnit.MICROSECONDS,new LinkedBlockingQueue<Runnable>(100 )); static ThreadPoolExecutor consumer = new ThreadPoolExecutor(5 ,10 ,1000 , TimeUnit.MICROSECONDS,new LinkedBlockingQueue<Runnable>(100 )); static int total =10 ; public static void main (String[] args) throws InterruptedException { LinkedList linkedList = new LinkedList(); for (int i =0 ;i< 5 ;i++){ product.submit(new NonBlockProducer(reentrantLock,linkedList,i,produceCondition,consumerCondition)); consumer.submit(new NonBlockConsumer(reentrantLock,linkedList,produceCondition,consumerCondition)); } Thread.currentThread().sleep(1000000 ); } } class NonBlockProducer implements Runnable { ReentrantLock produceReentrantLock; LinkedList<Integer> linkedList; int number; Condition produceCondition; Condition consumerCondition; public NonBlockProducer (ReentrantLock produceReentrantLock, LinkedList<Integer> linkedList, int number, Condition produceCondition, Condition consumerCondition) { this .produceReentrantLock = produceReentrantLock; this .linkedList = linkedList; this .number = number; this .produceCondition = produceCondition; this .consumerCondition = consumerCondition; } @Override public void run () { while (true ) { produceReentrantLock.lock(); try { if (10 == linkedList.size()) { System.out.println("队列已满,生产者被阻塞" + number + "--" + Thread.currentThread().getName()); produceCondition.await(); } if (linkedList.size() +1 < 10 ){ linkedList.push(number); System.out.println("添加元素" + linkedList.size()); } consumerCondition.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { produceReentrantLock.unlock(); } } } } class NonBlockConsumer implements Runnable { ReentrantLock consumerReentrantLock; LinkedList<Integer> linkedList; Condition produceCondition; Condition consumerCondition; public NonBlockConsumer (ReentrantLock consumerReentrantLock, LinkedList<Integer> linkedList, Condition produceCondition, Condition consumerCondition) { this .consumerReentrantLock = consumerReentrantLock; this .linkedList = linkedList; this .produceCondition = produceCondition; this .consumerCondition = consumerCondition; } @Override public void run () { while (true ) { consumerReentrantLock.lock(); try { if (0 == linkedList.size()) { System.out.println("队列已空,消费者已阻塞" + Thread.currentThread().getName()); consumerCondition.await(); } linkedList.pollLast(); System.out.println("移除元素" + linkedList.size()); if (linkedList.size() > 1 ){ consumerCondition.signal(); } produceCondition.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { consumerReentrantLock.unlock(); } } } }