原创

ArrayBlockingQueue - 源码详解

ArrayBlockingQueue

简介

​ 是一个用数组实现的有界阻塞队列,此队列按照先进先出(FIFO)的原则对元素进行排序。支持公平锁和非公平锁,默认为非公平策略。

​ “有界”,则是指ArrayBlockingQueue对应的数组是有界限且固定的,在创建对象时由构造函数指定,一旦指定则无法更改。

​ “阻塞队列”,是指多线程访问竞争资源时,当竞争资源已被某线程获取时,其它要获取该资源的线程需要阻塞等待;

“FIFO(先进先出)”,原则对元素进行排序,元素都是从尾部插入到队列,从头部开始返回。

使用场景

​ 生产者消费者模型中,生产数据和消费数据的速率不一致,如果生产数据速度快一些,消费(处理)不过来,就会导致数据丢失。这时候我们就可以应用上阻塞队列来解决这个问题。

结构关系

源码解析

​ 4.1、参数

  1. // 存储队列元素的数组
  2. final Object[] items;
  3. // 拿数据的索引,用于take,poll,peek,remove方法
  4. int takeIndex;
  5. // 放数据的索引,用于put,offer,add方法
  6. int putIndex;
  7. // 元素个数
  8. int count;
  9. // 可重入锁(全局)
  10. final ReentrantLock lock;
  11. // notEmpty条件对象,由lock创建 (非空条件队列:当队列空时,线程在该队列等待获取)
  12. private final Condition notEmpty;
  13. // notFull条件对象,由lock创建 (非满条件队列:当队列满时,线程在该队列等待插入)
  14. private final Condition notFull;

​ 4.2、构造器

  1. /**
  2. * 指定队列初始容量的构造器
  3. */
  4. public ArrayBlockingQueue(int capacity) {
  5. this(capacity, false);//默认构造非公平锁的阻塞队列
  6. }
  7. /**
  8. * 指定队列初始容量和公平/非公平策略的构造器
  9. */
  10. public ArrayBlockingQueue(int capacity, boolean fair) {
  11. if (capacity <= 0)
  12. throw new IllegalArgumentException();
  13. this.items = new Object[capacity];
  14. //初始化ReentrantLock重入锁,出队入队拥有这同一个锁
  15. lock = new ReentrantLock(fair);
  16. //初始化非空等待队列
  17. notEmpty = lock.newCondition();
  18. //初始化非满等待队列
  19. notFull = lock.newCondition();
  20. }
  21. /**
  22. * 根据已有集合构造队列
  23. */
  24. public ArrayBlockingQueue(int capacity, boolean fair,
  25. Collection<? extends E> c) {
  26. this(capacity, fair);
  27. final ReentrantLock lock = this.lock;
  28. lock.lock(); // Lock only for visibility, not mutual exclusion
  29. try {
  30. int i = 0;
  31. //将集合添加进数组构成的队列中
  32. try {
  33. for (E e : c) {
  34. checkNotNull(e);
  35. items[i++] = e;
  36. }
  37. } catch (ArrayIndexOutOfBoundsException ex) {
  38. throw new IllegalArgumentException();
  39. }
  40. count = i;
  41. putIndex = (i == capacity) ? 0 : i; // 如果队列已满,则重置putIndex为0
  42. } finally {
  43. lock.unlock();
  44. }
  45. }

​ 核心就是第二种构造器,从构造器也可以看出,ArrayBlockingQueue在构造时就指定了内部数组的大小,并通过ReentrantLock来保证并发环境下的线程安全,参数fair 公平/非公平策略

​ 第三种构造函数,根据已有集合构造队列,其中在28行,可以看到源码有一行注释,意思是:这个锁的操作并不是为了互斥操作,而是锁住其可见性。【线程T1是实例化ArrayBlockingQueue对象,T2是对实例化的ArrayBlockingQueue对象做入队操作(当然要保证T1和T2的执行顺序),如果不对它进行加锁操作(加锁会保证其可见性,也就是写回主存),T1的集合c有可能只存在T1线程维护的缓存中,并没有写回主存,T2中实例化的ArrayBlockingQueue维护的缓存以及主存中并没有集合c,此时就因为可见性造成数据不一致的情况,引发线程安全问题。】

