void addParallelismCheckers()

in common/src/main/java/org/apache/nemo/common/ir/IRDAGChecker.java [171:247]


  /**
   * Registers the checkers that validate {@link ParallelismProperty} against related properties.
   *
   * <p>Two checkers are appended to {@code singleVertexCheckerList} and two to {@code neighborCheckerList}:
   * <ul>
   *   <li>parallelism vs. the sum of the {@link ResourceSiteProperty} values, and vs. the offsets in
   *       {@link ResourceAntiAffinityProperty}, on the same vertex;</li>
   *   <li>parallelism of a {@link SourceVertex} vs. the number of readables it returns for that parallelism;</li>
   *   <li>equal parallelism on both ends of every incoming ONE_TO_ONE edge;</li>
   *   <li>parallelism vs. the size of every incoming edge's {@link PartitionSetProperty}.</li>
   * </ul>
   * A comparison is skipped (and counts as success) whenever one of the properties it needs is not set.
   */
  void addParallelismCheckers() {
    final SingleVertexChecker parallelismWithOtherEPsInSingleVertex = (v -> {
      final Optional<Integer> parallelism = v.getPropertyValue(ParallelismProperty.class);
      if (!parallelism.isPresent()) {
        return success(); // No need to check, if the parallelism is not set yet
      }

      // The per-site counts must add up to exactly the parallelism.
      final Optional<Integer> resourceSiteSize = v.getPropertyValue(ResourceSiteProperty.class)
        .map(rs -> rs.values().stream().mapToInt(Integer::intValue).sum());
      if (resourceSiteSize.isPresent() && !parallelism.equals(resourceSiteSize)) {
        return failure("Parallelism must equal to sum of site nums",
          v, ParallelismProperty.class, ResourceSiteProperty.class);
      }

      // Every anti-affinity offset must be a member of the set built from the parallelism.
      final Optional<HashSet<Integer>> antiAffinitySet = v.getPropertyValue(ResourceAntiAffinityProperty.class);
      if (antiAffinitySet.isPresent()
        && !getZeroToNSet(parallelism.get()).containsAll(antiAffinitySet.get())) {
        return failure("Offsets must be within parallelism",
          v, ParallelismProperty.class, ResourceAntiAffinityProperty.class);
      }

      return success();
    });
    singleVertexCheckerList.add(parallelismWithOtherEPsInSingleVertex);

    final SingleVertexChecker parallelismOfSourceVertex = (v -> {
      final Optional<Integer> parallelism = v.getPropertyValue(ParallelismProperty.class);
      if (!parallelism.isPresent() || !(v instanceof SourceVertex)) {
        return success(); // Only source vertices with a known parallelism can be checked
      }

      try {
        final int numOfReadables = ((SourceVertex) v).getReadables(parallelism.get()).size();
        if (parallelism.get() != numOfReadables) {
          return failure(String.format("(Parallelism %d) != (Number of SourceVertex %s Readables %d)",
            parallelism.get(), v.getId(), numOfReadables));
        }
      } catch (Exception e) {
        // Report the problem as a check failure rather than propagating it.
        // getMessage() may be null, so fall back to toString() to keep the failure descriptive.
        final String message = e.getMessage();
        return failure(message != null ? message : e.toString());
      }

      return success();
    });
    singleVertexCheckerList.add(parallelismOfSourceVertex);

    final NeighborChecker parallelismWithCommPattern = ((v, inEdges, outEdges) -> {
      final Optional<Integer> dstParallelism = v.getPropertyValue(ParallelismProperty.class);
      if (!dstParallelism.isPresent()) {
        return success(); // Nothing to compare against yet
      }

      // Just look at incoming edges, as this checker will be applied on every vertex
      for (final IREdge inEdge : inEdges) {
        // An edge without a communication pattern is treated as "not ONE_TO_ONE" instead of throwing.
        final boolean isOneToOne = inEdge.getPropertyValue(CommunicationPatternProperty.class)
          .filter(CommunicationPatternProperty.Value.ONE_TO_ONE::equals)
          .isPresent();
        if (isOneToOne) {
          final Optional<Integer> srcParallelism = inEdge.getSrc().getPropertyValue(ParallelismProperty.class);
          if (srcParallelism.isPresent() && !srcParallelism.equals(dstParallelism)) {
            return failure("OneToOne edges must have the same parallelism",
              inEdge.getSrc(), ParallelismProperty.class, v, ParallelismProperty.class);
          }
        }
      }

      return success();
    });
    neighborCheckerList.add(parallelismWithCommPattern);

    final NeighborChecker parallelismWithPartitionSet = ((v, inEdges, outEdges) -> {
      final Optional<Integer> parallelism = v.getPropertyValue(ParallelismProperty.class);
      if (!parallelism.isPresent()) {
        return success(); // Nothing to compare against yet
      }

      for (final IREdge inEdge : inEdges) {
        final Optional<Integer> keyRangeListSize = inEdge.getPropertyValue(PartitionSetProperty.class)
          .map(List::size);
        if (keyRangeListSize.isPresent() && !parallelism.equals(keyRangeListSize)) {
          return failure("PartitionSet must contain all task offsets required for the dst parallelism",
            v, ParallelismProperty.class, inEdge, PartitionSetProperty.class);
        }
      }

      return success();
    });
    neighborCheckerList.add(parallelismWithPartitionSet);
  }