private void testEdgeFreq()

in computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/input/EdgesInputTest.java [107:167]


    private void testEdgeFreq(EdgeFrequency freq)
                              throws IOException {
        this.config = UnitTestBase.updateWithRequiredOptions(
                ComputerOptions.JOB_ID, "local_001",
                ComputerOptions.JOB_WORKERS_COUNT, "1",
                ComputerOptions.JOB_PARTITIONS_COUNT, "1",
                ComputerOptions.WORKER_COMBINER_CLASS,
                Null.class.getName(), // Can't combine
                ComputerOptions.ALGORITHM_RESULT_CLASS,
                IdListList.class.getName(),
                ComputerOptions.ALGORITHM_MESSAGE_CLASS,
                IdList.class.getName(),
                ComputerOptions.WORKER_DATA_DIRS, "[data_dir1, data_dir2]",
                ComputerOptions.WORKER_RECEIVED_BUFFERS_BYTES_LIMIT, "10000",
                ComputerOptions.WORKER_WAIT_FINISH_MESSAGES_TIMEOUT, "1000",
                ComputerOptions.INPUT_MAX_EDGES_IN_ONE_VERTEX, "10",
                ComputerOptions.INPUT_EDGE_FREQ, freq.name(),
                ComputerOptions.TRANSPORT_RECV_FILE_MODE, "false"
        );
        this.managers = new Managers();
        FileManager fileManager = new FileManager();
        this.managers.add(fileManager);
        SortManager sortManager = new SendSortManager(context());
        this.managers.add(sortManager);

        MessageSendManager sendManager = new MessageSendManager(
                                         context(), sortManager,
                                         new MockMessageSender());
        this.managers.add(sendManager);
        MessageRecvManager receiveManager = new MessageRecvManager(context(),
                                                                   fileManager,
                                                                   sortManager);
        this.managers.add(receiveManager);
        this.managers.initAll(this.config);
        ConnectionId connectionId = new ConnectionId(new InetSocketAddress(
                                                     "localhost", 8081),
                                                     0);
        FileGraphPartition partition = new FileGraphPartition(
                                           context(), this.managers, 0);
        receiveManager.onStarted(connectionId);
        add200VertexBuffer((NetworkBuffer buffer) -> {
            receiveManager.handle(MessageType.VERTEX, 0, buffer);
        });
        receiveManager.onFinished(connectionId);
        receiveManager.onStarted(connectionId);
        addEdgeBuffer((NetworkBuffer buffer) -> {
            receiveManager.handle(MessageType.EDGE, 0, buffer);
        }, freq);

        receiveManager.onFinished(connectionId);
        Whitebox.invoke(partition.getClass(), new Class<?>[] {
                                PeekableIterator.class, PeekableIterator.class},
                        "input", partition,
                        receiveManager.vertexPartitions().get(0),
                        receiveManager.edgePartitions().get(0));
        File edgeFile = Whitebox.getInternalState(partition, "edgeFile");
        EdgesInput edgesInput = new EdgesInput(context(), edgeFile);
        edgesInput.init();
        this.checkEdgesInput(edgesInput, freq);
        edgesInput.close();
    }