​ 4.3、函数​

  1. // 将指定的元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量),在成功时返回 true,如果此队列已满,则抛出IllegalStateException。
  2. boolean add(E e)
  3. // 将指定的元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量),在成功时返回 true,如果此队列已满,则返回 false。
  4. boolean offer(E e)
  5. // 将指定的元素插入此队列的尾部,如果该队列已满,则在到达指定的等待时间之前等待可用的空间。
  6. boolean offer(E e, long timeout, TimeUnit unit)
  7. // 获取但不移除此队列的头;如果此队列为空,则返回 null。
  8. E peek()
  9. // 获取并移除此队列的头,如果此队列为空,则返回 null。
  10. E poll()
  11. // 获取并移除此队列的头部,在指定的等待时间前等待可用的元素(如果有必要)。
  12. E poll(long timeout, TimeUnit unit)
  13. // 获取并移除此队列的头部,在元素变得可用之前一直等待,也就是说必须要拿到一个元素,除非线程中断。
  14. E take()
  15. // 将指定的元素插入此队列的尾部,如果该队列已满,则等待可用的空间。
  16. void put(E e)
  17. // 从此队列中移除指定元素的单个实例(如果存在)。
  18. boolean remove(Object o)
  19. // 返回在无阻塞的理想情况下(不存在内存或资源约束)此队列能接受的其他元素数量。
  20. int remainingCapacity()
  21. // 返回此队列中元素的数量。
  22. int size()
  23. // 自动移除此队列中的所有元素。
  24. void clear()
  25. // 如果此队列包含指定的元素,则返回 true。
  26. boolean contains(Object o)
  27. // 移除此队列中所有可用的元素,并将它们添加到给定 collection 中。
  28. int drainTo(Collection<? super E> c)
  29. // 最多从此队列中移除给定数量的可用元素,并将这些元素添加到给定 collection 中。
  30. int drainTo(Collection<? super E> c, int maxElements)
  31. // 返回在此队列中的元素上按适当顺序进行迭代的迭代器。
  32. Iterator<E> iterator()
  33. // 返回一个按适当顺序包含此队列中所有元素的数组。
  34. Object[] toArray()
  35. // 返回一个按适当顺序包含此队列中所有元素的数组;返回数组的运行时类型是指定数组的运行时类型。
  36. <T> T[] toArray(T[] a)
  37. // 返回此 collection 的字符串表示形式。
  38. String toString()

常用的操作对比

入队 方法名 队列满时处理方式 返回值 阻塞
1 add(E e) 抛出 IllegalStateException(“Queue full”) 异常 boolean
2 offer(E e) 返回false boolean
3 put(E e) 线程阻塞,直到中断或被唤醒 void
4 offer(E e, long timeout, TimeUnit unit) 在规定时间内重试,超过规定时间返回false boolean
出队 方法名 队列满时处理方式 返回值 阻塞
1 peek() 返回null E
2 poll() 返回null E
3 take() 线程阻塞,指定中断或被唤醒 E
4 poll(long timeout, TimeUnit unit) 在规定时间内重试,超过规定时间返回null E

add/peek 方法

​ add向队列种放入元素,其实是调用父类的add方法,父类实现中调用offer方法,在offer方法返回true时,add方法返回true,否则抛出“Queue full”的异常

  1. public boolean add(E e) {
  2. return super.add(e); // 可以看到这里调用的是父类 AbstractQueue 的add方法
  3. }
  4. //AbstractQueue 中 add
  5. public boolean add(E e) {
  6. if (offer(e)) // 父类实现中调用offer方法
  7. return true;
  8. else
  9. throw new IllegalStateException("Queue full");
  10. }

​ peek方法,获取但不移除此队列的头;如果此队列为空,则返回 null

  1. public E peek() {
  2. final ReentrantLock lock = this.lock;
  3. lock.lock();
  4. try {
  5. return itemAt(takeIndex); // null when queue is empty
  6. } finally {
  7. lock.unlock();
  8. }
  9. }
  10. // 直接返回takeIndex出的数组元素
  11. final E itemAt(int i) {
  12. return (E) items[i]; // items为该队列,i表示数组下标
  13. }
