void addEncodingCompressionCheckers()

in common/src/main/java/org/apache/nemo/common/ir/IRDAGChecker.java [436:487]


  void addEncodingCompressionCheckers() {
    final NeighborChecker additionalOutputEncoder = ((irVertex, inEdges, outEdges) -> {
      for (final List<IREdge> sameTagOutEdges : groupOutEdgesByAdditionalOutputTag(outEdges, false)) {
        final List<IREdge> nonStreamVertexEdge = sameTagOutEdges.stream()
          .filter(stoe -> !isConnectedToStreamVertex(stoe))
          .collect(Collectors.toList());

        if (!nonStreamVertexEdge.isEmpty()) {
          Set<? extends Class<? extends EncoderFactory>> encoderProperties = nonStreamVertexEdge.stream().map(e
            -> e.getPropertyValue(EncoderProperty.class).get().getClass()).collect(Collectors.toSet());
          if (1 != nonStreamVertexEdge.stream().map(e -> e.getPropertyValue(EncoderProperty.class).get().getClass())
            .distinct().count()) {
            // if the number of distinct encoders is two and one of them is DummyEncoderFactory, this indicates that
            // one of the edge comes from SignalVertex, which is used in DynamicTaskSizingPolicy.
            // Therefore, we do return failure in this case.
            if (!encoderProperties.contains(EncoderFactory.DummyEncoderFactory.class)
              || encoderProperties.size() != 2) {
              return failure("Incompatible encoders in " + Util.stringifyIREdgeIds(nonStreamVertexEdge));
            }
          }
          Set<? extends Class<? extends DecoderFactory>> decoderProperties = nonStreamVertexEdge.stream().map(e
            -> e.getPropertyValue(DecoderProperty.class).get().getClass()).collect(Collectors.toSet());
          if (1 != nonStreamVertexEdge.stream().map(e -> e.getPropertyValue(DecoderProperty.class).get().getClass())
            .distinct().count()) {
            // if the number of distinct decoders is two and one of them is DummyDecoderFactory, this indicates that
            // one of the edge comes from SignalVertex, which is used in DynamicTaskSizingPolicy.
            // Therefore, we do not return failure in this case.
            if (!decoderProperties.contains(DecoderFactory.DummyDecoderFactory.class)
              || encoderProperties.size() != 2) {
              return failure("Incompatible decoders in " + Util.stringifyIREdgeIds(nonStreamVertexEdge));
            }
          }
        }
      }
      return success();
    });
    neighborCheckerList.add(additionalOutputEncoder);

    // TODO #342: Check Encoder/Decoder symmetry

    final SingleEdgeChecker compressAndDecompress = (edge -> {
      if (!isConnectedToStreamVertex(edge)) {
        if (!edge.getPropertyValue(CompressionProperty.class)
          .equals(edge.getPropertyValue(DecompressionProperty.class))) {
          return failure("Compression and decompression must be symmetric",
            edge, CompressionProperty.class, DecompressionProperty.class);
        }
      }
      return success();
    });
    singleEdgeCheckerList.add(compressAndDecompress);
  }