public static JobTopology fromJsonPlan()

in flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/JobTopology.java [144:184]


    public static JobTopology fromJsonPlan(
            String jsonPlan,
            Map<JobVertexID, SlotSharingGroupId> slotSharingGroupIdMap,
            Map<JobVertexID, Integer> maxParallelismMap,
            Map<JobVertexID, IOMetrics> metrics,
            Set<JobVertexID> finishedVertices)
            throws JsonProcessingException {
        ObjectNode plan = objectMapper.readValue(jsonPlan, ObjectNode.class);
        ArrayNode nodes = (ArrayNode) plan.get("nodes");

        if (nodes == null || nodes.isEmpty()) {
            throw new NotReadyException("No nodes found in the plan, job is not ready yet");
        }

        var vertexInfo = new HashSet<VertexInfo>();

        for (JsonNode node : nodes) {
            var vertexId = JobVertexID.fromHexString(node.get("id").asText());
            var inputs = new HashMap<JobVertexID, ShipStrategy>();
            var ioMetrics = metrics.get(vertexId);
            var finished = finishedVertices.contains(vertexId);
            vertexInfo.add(
                    new VertexInfo(
                            vertexId,
                            slotSharingGroupIdMap.get(vertexId),
                            inputs,
                            node.get("parallelism").asInt(),
                            maxParallelismMap.get(vertexId),
                            finished,
                            finished ? IOMetrics.FINISHED_METRICS : ioMetrics));
            if (node.has("inputs")) {
                for (JsonNode input : node.get("inputs")) {
                    inputs.put(
                            JobVertexID.fromHexString(input.get("id").asText()),
                            ShipStrategy.of(input.get("ship_strategy").asText()));
                }
            }
        }

        return new JobTopology(vertexInfo);
    }