offer(E)/poll 方法

​ offer 将指定的元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量),在成功时返回 true,如果此队列已满,则返回 false。

  1. public boolean offer(E e) {
  2. checkNotNull(e); // 检查传入参数是否为null
  3. final ReentrantLock lock = this.lock;
  4. lock.lock();
  5. try {
  6. // cout为队列中元素数量,items为该队列
  7. if (count == items.length)
  8. return false; // 队列满了后返回false,该数据不处理的话会丢失
  9. else {
  10. enqueue(e); // 队列未满的话调用enqueue插入队列,然后返回true,插入成功
  11. return true;
  12. }
  13. } finally {
  14. lock.unlock();
  15. }
  16. }

​ poll 获取并移除此队列的头,如果此队列为空,则返回 null。

  1. public E poll() {
  2. final ReentrantLock lock = this.lock;
  3. lock.lock();
  4. try {
  5. // 如果队列中元素数量为0,返回null,有数据时调用dequeue()方法出队
  6. return (count == 0) ? null : dequeue();
  7. } finally {
  8. lock.unlock();
  9. }
  10. }
offer(E,long,TimeUnit)/poll(long,TimeUnnit)

​ 比普通offer/poll方法区别在于,在队列满时指定了重试时间,如果超过指定的时间后还是无法添加或取出则返回false。

  1. public boolean offer(E e, long timeout, TimeUnit unit)
  2. throws InterruptedException {
  3. checkNotNull(e);
  4. // 获取超时时间
  5. long nanos = unit.toNanos(timeout);
  6. final ReentrantLock lock = this.lock;
  7. lock.lockInterruptibly();
  8. try {
  9. while (count == items.length) {
  10. if (nanos <= 0)
  11. return false; // 超过时间后返回false
  12. // 将当前调用线程挂起,添加到notFull条件队列中等待指定时间唤醒
  13. nanos = notFull.awaitNanos(nanos);
  14. }
  15. enqueue(e); // 入队
  16. return true; // 成功后返回true
  17. } finally {
  18. lock.unlock();
  19. }
  20. }
  21. // 取出元素时进行规定时间的重试,超过规定时间则返回null。
  22. public E poll(long timeout, TimeUnit unit) throws InterruptedException {
  23. long nanos = unit.toNanos(timeout);
  24. final ReentrantLock lock = this.lock;
  25. lock.lockInterruptibly();
  26. try {
  27. while (count == 0) {
  28. if (nanos <= 0)
  29. return null; // 超时后返回null
  30. // 将当前调用线程挂起,添加到notEmpty条件队列中等待指定时间唤醒
  31. nanos = notEmpty.awaitNanos(nanos);
  32. }
  33. return dequeue(); // 出队
  34. } finally {
  35. lock.unlock();
  36. }
  37. }
put/tack 方法

​ put方法是一个阻塞的方法,如果队列元素已满,那么当前线程将会被notFull条件对象挂起加到等待队列中,直到队列有空档才会唤醒执行添加操作。但如果队列没有满,那么就直接调用enqueue(e)方法将元素加入到数组队列中。

最终调用的是enqueue(E x)方法。其内部通过putIndex索引直接将元素添加到数组items中,

可以看到,当添加的索引与数组的长度一致时,会把添加索引重新置为0

  1. // 阻塞的入参方法
  2. public void put(E e) throws InterruptedException {
  3. checkNotNull(e);
  4. final ReentrantLock lock = this.lock;
  5. lock.lockInterruptibly();
  6. try {
  7. // 当队列元素个数与数组长度相等时,无法添加元素.
  8. // 这里必须用while,防止虚假唤醒,这其实是多线程设计模式中的
  9. // Guarded Suspension(保护性暂挂模式),之所以这样做,
  10. // 是防止线程被意外唤醒,不经再次判断就直接调用
  11. while (count == items.length)
  12. // 将当前调用线程挂起,添加到notFull条件队列中等待唤醒
  13. notFull.await();
  14. enqueue(e);
  15. } finally {
  16. lock.unlock();
  17. }
  18. }
  19. //入队操作
  20. private void enqueue(E x) {
  21. final Object[] items = this.items;
  22. //通过putIndex索引对数组进行赋值
  23. items[putIndex] = x;
  24. //索引自增,如果已是最后一个位置,重新设置 putIndex = 0;
  25. if (++putIndex == items.length)
  26. putIndex = 0;
  27. // 队列item数组中数量加1
  28. count++;
  29. // 添加了元素说明队列有数据,唤醒notEmpty条件对象添加线程,执行取出操作
  30. notEmpty.signal();
  31. }

