in runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/PipeManagerWorker.java [76:135]
/**
 * Opens the reading side of the pipe that carries data of {@code runtimeEdge}
 * from the source task {@code srcTaskIndex} to the destination task {@code dstTaskIndex}.
 *
 * <p>The master is first asked (asynchronously) which executor holds the source task's pipe.
 * If that executor is this one, a {@link LocalOutputContext} is registered in the pipe container
 * and the returned iterator reads from a {@link LocalInputContext} connected to it. Otherwise a
 * new input context is requested from the remote executor through {@code byteTransfer}, and the
 * returned iterator deserializes its input streams with the serializer registered for the edge.
 *
 * <p>Nothing in this method waits on a future: the caller receives a future that completes once
 * the master has answered and the input side has been set up. Exceptions raised while handling the
 * master's response complete the returned future exceptionally rather than being thrown to the caller.
 *
 * @param srcTaskIndex index of the source task whose output is to be read.
 * @param runtimeEdge  the runtime edge connecting the source and destination tasks.
 * @param dstTaskIndex index of the destination task that will consume the data.
 * @return a future of the iterator over the data sent through the pipe.
 */
public CompletableFuture<DataUtil.IteratorWithNumBytes> read(final int srcTaskIndex,
                                                             final RuntimeEdge runtimeEdge,
                                                             final int dstTaskIndex) {
  final String runtimeEdgeId = runtimeEdge.getId();

  // Ask the master for the location of the src task's pipe.
  // This is an asynchronous request: the answer is handled in the continuation below.
  final CompletableFuture<ControlMessage.Message> responseFromMasterFuture = toMaster
    .getMessageSender(MessageEnvironment.PIPE_MANAGER_MASTER_MESSAGE_LISTENER_ID).request(
      ControlMessage.Message.newBuilder()
        .setId(RuntimeIdManager.generateMessageId())
        .setListenerId(MessageEnvironment.PIPE_MANAGER_MASTER_MESSAGE_LISTENER_ID)
        .setType(ControlMessage.MessageType.RequestPipeLoc)
        .setRequestPipeLocMsg(
          ControlMessage.RequestPipeLocationMessage.newBuilder()
            .setExecutorId(executorId)
            .setRuntimeEdgeId(runtimeEdgeId)
            .setSrcTaskIndex(srcTaskIndex)
            .build())
        .build());

  return responseFromMasterFuture.thenCompose(responseFromMaster -> {
    // Validate the response before trusting its payload.
    if (responseFromMaster.getType() != ControlMessage.MessageType.PipeLocInfo) {
      throw new RuntimeException("Response message type mismatch!");
    }
    final ControlMessage.PipeLocationInfoMessage pipeLocInfo = responseFromMaster.getPipeLocInfoMsg();
    if (!pipeLocInfo.hasExecutorId()) {
      throw new IllegalStateException("PipeLocInfo response for edge " + runtimeEdgeId
        + ", src task index " + srcTaskIndex + " does not contain an executor id");
    }
    final String targetExecutorId = pipeLocInfo.getExecutorId();

    if (targetExecutorId.equals(executorId)) {
      // The source task's pipe is on this executor: read from the local executor.
      final Pair<String, Long> pairKey = Pair.of(runtimeEdgeId, Long.valueOf(srcTaskIndex));
      pipeContainer.putPipeListIfAbsent(pairKey, getNumOfPipeToWait(runtimeEdge));

      // Initialize a local output context and register it for this destination task.
      final LocalOutputContext outputContext =
        new LocalOutputContext(executorId, runtimeEdgeId, srcTaskIndex, dstTaskIndex);
      pipeContainer.putPipe(pairKey, dstTaskIndex, outputContext);

      // Initialize a local input context connected to the corresponding local output context.
      // No further asynchronous step is needed, so the future is returned already completed.
      final LocalInputContext inputContext = new LocalInputContext(outputContext);
      final CompletableFuture<DataUtil.IteratorWithNumBytes> localResult =
        CompletableFuture.completedFuture(DataUtil.IteratorWithNumBytes.of(inputContext.getIterator()));
      return localResult;
    } else {
      // The source task's pipe is on another executor: read from the remote executor.
      final ControlMessage.PipeTransferContextDescriptor descriptor =
        ControlMessage.PipeTransferContextDescriptor.newBuilder()
          .setRuntimeEdgeId(runtimeEdgeId)
          .setSrcTaskIndex(srcTaskIndex)
          .setDstTaskIndex(dstTaskIndex)
          .setNumPipeToWait(getNumOfPipeToWait(runtimeEdge))
          .build();
      // NOTE(review): the trailing 'true' presumably marks this context as a pipe transfer;
      // confirm against the signature of newInputContext.
      return byteTransfer.newInputContext(targetExecutorId, descriptor.toByteArray(), true)
        .thenApply(context -> new DataUtil.InputStreamIterator(context.getInputStreams(),
          serializerManager.getSerializer(runtimeEdgeId)));
    }
  });
}