in computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/input/EdgesInputTest.java [107:167]
private void testEdgeFreq(EdgeFrequency freq)
throws IOException {
this.config = UnitTestBase.updateWithRequiredOptions(
ComputerOptions.JOB_ID, "local_001",
ComputerOptions.JOB_WORKERS_COUNT, "1",
ComputerOptions.JOB_PARTITIONS_COUNT, "1",
ComputerOptions.WORKER_COMBINER_CLASS,
Null.class.getName(), // Can't combine
ComputerOptions.ALGORITHM_RESULT_CLASS,
IdListList.class.getName(),
ComputerOptions.ALGORITHM_MESSAGE_CLASS,
IdList.class.getName(),
ComputerOptions.WORKER_DATA_DIRS, "[data_dir1, data_dir2]",
ComputerOptions.WORKER_RECEIVED_BUFFERS_BYTES_LIMIT, "10000",
ComputerOptions.WORKER_WAIT_FINISH_MESSAGES_TIMEOUT, "1000",
ComputerOptions.INPUT_MAX_EDGES_IN_ONE_VERTEX, "10",
ComputerOptions.INPUT_EDGE_FREQ, freq.name(),
ComputerOptions.TRANSPORT_RECV_FILE_MODE, "false"
);
this.managers = new Managers();
FileManager fileManager = new FileManager();
this.managers.add(fileManager);
SortManager sortManager = new SendSortManager(context());
this.managers.add(sortManager);
MessageSendManager sendManager = new MessageSendManager(
context(), sortManager,
new MockMessageSender());
this.managers.add(sendManager);
MessageRecvManager receiveManager = new MessageRecvManager(context(),
fileManager,
sortManager);
this.managers.add(receiveManager);
this.managers.initAll(this.config);
ConnectionId connectionId = new ConnectionId(new InetSocketAddress(
"localhost", 8081),
0);
FileGraphPartition partition = new FileGraphPartition(
context(), this.managers, 0);
receiveManager.onStarted(connectionId);
add200VertexBuffer((NetworkBuffer buffer) -> {
receiveManager.handle(MessageType.VERTEX, 0, buffer);
});
receiveManager.onFinished(connectionId);
receiveManager.onStarted(connectionId);
addEdgeBuffer((NetworkBuffer buffer) -> {
receiveManager.handle(MessageType.EDGE, 0, buffer);
}, freq);
receiveManager.onFinished(connectionId);
Whitebox.invoke(partition.getClass(), new Class<?>[] {
PeekableIterator.class, PeekableIterator.class},
"input", partition,
receiveManager.vertexPartitions().get(0),
receiveManager.edgePartitions().get(0));
File edgeFile = Whitebox.getInternalState(partition, "edgeFile");
EdgesInput edgesInput = new EdgesInput(context(), edgeFile);
edgesInput.init();
this.checkEdgesInput(edgesInput, freq);
edgesInput.close();
}