阻塞队列

1.什么是阻塞队列?

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。

·当阻塞队列是空时,从队列里获取元素的操作会被阻塞

当阻塞队列是满时,往队列里添加元素的操作会被阻塞

2.为什么需要BlockingQueue

在多线程领域,所谓阻塞,在某些情况会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤醒

好处是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程

在concurrent包发布以前,在多线程环境下,我们每个程序员都需要自己控制这些细节,尤其要兼顾效率和线程安全,而这通常会给我们的程序带来不小的复杂度。

3.阻塞队列的成员

  • ArrayBlockingQueue:是一个用数组实现的有界阻塞队列,此队列按照先进先出(FIFO)的原则对元素进行排序。支持公平锁和非公平锁。【注:每一个线程在获取锁的时候可能都会排队等待,如果在等待时间上,先获取锁的线程的请求一定先被满足,那么这个锁就是公平的。反之,这个锁就是不公平的。公平的获取锁,也就是当前等待时间最长的线程先获取锁】

  • LinkedBlockingQueue:一个由链表结构组成的有界队列,此队列的长度为Integer.MAX_VALUE。此队列按照先进先出的顺序进行排序

  • PriorityBlockingQueue: 一个支持线程优先级排序的无界队列,默认自然序进行排序,也可以自定义实现compareTo()方法来指定元素排序规则,不能保证同优先级元素的顺序。

  • DelayQueue: 一个实现PriorityBlockingQueue实现延迟获取的无界队列,在创建元素时,可以指定多久才能从队列中获取当前元素。只有延时期满后才能从队列中获取元素。(DelayQueue可以运用在以下应用场景:1.缓存系统的设计:可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了。2.定时任务调度。使用DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行,从比如TimerQueue就是使用DelayQueue实现的。)

  • SynchronousQueue: 一个不存储元素的阻塞队列,每一个put操作必须等待take操作,否则不能添加元素。支持公平锁和非公平锁。SynchronousQueue的一个使用场景是在线程池里。Executors.newCachedThreadPool()就使用了SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收。

  • LinkedTransferQueue: 一个由链表结构组成的无界阻塞队列,相当于其它队列,LinkedTransferQueue队列多了transfer和tryTransfer方法。

  • LinkedBlockingDeque: 一个由链表结构组成的双向阻塞队列。队列头部和尾部都可以添加和移除元素,多线程并发时,可以将锁的竞争最多降到一半。

    ArrayBlockingQueue、LinkedBlockingQueue以及 SynchronousQueue这三个阻塞队列是重点

4.常见方法

image-20200219181701162

异常:是指当阻塞队列满时候,再往队列里插入元素,会抛出IllegalStateException(“Queue full”)异常。当队列为空时,从队列里获取元素时会抛出NoSuchElementException异常 。

返回特殊值:插入方法会返回是否成功,成功则返回true。移除方法,则是从队列里拿出一个元素,如果没有则返回null

一直阻塞:当阻塞队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到拿到数据,或者响应中断退出。当队列空时,消费者线程试图从队列里take元素,队列也会阻塞消费者线程,直到队列可用。

超时退出:当阻塞队列满时,队列会阻塞生产者线程一段时间,如果超过一定的时间,生产者线程就会退出

5.阻塞队列

ArrayBlockingQueue由数组构成的有界阻塞队列.

