首先来看一个例子,例子来源于网上: 采用线程池和阻塞队列实现生产/消费者模型。其中LinkedBlockingQueue是阻塞队列,同时线程安全,其特点: 采用链表数据结构Node的方式进行节点数据的记录, 同时其进行入队和出队的计数器采用原子性的AtomicInteger 其出队和入队采用采用两把锁,putLock和takeLock,同时进行删除的时候,采用fullLock 其与LinkedBlockingQueue相比,其可以无界可以有界,而ArrayBlockingQueue是有界的,同时实现的数据结构不通过,一个采用数组、一个采用链表,同时采用的锁的方式不同,ArrayBlockingQueue采用一把锁,没有对生产和消费消息进行锁的分离。 1.相关变量 2.构造方法 3.方法 生产方法 put offer 消费者操作 take操作 remove操作: drainTo操作/** * 多线程模拟实现生产者/消费者模型 * */ public class BlockingQueueTest2 { /** * * 定义装苹果的篮子 * */ public class Basket { // 篮子,能够容纳3个苹果 BlockingQueue<String> basket = new LinkedBlockingQueue<String>(3); // 生产苹果,放入篮子 public void produce() throws InterruptedException { // put方法放入一个苹果,若basket满了,等到basket有位置 basket.put("An apple"); } // 消费苹果,从篮子中取走 public String consume() throws InterruptedException { // take方法取出一个苹果,若basket为空,等到basket有苹果为止(获取并移除此队列的头部) return basket.take(); } } // 定义苹果生产者 class Producer implements Runnable { private String instance; private Basket basket; public Producer(String instance, Basket basket) { this.instance = instance; this.basket = basket; } public void run() { try { while (true) { // 生产苹果 System.out.println("生产者准备生产苹果:" + instance); basket.produce(); System.out.println("!生产者生产苹果完毕:" + instance); // 休眠300ms Thread.sleep(300); } } catch (InterruptedException ex) { System.out.println("Producer Interrupted"); } } } // 定义苹果消费者 class Consumer implements Runnable { private String instance; private Basket basket; public Consumer(String instance, Basket basket) { this.instance = instance; this.basket = basket; } public void run() { try { while (true) { // 消费苹果 System.out.println("消费者准备消费苹果:" + instance); System.out.println(basket.consume()); System.out.println("!消费者消费苹果完毕:" + instance); // 休眠1000ms Thread.sleep(1000); } } catch (InterruptedException ex) { System.out.println("Consumer Interrupted"); } } } public static void main(String[] args) { BlockingQueueTest2 test = new BlockingQueueTest2(); // 建立一个装苹果的篮子 Basket basket = test.new Basket(); ExecutorService service = Executors.newCachedThreadPool(); Producer producer = test.new Producer("生产者001", basket); Producer producer2 = test.new Producer("生产者002", basket); Consumer consumer = test.new Consumer("消费者001", basket); service.submit(producer); service.submit(producer2); service.submit(consumer); // 程序运行5s后,所有任务停止 // try { // Thread.sleep(1000 * 5); // } catch (InterruptedException e) { // e.printStackTrace(); // } // service.shutdownNow(); } }
//容量,为空时使用Integer.MAX_VALUE=2^31-1 private final int capacity; /** Current number of elements */ //计数,队列中的元素个数 private final AtomicInteger count = new AtomicInteger(); //头结点,head.item==null,首节点不存放元素 transient Node<E> head; //尾节点,last.next==null private transient Node<E> last; /** Lock held by take, poll, etc */ //消费队列锁 private final ReentrantLock takeLock = new ReentrantLock(); /** Wait queue for waiting takes */ //消费队列等待消费,用于队满时,进行消费 private final Condition notEmpty = takeLock.newCondition(); /** Lock held by put, offer, etc */ //生产队列锁 private final ReentrantLock putLock = new ReentrantLock(); /** Wait queue for waiting puts */ //生产队列等待生产,用于队空时,进行生产 private final Condition notFull = putLock.newCondition(); //节点信息:数据、后继点击 static class Node<E> { E item; /** * One of: * - the real successor Node * - this Node, meaning the successor is head.next * - null, meaning there is no successor (this is the last node) */ //下一个节点,分为三种情况: // 指向真正的节点、指向自己,后继节点为head.next、为空,表示当前节点为尾节点 Node<E> next; Node(E x) { item = x; } }
//构造方法,空参构造默认队列容量为2^31-1 public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } //构造方法,带指定容量 public LinkedBlockingQueue(int capacity) { //对容量进行校验 if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; //创建节点信息 last = head = new Node<E>(null); } //构造方法,放入带指定集合的元素信息入队 //首先采用默认大小,进行上锁操作, // 放入元素到队列中,进行遍历,放入 public LinkedBlockingQueue(Collection<? extends E> c) { //默认队列大小,2^31-1 this(Integer.MAX_VALUE); //进行上锁操作 final ReentrantLock putLock = this.putLock; putLock.lock(); // Never contended, but necessary for visibility try { //放入元素,进行计数 int n = 0; for (E e : c) { if (e == null) throw new NullPointerException(); if (n == capacity) throw new IllegalStateException("Queue full"); enqueue(new Node<E>(e)); ++n; } count.set(n); } finally { //释放锁 putLock.unlock(); } }
//入队操作 //首先获取锁,再检查队列是否满了,如果满了,则进行阻塞等待, // 如果队列没有满,则进行生产操作,同时计数器进行计数 //生产后的元素个数如果还没有达到容量时,会继续唤醒其他生产线程 //当生产的元素是元素的第一个元素时唤醒阻塞等待消费的线程 public void put(E e) throws InterruptedException { //非空校验 if (e == null) throw new NullPointerException(); // Note: convention in all put/take/etc is to preset local var // holding count negative to indicate failure unless set. //设置计数为0,失败的时候返回 int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; //中断上锁 putLock.lockInterruptibly(); try { /* * Note that count is used in wait guard even though it is * not protected by lock. This works because count can * only decrease at this point (all other puts are shut * out by lock), and we (or some other waiting put) are * signalled if it ever changes from capacity. Similarly * for all other uses of count in other wait guards. */ //检查队列是否满了,满了进行阻塞操作 while (count.get() == capacity) { notFull.await(); } //入队操作,将节点信息插入到队尾 //last=last.next=node enqueue(node); c = count.getAndIncrement(); //元素没有满,则唤醒被阻塞的线程,增加线程 if (c + 1 < capacity) notFull.signal(); } finally { //释放锁 putLock.unlock(); } //插入的是一个元素时唤醒阻塞等待的线程 if (c == 0) signalNotEmpty(); }
//阻塞带超时时间的offer操作 public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (e == null) throw new NullPointerException(); long nanos = unit.toNanos(timeout); int c = -1; final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { //如果时间<0,则表示超时返回了,此时队列未满,直接返回 if (nanos <= 0) return false; nanos = notFull.awaitNanos(nanos); } //否者进行入队操作 enqueue(new Node<E>(e)); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); return true; } //首先进行非空校验,如果队满了,直接返回false //如果没有满,则进行上锁,同时进行判断, // 如果计数<容量,则进行入队操作 //最后释放锁 public boolean offer(E e) { if (e == null) throw new NullPointerException(); final AtomicInteger count = this.count; if (count.get() == capacity) return false; int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; putLock.lock(); try { if (count.get() < capacity) { enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); return c >= 0; }
//take操作 消费消息 //如果队列为非空或者被唤醒,进行消费操作,计数器-1 public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } //开始消费 if (c == capacity) signalNotFull(); return x; }
//进行消费操作 poll,带超时时间 public E poll(long timeout, TimeUnit unit) throws InterruptedException { E x = null; int c = -1; long nanos = unit.toNanos(timeout); final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; } //进行poll操作 public E poll() { final AtomicInteger count = this.count; if (count.get() == 0) return null; E x = null; int c = -1; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { if (count.get() > 0) { x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
//删除操作,释放指定节点信息 public boolean remove(Object o) { if (o == null) return false; //对生产消息和消费消息进行上锁 fullyLock(); try { for (Node<E> trail = head, p = trail.next; p != null; trail = p, p = p.next) { if (o.equals(p.item)) { //释放节点 unlink(p, trail); return true; } } return false; } finally { fullyUnlock(); } }
drainTo操作 public int drainTo(Collection<? super E> c) { return drainTo(c, Integer.MAX_VALUE); } //一次性地将队列中的全部元素消费完同时返回指定集合的信息,避免多次加锁造成的性能开销 //其中c和maxElement表示返回的集合、要获取的元素个数 public int drainTo(Collection<? super E> c, int maxElements) { if (c == null) throw new NullPointerException(); if (c == this) throw new IllegalArgumentException(); if (maxElements <= 0) return 0; boolean signalNotFull = false; //进行上锁 final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { //拿到两者之间的最小的一个 int n = Math.min(maxElements, count.get()); // count.get provides visibility to first n Nodes Node<E> h = head; int i = 0; try { //将元素添加中集合c中 while (i < n) { Node<E> p = h.next; c.add(p.item); p.item = null; h.next = h; h = p; ++i; } return n; } finally { // Restore invariants even if c.add() threw if (i > 0) { // assert h.item == null; head = h; signalNotFull = (count.getAndAdd(-i) == capacity); } } } finally { takeLock.unlock(); if (signalNotFull) signalNotFull(); } }
本网页所有视频内容由 imoviebox边看边下-网页视频下载, iurlBox网页地址收藏管理器 下载并得到。
ImovieBox网页视频下载器 下载地址: ImovieBox网页视频下载器-最新版本下载
本文章由: imapbox邮箱云存储,邮箱网盘,ImageBox 图片批量下载器,网页图片批量下载专家,网页图片批量下载器,获取到文章图片,imoviebox网页视频批量下载器,下载视频内容,为您提供.
阅读和此文章类似的: 全球云计算