in wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/profiling/ExecutionPlanMeasurement.java [73:138]
public static ExecutionPlanMeasurement capture(ExecutionPlan executionPlan, String id) {
ExecutionPlanMeasurement instance = new ExecutionPlanMeasurement(id);
// Collect the tasks of the plan.
Set<ExecutionTask> executionTasks = executionPlan.collectAllTasks();
// Initialize the new instance.
instance.operators = new ArrayList<>(executionTasks.size());
instance.channels = new ArrayList<>(executionTasks.size());
instance.links = new ArrayList<>(executionTasks.size());
// Keep track of already created ChannelNodes.
Map<Channel, ChannelNode> channelNodeMap = new HashMap<>(executionTasks.size());
// Go over the ExecutionTasks and create all ChannelNodes, OperatorNodes, and Links immediately.
int nextNodeId = 0;
for (ExecutionTask executionTask : executionTasks) {
// Create the OperatorNode.
ExecutionOperator operator = executionTask.getOperator();
OperatorNode operatorNode = new OperatorNode(
nextNodeId++,
operator.getClass().getCanonicalName(),
operator.getName(),
operator.getPlatform().getName()
);
instance.operators.add(operatorNode);
// Create inbound ChannelNodes and Links.
for (Channel inputChannel : executionTask.getInputChannels()) {
if (inputChannel == null) continue;
ChannelNode channelNode = channelNodeMap.get(inputChannel);
if (channelNode == null) {
channelNode = new ChannelNode(
nextNodeId++,
inputChannel.getClass().getCanonicalName(),
inputChannel.getDataSetType().getDataUnitType().getTypeClass().getName()
);
channelNodeMap.put(inputChannel, channelNode);
instance.channels.add(channelNode);
}
instance.links.add(new Link(channelNode.getId(), operatorNode.getId()));
}
// Create outbound ChannelNodes and Links.
for (Channel outputChannel : executionTask.getOutputChannels()) {
if (outputChannel == null) continue;
ChannelNode channelNode = channelNodeMap.get(outputChannel);
if (channelNode == null) {
channelNode = new ChannelNode(
nextNodeId++,
outputChannel.getClass().getCanonicalName(),
outputChannel.getDataSetType().getDataUnitType().getTypeClass().getName()
);
channelNodeMap.put(outputChannel, channelNode);
instance.channels.add(channelNode);
}
instance.links.add(new Link(operatorNode.getId(), channelNode.getId()));
}
}
return instance;
}