public void onTrigger()

in nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java [357:541]


    public void onTrigger(ProcessContext context, final ProcessSession session) throws ProcessException {
        FlowFile inputFlowFile = session.get();
        if (null == inputFlowFile) {
            return;
        }

        final ArrayList<String> args = new ArrayList<>();
        final ArrayList<String> argumentAttributeValue = new ArrayList<>();
        final boolean putToAttribute = context.getProperty(PUT_OUTPUT_IN_ATTRIBUTE).isSet();
        final PropertyValue argumentsStrategyPropertyValue = context.getProperty(ARGUMENTS_STRATEGY);
        final boolean useDynamicPropertyArguments = argumentsStrategyPropertyValue.isSet() && argumentsStrategyPropertyValue.getValue().equals(DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY.getValue());
        final Integer attributeSize = context.getProperty(PUT_ATTRIBUTE_MAX_LENGTH).asInteger();
        final String attributeName = context.getProperty(PUT_OUTPUT_IN_ATTRIBUTE).getValue();

        final String executeCommand = context.getProperty(EXECUTION_COMMAND).evaluateAttributeExpressions(inputFlowFile).getValue();
        args.add(executeCommand);
        final boolean ignoreStdin = Boolean.parseBoolean(context.getProperty(IGNORE_STDIN).getValue());
        final String commandArguments;
        if (!useDynamicPropertyArguments) {
            commandArguments = context.getProperty(EXECUTION_ARGUMENTS).evaluateAttributeExpressions(inputFlowFile).getValue();
            if (!StringUtils.isBlank(commandArguments)) {
                args.addAll(ArgumentUtils
                        .splitArgs(commandArguments, context.getProperty(ARG_DELIMITER).getValue().charAt(0)));
            }
        } else {
            List<PropertyDescriptor> propertyDescriptors = new ArrayList<>();
            for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
                Matcher matcher = COMMAND_ARGUMENT_PATTERN.matcher(entry.getKey().getName());
                if (matcher.matches()) {
                    propertyDescriptors.add(entry.getKey());
                }
            }
            propertyDescriptors.sort((p1, p2) -> {
                Matcher matcher = COMMAND_ARGUMENT_PATTERN.matcher(p1.getName());
                String indexString1 = null;
                while (matcher.find()) {
                    indexString1 = matcher.group("commandIndex");
                }
                matcher = COMMAND_ARGUMENT_PATTERN.matcher(p2.getName());
                String indexString2 = null;
                while (matcher.find()) {
                    indexString2 = matcher.group("commandIndex");
                }
                final int index1 = Integer.parseInt(indexString1);
                final int index2 = Integer.parseInt(indexString2);
                if (index1 > index2) {
                    return 1;
                } else if (index1 < index2) {
                    return -1;
                }
                return 0;
            });

            for (final PropertyDescriptor descriptor : propertyDescriptors) {
                String argValue = context.getProperty(descriptor.getName()).evaluateAttributeExpressions(inputFlowFile).getValue();
                if (descriptor.isSensitive()) {
                    argumentAttributeValue.add(MASKED_ARGUMENT);
                } else {
                    argumentAttributeValue.add(argValue);
                }
                args.add(argValue);

            }
            if (argumentAttributeValue.size() > 0) {
                final StringBuilder builder = new StringBuilder();
                for (String s : argumentAttributeValue) {
                    builder.append(s).append("\t");
                }
                commandArguments = builder.toString().trim();
            } else {
                commandArguments = "";
            }
        }
        final String workingDir = context.getProperty(WORKING_DIR).evaluateAttributeExpressions(inputFlowFile).getValue();

        final ProcessBuilder builder = new ProcessBuilder();

        // Avoid logging arguments that could contain sensitive values
        logger.debug("Executing and waiting for command: {}", executeCommand);
        File dir = null;
        if (!StringUtils.isBlank(workingDir)) {
            dir = new File(workingDir);
            if (!dir.exists() && !dir.mkdirs()) {
                logger.warn("Failed to create working directory {}, using current working directory {}", workingDir, System.getProperty("user.dir"));
            }
        }
        final Map<String, String> environment = new HashMap<>();
        for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
            if (entry.getKey().isDynamic()) {
                environment.put(entry.getKey().getName(), entry.getValue());
            }
        }
        builder.environment().putAll(environment);
        builder.command(args);
        builder.directory(dir);
        builder.redirectInput(Redirect.PIPE);
        builder.redirectOutput(Redirect.PIPE);
        final File errorOut;
        try {
            errorOut = File.createTempFile("out", null);
            builder.redirectError(errorOut);
        } catch (IOException e) {
            logger.error("Could not create temporary file for error logging", e);
            throw new ProcessException(e);
        }

        final Process process;
        try {
            process = builder.start();
        } catch (IOException e) {
            try {
                if (!errorOut.delete()) {
                    logger.warn("Unable to delete file: {}", errorOut.getAbsolutePath());
                }
            } catch (SecurityException se) {
                logger.warn("Unable to delete file: '{}'", errorOut.getAbsolutePath(), se);
            }
            logger.error("Could not create external process to run command", e);
            throw new ProcessException(e);
        }
        try (final OutputStream pos = process.getOutputStream();
             final InputStream pis = process.getInputStream();
             final BufferedInputStream bis = new BufferedInputStream(pis)) {
            final BufferedOutputStream bos = new BufferedOutputStream(pos);
            FlowFile outputFlowFile = putToAttribute ? inputFlowFile : session.create(inputFlowFile);

            ProcessStreamWriterCallback callback = new ProcessStreamWriterCallback(ignoreStdin, bos, bis, logger,
                    attributeName, session, outputFlowFile, process, putToAttribute, attributeSize);
            session.read(inputFlowFile, callback);

            outputFlowFile = callback.outputFlowFile;
            if (putToAttribute) {
                outputFlowFile = session.putAttribute(outputFlowFile, attributeName, new String(callback.outputBuffer, 0, callback.size));
            }

            int exitCode = callback.exitCode;
            logger.debug("Execution complete for command: {}.  Exited with code: {}", executeCommand, exitCode);

            Map<String, String> attributes = new HashMap<>();

            String stdErr = "";
            try (final InputStream in = new BufferedInputStream(new LimitingInputStream(new FileInputStream(errorOut), 4000))) {
                stdErr = IOUtils.toString(in, Charset.defaultCharset());
            } catch (final Exception e) {
                stdErr = "Unknown...could not read Process's Std Error due to " + e.getClass().getName() + ": " + e.getMessage();
            }
            attributes.put("execution.error", stdErr);

            final Relationship outputFlowFileRelationship = putToAttribute ? ORIGINAL_RELATIONSHIP : (exitCode != 0) ? NONZERO_STATUS_RELATIONSHIP : OUTPUT_STREAM_RELATIONSHIP;
            if (exitCode == 0) {
                logger.info("Transferring {} to {}", outputFlowFile, outputFlowFileRelationship.getName());
            } else {
                logger.error("Transferring {} to {}. Executable command {} returned exitCode {} and error message: {}",
                        outputFlowFile, outputFlowFileRelationship.getName(), executeCommand, exitCode, stdErr);
            }

            attributes.put("execution.status", Integer.toString(exitCode));
            attributes.put("execution.command", executeCommand);
            attributes.put("execution.command.args", commandArguments);
            if (context.getProperty(MIME_TYPE).isSet() && !putToAttribute) {
                attributes.put(CoreAttributes.MIME_TYPE.key(), context.getProperty(MIME_TYPE).getValue());
            }
            outputFlowFile = session.putAllAttributes(outputFlowFile, attributes);

            if (NONZERO_STATUS_RELATIONSHIP.equals(outputFlowFileRelationship)) {
                outputFlowFile = session.penalize(outputFlowFile);
            }
            // This will transfer the FlowFile that received the stream output to its destined relationship.
            // In the event the stream is put to an attribute of the original, it will be transferred here.
            session.transfer(outputFlowFile, outputFlowFileRelationship);

            if (!putToAttribute) {
                logger.info("Transferring {} to original", inputFlowFile);
                inputFlowFile = session.putAllAttributes(inputFlowFile, attributes);
                session.transfer(inputFlowFile, ORIGINAL_RELATIONSHIP);
            }

        } catch (final IOException e) {
            // could not close Process related streams
            logger.warn("Problem terminating Process {}", process, e);
        } finally {
            FileUtils.deleteQuietly(errorOut);
            process.destroy(); // last ditch effort to clean up that process.
        }
    }