LinkedBlockingQueue由链表构成的有界阻塞队列(默认值为Integer.MAX_VALUE)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
public class BlockingQueueDemo {

public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<String>(3);
/**
* 1、抛出异常 add()/remove()
*/
// System.out.println(blockingQueue.add("a"));
// System.out.println(blockingQueue.add("b"));
// System.out.println(blockingQueue.add("c"));
// System.out.println(blockingQueue.add("d"));

// System.out.println(blockingQueue.element()); //检查队首元素

// System.out.println(blockingQueue.remove());
// System.out.println(blockingQueue.remove());
// System.out.println(blockingQueue.remove());
// System.out.println(blockingQueue.remove());


/**
* 2、返回布尔类型 offer()/pull()
*/
// System.out.println(blockingQueue.offer("a"));
// System.out.println(blockingQueue.offer("b"));
// System.out.println(blockingQueue.offer("c"));
// System.out.println(blockingQueue.offer("d"));
//
// System.out.println(blockingQueue.peek()); //检查队首元素
//
// System.out.println(blockingQueue.poll());
// System.out.println(blockingQueue.poll());
// System.out.println(blockingQueue.poll());
// System.out.println(blockingQueue.poll());



/**
* 3、阻塞 put()/take()
*/
// blockingQueue.put("a");
// blockingQueue.put("b");
// blockingQueue.put("c");
// System.out.println("############");
// blockingQueue.put("d");
//
// System.out.println(blockingQueue.take());
// System.out.println(blockingQueue.take());
// System.out.println(blockingQueue.take());
// System.out.println(blockingQueue.take());


/**
*4、超时
*/

System.out.println(blockingQueue.offer("a",2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer("b",2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer("c",2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer("d",2L, TimeUnit.SECONDS));
}
}

SynchronousQueue是一个不存储元素的阻塞队列,也即单个元素的队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class SynchronousQueueDemo {
public static void main(String[] args) {
BlockingQueue<String> blockingQueue = new SynchronousQueue<>();

new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + " put 1");
blockingQueue.put("1");

System.out.println(Thread.currentThread().getName() + " put 2");
blockingQueue.put("2");

System.out.println(Thread.currentThread().getName() + " put 3");
blockingQueue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "AAA").start();


new Thread(() -> {
try {

TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName() + " take " + blockingQueue.take());

TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName() + " take " + blockingQueue.take());

TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName() + " take " + blockingQueue.take());

} catch (InterruptedException e) {
e.printStackTrace();
}
}, "BBB").start();
}
}

6.消费者生产者模式

生产者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import java.util.concurrent.BlockingQueue;

public class Producer implements Runnable{

private final BlockingQueue blockingQueue;
//设置队列缓存的大小。生产过程中超过这个大小就暂时停止生产
private final int QUEUE_SIZE = 10;


public Producer(BlockingQueue blockingQueue){
this.blockingQueue = blockingQueue;
}

int task = 1;
@Override
public void run() {

while(true){
try {
System.out.println("正在生产:" + task);
//将生产出来的产品放在队列缓存中
blockingQueue.put(task);
++task;
//让其停止一会,便于查看效果
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}


}
}

消费者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import java.util.concurrent.BlockingQueue;

//消费者
public class Consumer implements Runnable{

private final BlockingQueue blockingQueue;

public Consumer(BlockingQueue blockingQueue){
this.blockingQueue = blockingQueue;
}

@Override
public void run() {

//只要阻塞队列中有任务,就一直去消费
while(true){

try {
System.out.println("正在消费: " + blockingQueue.take());
//让其停止一会,便于查看效果
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}

}
}
}

测试类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
* 生产者消费者模式
* 使用阻塞队列BlockingQueue
* @author wanggenshen
*
*/
public class TestConPro {



public static void main(String[] args){
BlockingQueue blockingQueue = new LinkedBlockingQueue(5);

Producer p = new Producer(blockingQueue);
Consumer c = new Consumer(blockingQueue);

Thread tp = new Thread(p);
Thread tc= new Thread(c);

tp.start();
tc.start();

}


}

如果改用有界阻塞队列ArrayBlockingQueue,就可以初始化队列的大小。则队列中元素超过队列大小的时候,生产者就会等待消费者消费一个再去生产一个:
测试代码:
初始化一个大小为10的ArrayBlockingQueue:

1
2
3
4
5
6
7
8
9
10
11
12
13
public static void main(String[] args){
BlockingQueue blockingQueue = new ArrayBlockingQueue(10);

Producer p = new Producer(blockingQueue);
Consumer c = new Consumer(blockingQueue);

Thread tp = new Thread(p);
Thread tc= new Thread(c);

tp.start();
tc.start();

}
-------------本文结束感谢您的阅读-------------
0%