​ take方法,有就删除没有就阻塞,注意这个阻塞是可以中断的,如果队列没有数据那么就加入notEmpty条件队列等待(有数据就直接取走,方法结束),如果有新的put线程添加了数据,那么put操作将会唤醒take线程,执行take操作

  1. // 阻塞的出参方法
  2. public E take() throws InterruptedException {
  3. final ReentrantLock lock = this.lock;
  4. lock.lockInterruptibly();
  5. try {
  6. // 当iteem数组中元素为0时,当前调用线程挂起,添加到notEmpty条件队列中等待唤醒
  7. while (count == 0)
  8. notEmpty.await();
  9. return dequeue();
  10. } finally {
  11. lock.unlock();
  12. }
  13. }
  14. //删除队列头元素并返回
  15. private E dequeue() {
  16. // 拿到当前数组的数据
  17. final Object[] items = this.items;
  18. @SuppressWarnings("unchecked")
  19. // 获取要删除的对象
  20. E x = (E) items[takeIndex];
  21. // 将数组中takeIndex索引位置设置为null
  22. items[takeIndex] = null;
  23. // takeIndex索引加1并判断是否与数组长度相等,如果相等说明已到尽头,恢复为0
  24. if (++takeIndex == items.length)
  25. takeIndex = 0;
  26. count--; // 队列个数减1
  27. if (itrs != null)
  28. itrs.elementDequeued(); // 同时更新迭代器中的元素数据
  29. // 删除了元素说明队列有空位,唤醒notFull条件对象添加线程,执行添加操作
  30. notFull.signal();
  31. return x;
  32. }
remove(Object o)

​ 从队列中删除指定对象o,那么就要遍历队列,删除第一个与对象o相同的元素,如果队列中没有对象o元素,那么返回false删除失败。从队列头遍历到队列尾,靠takeIndex和putIndex两个变量了

  1. // 删除队列中对象o元素,最多删除一条
  2. public boolean remove(Object o) {
  3. if (o == null) return false;
  4. final Object[] items = this.items;
  5. // 使用lock来保证,多线程修改成员属性的安全
  6. final ReentrantLock lock = this.lock;
  7. lock.lock();
  8. try {
  9. // 当队列中有值的时候,才进行删除。
  10. if (count > 0) {
  11. // 队列尾下一个位置
  12. final int putIndex = this.putIndex;
  13. // 队列头的位置
  14. int i = takeIndex;
  15. do {
  16. // 当前位置元素与被删除元素相同
  17. if (o.equals(items[i])) {
  18. // 删除i位置元素
  19. removeAt(i);
  20. // 返回true
  21. return true;
  22. }
  23. if (++i == items.length)
  24. i = 0;
  25. // 当i==putIndex表示遍历完所有元素
  26. } while (i != putIndex);
  27. }
  28. return false;
  29. } finally {
  30. lock.unlock();
  31. }
  32. }
  1. // 删除队列removeIndex位置的元素
  2. void removeAt(final int removeIndex) {
  3. // assert lock.getHoldCount() == 1;
  4. // assert items[removeIndex] != null;
  5. // assert removeIndex >= 0 && removeIndex < items.length;
  6. final Object[] items = this.items;
  7. // 表示删除元素是列表头,就容易多了,与dequeue方法流程差不多
  8. if (removeIndex == takeIndex) {
  9. // 移除removeIndex位置元素
  10. items[takeIndex] = null;
  11. // 到了数组末尾,就要转到数组头位置
  12. if (++takeIndex == items.length)
  13. takeIndex = 0;
  14. // 队列数量减一
  15. count--;
  16. if (itrs != null)
  17. itrs.elementDequeued();
  18. } else {
  19. // an "interior" remove
  20. final int putIndex = this.putIndex;
  21. for (int i = removeIndex;;) {
  22. int next = i + 1;
  23. if (next == items.length)
  24. next = 0;
  25. // 还没有到队列尾,那么就将后一个位置元素覆盖前一个位置的元素
  26. if (next != putIndex) {
  27. items[i] = items[next];
  28. i = next;
  29. } else {
  30. // 将队列尾元素置位null
  31. items[i] = null;
  32. // 重新设置putIndex的值
  33. this.putIndex = i;
  34. break;
  35. }
  36. }
  37. // 队列数量减一
  38. count--;
  39. if (itrs != null)
  40. itrs.removedAt(removeIndex);
  41. }
  42. // 因为删除了一个元素,那么队列肯定不满了,那么唤醒在notFull条件下等待的一个线程
  43. notFull.signal();
  44. }

