public void onTrigger()

in nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java [255:436]


    public void onTrigger(final ProcessContext context, final ProcessSession session) {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }

        final ComponentLog logger = getLogger();
        final long sizeBeforeCompression = flowFile.getSize();
        final String compressionMode = context.getProperty(MODE).getValue();

        String compressionFormatValue = context.getProperty(COMPRESSION_FORMAT).getValue();
        if (compressionFormatValue.equals(COMPRESSION_FORMAT_ATTRIBUTE)) {
            final String mimeType = flowFile.getAttribute(CoreAttributes.MIME_TYPE.key());
            if (mimeType == null) {
                logger.error("No {} attribute exists for {}; routing to failure", CoreAttributes.MIME_TYPE.key(), flowFile);
                session.transfer(flowFile, REL_FAILURE);
                return;
            }

            compressionFormatValue = compressionFormatMimeTypeMap.get(mimeType);
            if (compressionFormatValue == null) {
                logger.info("Mime Type of {} is '{}', which does not indicate a supported Compression Format; routing to success without decompressing", flowFile, mimeType);
                session.transfer(flowFile, REL_SUCCESS);
                return;
            }
        }

        final String compressionFormat = compressionFormatValue;
        final AtomicReference<String> mimeTypeRef = new AtomicReference<>(null);
        final StopWatch stopWatch = new StopWatch(true);

        final String fileExtension = switch (compressionFormat.toLowerCase()) {
            case COMPRESSION_FORMAT_GZIP -> ".gz";
            case COMPRESSION_FORMAT_DEFLATE -> ".zlib";
            case COMPRESSION_FORMAT_LZMA -> ".lzma";
            case COMPRESSION_FORMAT_XZ_LZMA2 -> ".xz";
            case COMPRESSION_FORMAT_BZIP2 -> ".bz2";
            case COMPRESSION_FORMAT_SNAPPY -> ".snappy";
            case COMPRESSION_FORMAT_SNAPPY_HADOOP -> ".snappy";
            case COMPRESSION_FORMAT_SNAPPY_FRAMED -> ".sz";
            case COMPRESSION_FORMAT_LZ4_FRAMED -> ".lz4";
            case COMPRESSION_FORMAT_ZSTD -> ".zst";
            case COMPRESSION_FORMAT_BROTLI -> ".br";
            default -> "";
        };

        try {
            flowFile = session.write(flowFile, (rawIn, rawOut) -> {
                final OutputStream compressionOut;
                final InputStream compressionIn;

                final OutputStream bufferedOut = new BufferedOutputStream(rawOut, 65536);
                final InputStream bufferedIn = new BufferedInputStream(rawIn, 65536);

                try {
                    if (MODE_COMPRESS.equalsIgnoreCase(compressionMode)) {
                        compressionIn = bufferedIn;

                        switch (compressionFormat.toLowerCase()) {
                            case COMPRESSION_FORMAT_GZIP: {
                                int compressionLevel = context.getProperty(COMPRESSION_LEVEL).asInteger();
                                compressionOut = new GZIPOutputStream(bufferedOut, compressionLevel);
                                mimeTypeRef.set("application/gzip");
                                break;
                            }
                            case COMPRESSION_FORMAT_DEFLATE: {
                                final int compressionLevel = context.getProperty(COMPRESSION_LEVEL).asInteger();
                                compressionOut = new DeflaterOutputStream(bufferedOut, new Deflater(compressionLevel));
                                mimeTypeRef.set("application/gzip");
                                break;
                            }
                            case COMPRESSION_FORMAT_LZMA:
                                compressionOut = new LzmaOutputStream.Builder(bufferedOut).build();
                                mimeTypeRef.set("application/x-lzma");
                                break;
                            case COMPRESSION_FORMAT_XZ_LZMA2:
                                final int xzCompressionLevel = context.getProperty(COMPRESSION_LEVEL).asInteger();
                                compressionOut = new XZOutputStream(bufferedOut, new LZMA2Options(xzCompressionLevel));
                                mimeTypeRef.set("application/x-xz");
                                break;
                            case COMPRESSION_FORMAT_SNAPPY:
                                compressionOut = new SnappyOutputStream(bufferedOut);
                                mimeTypeRef.set("application/x-snappy");
                                break;
                            case COMPRESSION_FORMAT_SNAPPY_HADOOP:
                                compressionOut = new SnappyHadoopCompatibleOutputStream(bufferedOut);
                                mimeTypeRef.set("application/x-snappy-hadoop");
                                break;
                            case COMPRESSION_FORMAT_SNAPPY_FRAMED:
                                compressionOut = new SnappyFramedOutputStream(bufferedOut);
                                mimeTypeRef.set("application/x-snappy-framed");
                                break;
                            case COMPRESSION_FORMAT_LZ4_FRAMED:
                                mimeTypeRef.set("application/x-lz4-framed");
                                compressionOut = new CompressorStreamFactory().createCompressorOutputStream(compressionFormat.toLowerCase(), bufferedOut);
                                break;
                            case COMPRESSION_FORMAT_ZSTD:
                                final int zstdCompressionLevel = context.getProperty(COMPRESSION_LEVEL).asInteger() * 2;
                                compressionOut = new ZstdCompressorOutputStream(bufferedOut, zstdCompressionLevel);
                                mimeTypeRef.set("application/zstd");
                                break;
                            case COMPRESSION_FORMAT_BROTLI: {
                                Brotli4jLoader.ensureAvailability();
                                final int compressionLevel = context.getProperty(COMPRESSION_LEVEL).asInteger();
                                Encoder.Parameters params = new Encoder.Parameters().setQuality(compressionLevel);
                                compressionOut = new BrotliOutputStream(bufferedOut, params);
                                mimeTypeRef.set("application/x-brotli");
                                break;
                            }
                            case COMPRESSION_FORMAT_BZIP2:
                            default:
                                mimeTypeRef.set("application/x-bzip2");
                                compressionOut = new CompressorStreamFactory().createCompressorOutputStream(compressionFormat.toLowerCase(), bufferedOut);
                                break;
                        }
                    } else {
                        compressionOut = bufferedOut;
                        compressionIn = switch (compressionFormat.toLowerCase()) {
                            case COMPRESSION_FORMAT_LZMA -> new LzmaInputStream(bufferedIn, new Decoder());
                            case COMPRESSION_FORMAT_XZ_LZMA2 -> new XZInputStream(bufferedIn);
                            case COMPRESSION_FORMAT_BZIP2 ->
                                // need this two-arg constructor to support concatenated streams
                                new BZip2CompressorInputStream(bufferedIn, true);
                            case COMPRESSION_FORMAT_GZIP -> new GzipCompressorInputStream(bufferedIn, true);
                            case COMPRESSION_FORMAT_DEFLATE -> new InflaterInputStream(bufferedIn);
                            case COMPRESSION_FORMAT_SNAPPY -> new SnappyInputStream(bufferedIn);
                            case COMPRESSION_FORMAT_SNAPPY_HADOOP -> throw new Exception("Cannot decompress snappy-hadoop.");
                            case COMPRESSION_FORMAT_SNAPPY_FRAMED -> new SnappyFramedInputStream(bufferedIn);
                            case COMPRESSION_FORMAT_LZ4_FRAMED -> new FramedLZ4CompressorInputStream(bufferedIn, true);
                            case COMPRESSION_FORMAT_ZSTD -> new ZstdCompressorInputStream(bufferedIn);
                            case COMPRESSION_FORMAT_BROTLI -> {
                                Brotli4jLoader.ensureAvailability();
                                yield new BrotliInputStream(bufferedIn);
                            }
                            default -> new CompressorStreamFactory().createCompressorInputStream(compressionFormat.toLowerCase(), bufferedIn);
                        };
                    }
                } catch (final Exception e) {
                    closeQuietly(bufferedOut);
                    throw new IOException(e);
                }

                try (final InputStream in = compressionIn;
                    final OutputStream out = compressionOut) {
                    final byte[] buffer = new byte[8192];
                    int len;
                    while ((len = in.read(buffer)) > 0) {
                        out.write(buffer, 0, len);
                    }
                    out.flush();
                }
            });
            stopWatch.stop();

            final long sizeAfterCompression = flowFile.getSize();
            if (MODE_DECOMPRESS.equalsIgnoreCase(compressionMode)) {
                flowFile = session.removeAttribute(flowFile, CoreAttributes.MIME_TYPE.key());

                if (context.getProperty(UPDATE_FILENAME).asBoolean()) {
                    final String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key());
                    if (filename.toLowerCase().endsWith(fileExtension)) {
                        flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), filename.substring(0, filename.length() - fileExtension.length()));
                    }
                }
            } else {
                flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), mimeTypeRef.get());

                if (context.getProperty(UPDATE_FILENAME).asBoolean()) {
                    final String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key());
                    flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), filename + fileExtension);
                }
            }

            logger.info("Successfully {}ed {} using {} compression format; size changed from {} to {} bytes",
                compressionMode.toLowerCase(), flowFile, compressionFormat, sizeBeforeCompression, sizeAfterCompression);
            session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getDuration(TimeUnit.MILLISECONDS));
            session.transfer(flowFile, REL_SUCCESS);
        } catch (final ProcessException e) {
            logger.error("Unable to {} {} using {} compression format due to {}; routing to failure", compressionMode.toLowerCase(), flowFile, compressionFormat, e, e);
            session.transfer(flowFile, REL_FAILURE);
        }
    }