public static ExecutionPlanMeasurement capture()

in wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/profiling/ExecutionPlanMeasurement.java [73:138]


    /**
     * Creates an {@link ExecutionPlanMeasurement} that mirrors the given {@link ExecutionPlan} as a graph of
     * {@link OperatorNode}s, {@link ChannelNode}s, and {@link Link}s.
     * <p>Every {@link ExecutionTask} of the plan yields one {@link OperatorNode}. Every non-{@code null} input and
     * output {@link Channel} of a task yields a {@link ChannelNode}, which is created on first encounter and reused
     * afterwards. Input {@link Channel}s are linked from the channel to the operator, output {@link Channel}s from
     * the operator to the channel. Node IDs are drawn from a single counter starting at {@code 0}, in the order in
     * which the nodes are created while iterating the tasks.</p>
     *
     * @param executionPlan the {@link ExecutionPlan} whose tasks and channels should be captured
     * @param id            the ID handed to the new {@link ExecutionPlanMeasurement}
     * @return the populated {@link ExecutionPlanMeasurement}
     */
    public static ExecutionPlanMeasurement capture(ExecutionPlan executionPlan, String id) {
        final ExecutionPlanMeasurement measurement = new ExecutionPlanMeasurement(id);

        // Gather every task of the plan; their number serves as the initial capacity of all containers below.
        final Set<ExecutionTask> tasks = executionPlan.collectAllTasks();
        final int expectedSize = tasks.size();

        measurement.operators = new ArrayList<>(expectedSize);
        measurement.channels = new ArrayList<>(expectedSize);
        measurement.links = new ArrayList<>(expectedSize);

        // Remembers the ChannelNode of each Channel so that a Channel shared by several tasks maps to one node.
        final Map<Channel, ChannelNode> knownChannelNodes = new HashMap<>(expectedSize);

        // Operators and channels share one ID sequence.
        int idCounter = 0;
        for (ExecutionTask task : tasks) {
            // Describe the operator of this task.
            final ExecutionOperator taskOperator = task.getOperator();
            final int operatorId = idCounter++;
            final String operatorClassName = taskOperator.getClass().getCanonicalName();
            final String operatorName = taskOperator.getName();
            final String platformName = taskOperator.getPlatform().getName();
            final OperatorNode opNode = new OperatorNode(operatorId, operatorClassName, operatorName, platformName);
            measurement.operators.add(opNode);

            // Inbound side: channel -> operator.
            for (Channel incoming : task.getInputChannels()) {
                if (incoming != null) {
                    ChannelNode incomingNode = knownChannelNodes.get(incoming);
                    if (incomingNode == null) {
                        final int channelId = idCounter++;
                        final String channelClassName = incoming.getClass().getCanonicalName();
                        final String dataTypeName =
                                incoming.getDataSetType().getDataUnitType().getTypeClass().getName();
                        incomingNode = new ChannelNode(channelId, channelClassName, dataTypeName);
                        knownChannelNodes.put(incoming, incomingNode);
                        measurement.channels.add(incomingNode);
                    }
                    measurement.links.add(new Link(incomingNode.getId(), opNode.getId()));
                }
            }

            // Outbound side: operator -> channel.
            for (Channel outgoing : task.getOutputChannels()) {
                if (outgoing != null) {
                    ChannelNode outgoingNode = knownChannelNodes.get(outgoing);
                    if (outgoingNode == null) {
                        final int channelId = idCounter++;
                        final String channelClassName = outgoing.getClass().getCanonicalName();
                        final String dataTypeName =
                                outgoing.getDataSetType().getDataUnitType().getTypeClass().getName();
                        outgoingNode = new ChannelNode(channelId, channelClassName, dataTypeName);
                        knownChannelNodes.put(outgoing, outgoingNode);
                        measurement.channels.add(outgoingNode);
                    }
                    measurement.links.add(new Link(opNode.getId(), outgoingNode.getId()));
                }
            }
        }

        return measurement;
    }