​ 在队列中删除指定位置的元素,需要注意的是删除之后的数组还能保持队列形式

分为两种情况:

  1. 如果删除位置是队列头,那么简单,只需要将队列头的位置元素设置为null,将将队列头位置加一。
  2. 如果删除位置不是队列头,那么麻烦了,这个时候,我们就要将从removeIndex位置后的元素全部左移一位,覆盖前一个元素。最后将原来队列尾的元素置位null。
size() 方法

​ 获取锁后直接返回 count ,并在返回前释放锁 。

  1. public int size() {
  2. // 获取独占锁
  3. final ReentrantLock lock = this.lock;
  4. lock.lock();
  5. try {
  6. // 返回元素个数
  7. return count;
  8. } finally {
  9. // 解锁
  10. lock.unlock();
  11. }
  12. }

也许你会问,这里又没有修改 count 的 值,只是简单地获取,为何要加锁呢?

​ 其实如果 count 被声明为 volatile 的这里就不需要加锁了,因为 volatile 类型的变量保证了 内存的可见性,而ArrayBlockingQueue 中的 count 并没有被声明为 volatile 的,这是因为 count 操作都是在获取锁后进行的,而获取锁的语义之一是,获取锁后访问的变量都是从主内存获取的,这保证了变量的内存可见性 。

源码分析完后,其实从上面的入队出队操作可以看出,ArrayBlockingQueue也可以看做是一个环形队列,首先它是有界的,然后它遵循的是先进先出原则,出队入队只需要操作相应的下标,就能构成一个循环

ArrayBlockingQueue 用的是同一个锁

ReentrantLock和其Condition的关系:

​ ReentrantLock内部维护了一个双向链表,链表上的每个节点都会保存一个线程,锁在双向链表的头部自选,取出线程执行。而Condition内部同样维持着一个双向链表,但是其向链表中添加元素(await)和从链表中移除(signal)元素没有像ReentrantLock那样,保证线程安全,所以在调用Condition的await()和signal()方法时,需要在lock.lock()和lock.unlock()之间以保证线程的安全。在调用Condition的signal时,它从自己的双向链表中取出一个节点放到了ReentrantLock的双向链表中,所以在具体的运行过程中不管ReentrantLock new 了几个Condition其实内部公用的一把锁。

注意事项

​ 1.ArrayBlockingQueue 底层基于数组实现,在对象创建时需要指定数组大小。在构建对象时,已经创建了数组。所以使用Array需要特别注意设定合适的队列大小,如果设置过大会造成内存浪费。如果设置内存太小,就会影响并发的性能。

​ 2.ArrayBlockingQueue用的是一把锁,LinkedBlockingQueue用的是两把锁,大多数场景适合使用LinkedBlockingQueue。在JDK源码当中有说明,LinkedBlockingQueue比ArrayBlockingQueue有更高的吞吐量

思考

​ 为什么ArrayBlockingQueue不使用LinkedBlockingQueue类似的双锁实现?

正文到此结束
该篇文章的评论功能已被站长关闭
本文目录