in common/src/main/java/org/apache/nemo/common/ir/IRDAGChecker.java [436:487]
/**
 * Registers the encoder/decoder and compression/decompression checkers.
 *
 * <p>Two checkers are added:
 * <ul>
 *   <li>a neighbor checker verifying that out-edges grouped under the same additional output tag
 *       (ignoring edges connected to a stream vertex) agree on their encoder and decoder factory
 *       classes, and</li>
 *   <li>a single-edge checker verifying that, for edges not connected to a stream vertex, the
 *       compression and decompression property values are equal.</li>
 * </ul>
 */
void addEncodingCompressionCheckers() {
  final NeighborChecker additionalOutputEncoder = ((irVertex, inEdges, outEdges) -> {
    for (final List<IREdge> sameTagOutEdges : groupOutEdgesByAdditionalOutputTag(outEdges, false)) {
      final List<IREdge> nonStreamVertexEdge = sameTagOutEdges.stream()
        .filter(stoe -> !isConnectedToStreamVertex(stoe))
        .collect(Collectors.toList());
      if (nonStreamVertexEdge.isEmpty()) {
        // Nothing to compare for this tag.
        continue;
      }

      // NOTE(review): get() throws if an edge has no EncoderProperty/DecoderProperty;
      // presumably every IREdge carries both - confirm against the edge construction code.
      final Set<? extends Class<? extends EncoderFactory>> encoderProperties = nonStreamVertexEdge.stream()
        .map(e -> e.getPropertyValue(EncoderProperty.class).get().getClass())
        .collect(Collectors.toSet());
      if (encoderProperties.size() != 1) {
        // if the number of distinct encoders is two and one of them is DummyEncoderFactory, this indicates that
        // one of the edge comes from SignalVertex, which is used in DynamicTaskSizingPolicy.
        // Therefore, we do not return failure in this case.
        if (!encoderProperties.contains(EncoderFactory.DummyEncoderFactory.class)
          || encoderProperties.size() != 2) {
          return failure("Incompatible encoders in " + Util.stringifyIREdgeIds(nonStreamVertexEdge));
        }
      }

      final Set<? extends Class<? extends DecoderFactory>> decoderProperties = nonStreamVertexEdge.stream()
        .map(e -> e.getPropertyValue(DecoderProperty.class).get().getClass())
        .collect(Collectors.toSet());
      if (decoderProperties.size() != 1) {
        // if the number of distinct decoders is two and one of them is DummyDecoderFactory, this indicates that
        // one of the edge comes from SignalVertex, which is used in DynamicTaskSizingPolicy.
        // Therefore, we do not return failure in this case.
        // The size test must look at the decoder set (not the encoder set): the edges may share a single
        // encoder while still having a dummy decoder plus one real decoder.
        if (!decoderProperties.contains(DecoderFactory.DummyDecoderFactory.class)
          || decoderProperties.size() != 2) {
          return failure("Incompatible decoders in " + Util.stringifyIREdgeIds(nonStreamVertexEdge));
        }
      }
    }
    return success();
  });
  neighborCheckerList.add(additionalOutputEncoder);

  // TODO #342: Check Encoder/Decoder symmetry

  final SingleEdgeChecker compressAndDecompress = (edge -> {
    // Edges connected to a stream vertex are exempt from the symmetry requirement.
    if (!isConnectedToStreamVertex(edge)) {
      if (!edge.getPropertyValue(CompressionProperty.class)
        .equals(edge.getPropertyValue(DecompressionProperty.class))) {
        return failure("Compression and decompression must be symmetric",
          edge, CompressionProperty.class, DecompressionProperty.class);
      }
    }
    return success();
  });
  singleEdgeCheckerList.add(compressAndDecompress);
}