in computer-core/src/main/java/org/apache/hugegraph/computer/core/compute/ComputeManager.java [75:127]
public WorkerStat input() {
WorkerStat workerStat = new WorkerStat();
this.recvManager.waitReceivedAllMessages();
Map<Integer, PeekableIterator<KvEntry>> vertices =
this.recvManager.vertexPartitions();
Map<Integer, PeekableIterator<KvEntry>> edges =
this.recvManager.edgePartitions();
// TODO: parallel input process
for (Map.Entry<Integer, PeekableIterator<KvEntry>> entry :
vertices.entrySet()) {
int partition = entry.getKey();
PeekableIterator<KvEntry> vertexIter = entry.getValue();
PeekableIterator<KvEntry> edgesIter =
edges.getOrDefault(
partition,
PeekableIterator.emptyIterator());
FileGraphPartition part = new FileGraphPartition(this.context,
this.managers,
partition);
PartitionStat partitionStat = null;
ComputerException inputException = null;
try {
partitionStat = part.input(vertexIter, edgesIter);
} catch (ComputerException e) {
inputException = e;
} finally {
try {
vertexIter.close();
edgesIter.close();
} catch (Exception e) {
String message = "Failed to close vertex or edge file " +
"iterator";
ComputerException closeException = new ComputerException(
message, e);
if (inputException != null) {
inputException.addSuppressed(closeException);
} else {
throw closeException;
}
}
if (inputException != null) {
throw inputException;
}
}
workerStat.add(partitionStat);
this.partitions.put(partition, part);
}
return workerStat;
}