in flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/JobTopology.java [144:184]
/**
 * Builds a {@link JobTopology} from the JSON representation of a job plan.
 *
 * <p>Each element of the plan's {@code nodes} array is turned into one {@link VertexInfo}. The
 * vertex id is read from the node's {@code id} field, the parallelism from its {@code
 * parallelism} field, and the upstream vertices from its optional {@code inputs} array (each
 * input contributes its {@code id} and {@code ship_strategy}). Vertices contained in {@code
 * finishedVertices} are marked as finished and receive {@code IOMetrics.FINISHED_METRICS}
 * instead of the entry looked up in {@code metrics}.
 *
 * @param jsonPlan the job plan as a JSON string
 * @param slotSharingGroupIdMap slot sharing group per vertex, looked up by vertex id
 * @param maxParallelismMap max parallelism per vertex, looked up by vertex id
 * @param metrics IO metrics per vertex, looked up by vertex id
 * @param finishedVertices ids of the vertices that are treated as finished
 * @return the topology assembled from all nodes of the plan
 * @throws JsonProcessingException if {@code jsonPlan} cannot be parsed
 * @throws NotReadyException if the plan has no {@code nodes} field or the array is empty
 */
public static JobTopology fromJsonPlan(
        String jsonPlan,
        Map<JobVertexID, SlotSharingGroupId> slotSharingGroupIdMap,
        Map<JobVertexID, Integer> maxParallelismMap,
        Map<JobVertexID, IOMetrics> metrics,
        Set<JobVertexID> finishedVertices)
        throws JsonProcessingException {
    ObjectNode plan = objectMapper.readValue(jsonPlan, ObjectNode.class);
    ArrayNode nodes = (ArrayNode) plan.get("nodes");
    if (nodes == null || nodes.isEmpty()) {
        throw new NotReadyException("No nodes found in the plan, job is not ready yet");
    }

    var vertexInfo = new HashSet<VertexInfo>();
    for (JsonNode node : nodes) {
        var vertexId = JobVertexID.fromHexString(node.get("id").asText());

        // Populate the inputs completely BEFORE the VertexInfo is created and stored in the
        // hash set: mutating state reachable from a set element after insertion is unsafe if
        // that state takes part in equals/hashCode, and would be lost entirely if the
        // VertexInfo constructor copied the map.
        var inputs = new HashMap<JobVertexID, ShipStrategy>();
        if (node.has("inputs")) {
            for (JsonNode input : node.get("inputs")) {
                inputs.put(
                        JobVertexID.fromHexString(input.get("id").asText()),
                        ShipStrategy.of(input.get("ship_strategy").asText()));
            }
        }

        var ioMetrics = metrics.get(vertexId);
        var finished = finishedVertices.contains(vertexId);
        vertexInfo.add(
                new VertexInfo(
                        vertexId,
                        slotSharingGroupIdMap.get(vertexId),
                        inputs,
                        node.get("parallelism").asInt(),
                        maxParallelismMap.get(vertexId),
                        finished,
                        // Finished vertices use the fixed placeholder metrics.
                        finished ? IOMetrics.FINISHED_METRICS : ioMetrics));
    }
    return new JobTopology(vertexInfo);
}