java - 灵活的CountDownLatch?

我已经遇到了两次问题,一个生产者线程产生了n个工作项,将它们提交到一个ExecutorService中,然后需要等待直到所有n个项都被处理。
注意事项
n事先不知道。如果是的话,我只需要创建一个CountDownLatch然后让生产者线程await()直到所有工作完成。
使用CompletionService是不合适的,因为尽管我的生产者线程需要阻塞(即通过调用take())但没有任何方法可以指示所有工作都已完成,从而导致生产者线程停止等待。
我目前最喜欢的解决方案是使用一个整数计数器,在提交一个工作项时递增,在处理一个工作项时递减。在所有n个任务的子任务之后,我的生产者线程将需要等待一个锁,并在收到通知时检查counter == 0如果消费线程已递减计数器且新值为0,则需要通知生产者。
有没有更好的方法来解决这个问题,或者在我应该使用而不是“滚动自己的”中有没有合适的结构?
事先谢谢。

最佳答案

java.util.concurrent.Phaser看起来对你很有用。它计划在Java 7发布,但最稳定的版本可以在jsr166的兴趣组网站上找到。
相位器是一个光荣的循环屏障。您可以注册n个参与方,当您准备好等待他们在特定阶段的前进时。
关于它如何工作的一个简单示例:

final Phaser phaser = new Phaser();

public Runnable getRunnable(){
    return new Runnable(){
        public void run(){
            ..do stuff...
            phaser.arriveAndDeregister();
        }
    };
}
public void doWork(){
    phaser.register();//register self
    for(int i=0 ; i < N; i++){
        phaser.register(); // register this task prior to execution 
        executor.submit( getRunnable());
    }
    phaser.arriveAndAwaitAdvance();
}

本文翻译自 https://stackoverflow.com/questions/1636194/

网站遵循 CC BY-SA 4.0 协议,转载或引用请注明出处。

标签 java multithreading concurrency countdownlatch phaser


相关文章:

java - 为嵌入式Jetty指定JAR“resources / webapp”文件夹的ResourceBase的正确URL是什么?

c# - 如何在没有循环的情况下等待布尔值(使用任何类型的wait / semaphore / event / mutex等)

algorithm - 为Akka演员池排序了共享邮箱或总线? [关闭]

c# - 结合时间紧密的事件,提高效率和公平性

java - Java中是否有一种快速失败的同步方式?

java - 谷歌应用引擎部署错误

java - 叉子/加入杀死兄弟姐妹/分支

java - 为什么不能在Java构造函数中使用字段的简写数组初始化?

java - ScheduledExecutorService不会因未捕获的异常而停止线程

c++ - 为什么Linux进程比Windows轻巧? [关闭]