java - 如何创建移相器的链/层

我正在编写多线程应用程序,它使用phaser来知道何时完成工作。问题在于,在ExtExtRealService服务中,队列中甚至有100K的线程,但是移相器中的到达方的最大数量是65535。到了65536派对,我该怎么办?
我的示例代码:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;

public class Main {
public static void main(String[] args) throws Exception {
    ExecutorService ec = Executors.newFixedThreadPool(10);
    ExecutorCompletionService<List<String>> ecs = new ExecutorCompletionService<List<String>>(
            ec);
    Phaser phaser = new Phaser();

    // register first node/thread
    ecs.submit(new SimpleParser("startfile.txt"));
    phaser.register();

    Future<List<String>> future;
    do {
        future = ecs.poll();
        if(future!=null && future.get() != null) {
            addParties(phaser, future.get(), ecs);
            phaser.arriveAndDeregister();
        }

        if (phaser.isTerminated()) {
            ec.shutdown();
        }
    } while (!ec.isShutdown() && !phaser.isTerminated());
}

public static void addParties(Phaser p, List<String> filenames,
        ExecutorCompletionService<List<String>> ecs) {
    for (int i = 0; i < filenames.size(); i++) {
        ecs.submit(new SimpleParser(filenames.get(i)));
        //PROBLEM = What to do when Phaser has 65535+ unarrived parties
        p.register();
    }
}

static class SimpleParser implements Callable<List<String>> {

    String fileName;

    public SimpleParser(String fileName) {
        this.fileName = fileName;
    }

    @Override
    public List<String> call() throws Exception {
        return parseFile();
    }

    private List<String> parseFile() {
        return new ArrayList<String>(Arrays.asList(new String[] {
                "somefilename1.txt", "somefilename2.txt" }));
    }

}
}

问题出在addParties()方法中。单线程(simpleparser)可以返回100个新文件名,将有100个新线程提交给executorcompletionservice,并在phaser中注册了100个新的参与方。
我试过用这样的东西:
if(p.getUnarrivedParties() == 65535)
            p = new Phaser(p);

创建一个相位链,但是没有帮助,因为p.getUnarrivedParties()返回0,但是我不能注册它的下一个方…
    System.out.println(p.getUnarrivedParties());
        if(p.getUnarrivedParties() == 65535) {
            p = new Phaser(p);
            System.out.println(p.getUnarrivedParties());
        }
        p.register();

印刷品:
65535个
0个
抛出非法状态异常
那么,我如何才能创建新的相位器,将连接到这个旧的?
//编辑
谢谢你@bowmore。
我还有两个问题。
让我们看一个例子:
import java.util.concurrent.Phaser;

public class Test2 {
    public static void main(String[] args) {
        Phaser parent = new Phaser();
        Phaser child1 = new Phaser(parent);
        Phaser child2 = new Phaser(parent);
        child1.register();
        child2.register();

        System.out.println("Parent: "+parent.isTerminated());
        System.out.println("Child1: "+child1.isTerminated());
        System.out.println("Child2: "+child1.isTerminated()+"\n");

        child1.arriveAndDeregister();
        System.out.println("Parent: "+parent.isTerminated());
        System.out.println("Child1: "+child1.isTerminated());
        System.out.println("Child2: "+child2.isTerminated()+"\n");

        child2.arriveAndDeregister();
        System.out.println("Parent: "+parent.isTerminated());
        System.out.println("Child1: "+child1.isTerminated());
        System.out.println("Child2: "+child2.isTerminated()+"\n");
    }
}

它打印:
Parent: false
Child1: false
Child2: false

Parent: false
Child1: false
Child2: false

Parent: true
Child1: true
Child2: true

为什么在child1.arriveAndDeregister()之后;child1没有终止,如何检查它是否真的终止?
第二个问题。
我问到在达到65535个派对之后创建新的相位器,因为我认为创建数千个新对象是没有用的-你认为这样做不会有内存问题,或者它甚至可以提高性能?

最佳答案

新的进程可以在原始的新创建的子项目上注册,而不是注册现有的cc。创建子Phaser只需将父Phaser提供给子构造函数即可。

public static void addParties(Phaser p, List<String> filenames,
                              ExecutorCompletionService<List<String>> ecs) {
    Phaser newPhaser = new Phaser(p);
    for (int i = 0; i < filenames.size(); i++) {
        ecs.submit(new SimpleParser(filenames.get(i)));
        newPhaser.register();
    }
}

如果您只想在达到某个阈值时创建子相位器,则可以检查已注册方的数量,r而不是未注册方的数量:
public static void addParties(Phaser p, List<String> filenames, ExecutorCompletionService<List<String>> ecs) {
    Phaser toRegister = p.getRegisteredParties() > THRESHOLD ? new Phaser(p) : p;
    for (int i = 0; i < filenames.size(); i++) {
        ecs.submit(new SimpleParser(filenames.get(i)));
        //PROBLEM = What to do when Phaser has 65535+ unarrived parties
        toRegister.register();
    }
    System.out.println(p.getRegisteredParties());
}

编辑:
后续问题1:childPhasers与rootPhaser共享其终止状态,下面是Phaser
public boolean isTerminated() {
    return root.state < 0L;
}

后续问题2:父相位器实际上不保留对其子相位器的引用。一旦子相位器不再被引用,它就有资格进行垃圾收集。您最好遵循javadoc中的建议:
每相器任务的最佳值主要取决于预期的同步速率。一个低至4的值可能适合于每个阶段非常小的任务体(因此高比率),或者对于非常大的任务体高达数百。
分层的主要原因是减少了严重的同步争用,因此,如果任务较轻,则每个相位器的任务越少越好。配置不同的设置来调整这些设置不会有什么坏处。

本文翻译自 https://stackoverflow.com/questions/14093200/

网站遵循 CC BY-SA 4.0 协议,转载或引用请注明出处。

标签 java multithreading executorservice phaser


相关文章:

java - 如何创建具有父/子关系的@Entity

java - 如何为Swagger文档将@ApiModelProperty dataType设置为String

ios - 多线程核心数据导致UI挂起

multithreading - 如何在分区数组上运行并行计算线程?

java - 使用ExecutorService有什么好处?

java - 是否可以有一组线程池共享大线程池中的线程,而不是创建新线程?

java - 如何避免使用JPA进行循环引用注释?

java - 如何调用在对象实例化时定义的方法?

java - 多线程中的线程池

java - 等待固定时间段内终止多个ExecutorServices