in nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java [255:436]
public void onTrigger(final ProcessContext context, final ProcessSession session) {
    FlowFile flowFile = session.get();
    if (flowFile == null) {
        return;
    }

    final ComponentLog logger = getLogger();
    final long sizeBeforeCompression = flowFile.getSize();
    final String compressionMode = context.getProperty(MODE).getValue();

    String compressionFormatValue = context.getProperty(COMPRESSION_FORMAT).getValue();
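    // When the format is taken from the flowfile's mime.type attribute, resolve the attribute value to a concrete compression format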
    if (compressionFormatValue.equals(COMPRESSION_FORMAT_ATTRIBUTE)) {
        final String mimeType = flowFile.getAttribute(CoreAttributes.MIME_TYPE.key());
        if (mimeType == null) {
            logger.error("No {} attribute exists for {}; routing to failure", CoreAttributes.MIME_TYPE.key(), flowFile);
            session.transfer(flowFile, REL_FAILURE);
            return;
        }

        compressionFormatValue = compressionFormatMimeTypeMap.get(mimeType);
        if (compressionFormatValue == null) {
            logger.info("Mime Type of {} is '{}', which does not indicate a supported Compression Format; routing to success without decompressing", flowFile, mimeType);
            session.transfer(flowFile, REL_SUCCESS);
            return;
        }
    }

    final String compressionFormat = compressionFormatValue;
    final AtomicReference<String> mimeTypeRef = new AtomicReference<>(null);
    final StopWatch stopWatch = new StopWatch(true);
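    // File extension for the chosen format; appended to the filename on compress and stripped from it on decompress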
    final String fileExtension = switch (compressionFormat.toLowerCase()) {
        case COMPRESSION_FORMAT_GZIP -> ".gz";
        case COMPRESSION_FORMAT_DEFLATE -> ".zlib";
        case COMPRESSION_FORMAT_LZMA -> ".lzma";
        case COMPRESSION_FORMAT_XZ_LZMA2 -> ".xz";
        case COMPRESSION_FORMAT_BZIP2 -> ".bz2";
        case COMPRESSION_FORMAT_SNAPPY -> ".snappy";
        case COMPRESSION_FORMAT_SNAPPY_HADOOP -> ".snappy";
        case COMPRESSION_FORMAT_SNAPPY_FRAMED -> ".sz";
        case COMPRESSION_FORMAT_LZ4_FRAMED -> ".lz4";
        case COMPRESSION_FORMAT_ZSTD -> ".zst";
        case COMPRESSION_FORMAT_BROTLI -> ".br";
        default -> "";
    };

    try {
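        // Rewrite the flowfile content in a single pass, wrapping the raw streams according to the configured mode and format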
        flowFile = session.write(flowFile, (rawIn, rawOut) -> {
            final OutputStream compressionOut;
            final InputStream compressionIn;

            final OutputStream bufferedOut = new BufferedOutputStream(rawOut, 65536);
            final InputStream bufferedIn = new BufferedInputStream(rawIn, 65536);

            try {
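                // Compression: wrap the buffered output in the selected compressor and record the matching mime type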
                if (MODE_COMPRESS.equalsIgnoreCase(compressionMode)) {
                    compressionIn = bufferedIn;

                    switch (compressionFormat.toLowerCase()) {
                        case COMPRESSION_FORMAT_GZIP: {
                            int compressionLevel = context.getProperty(COMPRESSION_LEVEL).asInteger();
                            compressionOut = new GZIPOutputStream(bufferedOut, compressionLevel);
                            mimeTypeRef.set("application/gzip");
                            break;
                        }
                        case COMPRESSION_FORMAT_DEFLATE: {
                            final int compressionLevel = context.getProperty(COMPRESSION_LEVEL).asInteger();
                            compressionOut = new DeflaterOutputStream(bufferedOut, new Deflater(compressionLevel));
                            mimeTypeRef.set("application/gzip");
                            break;
                        }
                        case COMPRESSION_FORMAT_LZMA:
                            compressionOut = new LzmaOutputStream.Builder(bufferedOut).build();
                            mimeTypeRef.set("application/x-lzma");
                            break;
                        case COMPRESSION_FORMAT_XZ_LZMA2:
                            final int xzCompressionLevel = context.getProperty(COMPRESSION_LEVEL).asInteger();
                            compressionOut = new XZOutputStream(bufferedOut, new LZMA2Options(xzCompressionLevel));
                            mimeTypeRef.set("application/x-xz");
                            break;
                        case COMPRESSION_FORMAT_SNAPPY:
                            compressionOut = new SnappyOutputStream(bufferedOut);
                            mimeTypeRef.set("application/x-snappy");
                            break;
                        case COMPRESSION_FORMAT_SNAPPY_HADOOP:
                            compressionOut = new SnappyHadoopCompatibleOutputStream(bufferedOut);
                            mimeTypeRef.set("application/x-snappy-hadoop");
                            break;
                        case COMPRESSION_FORMAT_SNAPPY_FRAMED:
                            compressionOut = new SnappyFramedOutputStream(bufferedOut);
                            mimeTypeRef.set("application/x-snappy-framed");
                            break;
                        case COMPRESSION_FORMAT_LZ4_FRAMED:
                            mimeTypeRef.set("application/x-lz4-framed");
                            compressionOut = new CompressorStreamFactory().createCompressorOutputStream(compressionFormat.toLowerCase(), bufferedOut);
                            break;
                        case COMPRESSION_FORMAT_ZSTD:
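                            // zstd level is twice the configured Compression Level, since zstd supports a wider range of levels than the gzip-style 0-9 scale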
                            final int zstdCompressionLevel = context.getProperty(COMPRESSION_LEVEL).asInteger() * 2;
                            compressionOut = new ZstdCompressorOutputStream(bufferedOut, zstdCompressionLevel);
                            mimeTypeRef.set("application/zstd");
                            break;
                        case COMPRESSION_FORMAT_BROTLI: {
                            Brotli4jLoader.ensureAvailability();
                            final int compressionLevel = context.getProperty(COMPRESSION_LEVEL).asInteger();
                            Encoder.Parameters params = new Encoder.Parameters().setQuality(compressionLevel);
                            compressionOut = new BrotliOutputStream(bufferedOut, params);
                            mimeTypeRef.set("application/x-brotli");
                            break;
                        }
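                        // bzip2 (and any remaining formats) go through Commons Compress's CompressorStreamFactory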
                        case COMPRESSION_FORMAT_BZIP2:
                        default:
                            mimeTypeRef.set("application/x-bzip2");
                            compressionOut = new CompressorStreamFactory().createCompressorOutputStream(compressionFormat.toLowerCase(), bufferedOut);
                            break;
                    }
                } else {
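                    // Decompression: wrap the buffered input in the decompressor matching the resolved format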
                    compressionOut = bufferedOut;
                    compressionIn = switch (compressionFormat.toLowerCase()) {
                        case COMPRESSION_FORMAT_LZMA -> new LzmaInputStream(bufferedIn, new Decoder());
                        case COMPRESSION_FORMAT_XZ_LZMA2 -> new XZInputStream(bufferedIn);
                        case COMPRESSION_FORMAT_BZIP2 ->
                            // need this two-arg constructor to support concatenated streams
                            new BZip2CompressorInputStream(bufferedIn, true);
                        case COMPRESSION_FORMAT_GZIP -> new GzipCompressorInputStream(bufferedIn, true);
                        case COMPRESSION_FORMAT_DEFLATE -> new InflaterInputStream(bufferedIn);
                        case COMPRESSION_FORMAT_SNAPPY -> new SnappyInputStream(bufferedIn);
                        case COMPRESSION_FORMAT_SNAPPY_HADOOP -> throw new Exception("Cannot decompress snappy-hadoop.");
                        case COMPRESSION_FORMAT_SNAPPY_FRAMED -> new SnappyFramedInputStream(bufferedIn);
                        case COMPRESSION_FORMAT_LZ4_FRAMED -> new FramedLZ4CompressorInputStream(bufferedIn, true);
                        case COMPRESSION_FORMAT_ZSTD -> new ZstdCompressorInputStream(bufferedIn);
                        case COMPRESSION_FORMAT_BROTLI -> {
                            Brotli4jLoader.ensureAvailability();
                            yield new BrotliInputStream(bufferedIn);
                        }
                        default -> new CompressorStreamFactory().createCompressorInputStream(compressionFormat.toLowerCase(), bufferedIn);
                    };
                }
            } catch (final Exception e) {
                closeQuietly(bufferedOut);
                throw new IOException(e);
            }
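            // Copy the content through the (de)compression streams; try-with-resources closes and finishes both sides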
            try (final InputStream in = compressionIn;
                 final OutputStream out = compressionOut) {
                final byte[] buffer = new byte[8192];
                int len;
                while ((len = in.read(buffer)) > 0) {
                    out.write(buffer, 0, len);
                }
                out.flush();
            }
        });
        stopWatch.stop();

        final long sizeAfterCompression = flowFile.getSize();
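        // Attribute updates: decompress drops mime.type, compress sets it; the filename extension is adjusted when Update Filename is enabled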
        if (MODE_DECOMPRESS.equalsIgnoreCase(compressionMode)) {
            flowFile = session.removeAttribute(flowFile, CoreAttributes.MIME_TYPE.key());

            if (context.getProperty(UPDATE_FILENAME).asBoolean()) {
                final String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key());
                if (filename.toLowerCase().endsWith(fileExtension)) {
                    flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), filename.substring(0, filename.length() - fileExtension.length()));
                }
            }
        } else {
            flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), mimeTypeRef.get());

            if (context.getProperty(UPDATE_FILENAME).asBoolean()) {
                final String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key());
                flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), filename + fileExtension);
            }
        }

        logger.info("Successfully {}ed {} using {} compression format; size changed from {} to {} bytes",
                compressionMode.toLowerCase(), flowFile, compressionFormat, sizeBeforeCompression, sizeAfterCompression);
        session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getDuration(TimeUnit.MILLISECONDS));
        session.transfer(flowFile, REL_SUCCESS);
    } catch (final ProcessException e) {
        logger.error("Unable to {} {} using {} compression format due to {}; routing to failure", compressionMode.toLowerCase(), flowFile, compressionFormat, e, e);
        session.transfer(flowFile, REL_FAILURE);
    }
}