in nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java [357:541]
public void onTrigger(ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile inputFlowFile = session.get();
if (null == inputFlowFile) {
return;
}
final ArrayList<String> args = new ArrayList<>();
final ArrayList<String> argumentAttributeValue = new ArrayList<>();
final boolean putToAttribute = context.getProperty(PUT_OUTPUT_IN_ATTRIBUTE).isSet();
final PropertyValue argumentsStrategyPropertyValue = context.getProperty(ARGUMENTS_STRATEGY);
final boolean useDynamicPropertyArguments = argumentsStrategyPropertyValue.isSet() && argumentsStrategyPropertyValue.getValue().equals(DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY.getValue());
final Integer attributeSize = context.getProperty(PUT_ATTRIBUTE_MAX_LENGTH).asInteger();
final String attributeName = context.getProperty(PUT_OUTPUT_IN_ATTRIBUTE).getValue();
final String executeCommand = context.getProperty(EXECUTION_COMMAND).evaluateAttributeExpressions(inputFlowFile).getValue();
args.add(executeCommand);
final boolean ignoreStdin = Boolean.parseBoolean(context.getProperty(IGNORE_STDIN).getValue());
final String commandArguments;
if (!useDynamicPropertyArguments) {
commandArguments = context.getProperty(EXECUTION_ARGUMENTS).evaluateAttributeExpressions(inputFlowFile).getValue();
if (!StringUtils.isBlank(commandArguments)) {
args.addAll(ArgumentUtils
.splitArgs(commandArguments, context.getProperty(ARG_DELIMITER).getValue().charAt(0)));
}
} else {
List<PropertyDescriptor> propertyDescriptors = new ArrayList<>();
for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
Matcher matcher = COMMAND_ARGUMENT_PATTERN.matcher(entry.getKey().getName());
if (matcher.matches()) {
propertyDescriptors.add(entry.getKey());
}
}
propertyDescriptors.sort((p1, p2) -> {
Matcher matcher = COMMAND_ARGUMENT_PATTERN.matcher(p1.getName());
String indexString1 = null;
while (matcher.find()) {
indexString1 = matcher.group("commandIndex");
}
matcher = COMMAND_ARGUMENT_PATTERN.matcher(p2.getName());
String indexString2 = null;
while (matcher.find()) {
indexString2 = matcher.group("commandIndex");
}
final int index1 = Integer.parseInt(indexString1);
final int index2 = Integer.parseInt(indexString2);
if (index1 > index2) {
return 1;
} else if (index1 < index2) {
return -1;
}
return 0;
});
for (final PropertyDescriptor descriptor : propertyDescriptors) {
String argValue = context.getProperty(descriptor.getName()).evaluateAttributeExpressions(inputFlowFile).getValue();
if (descriptor.isSensitive()) {
argumentAttributeValue.add(MASKED_ARGUMENT);
} else {
argumentAttributeValue.add(argValue);
}
args.add(argValue);
}
if (argumentAttributeValue.size() > 0) {
final StringBuilder builder = new StringBuilder();
for (String s : argumentAttributeValue) {
builder.append(s).append("\t");
}
commandArguments = builder.toString().trim();
} else {
commandArguments = "";
}
}
final String workingDir = context.getProperty(WORKING_DIR).evaluateAttributeExpressions(inputFlowFile).getValue();
final ProcessBuilder builder = new ProcessBuilder();
// Avoid logging arguments that could contain sensitive values
logger.debug("Executing and waiting for command: {}", executeCommand);
File dir = null;
if (!StringUtils.isBlank(workingDir)) {
dir = new File(workingDir);
if (!dir.exists() && !dir.mkdirs()) {
logger.warn("Failed to create working directory {}, using current working directory {}", workingDir, System.getProperty("user.dir"));
}
}
final Map<String, String> environment = new HashMap<>();
for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
if (entry.getKey().isDynamic()) {
environment.put(entry.getKey().getName(), entry.getValue());
}
}
builder.environment().putAll(environment);
builder.command(args);
builder.directory(dir);
builder.redirectInput(Redirect.PIPE);
builder.redirectOutput(Redirect.PIPE);
final File errorOut;
try {
errorOut = File.createTempFile("out", null);
builder.redirectError(errorOut);
} catch (IOException e) {
logger.error("Could not create temporary file for error logging", e);
throw new ProcessException(e);
}
final Process process;
try {
process = builder.start();
} catch (IOException e) {
try {
if (!errorOut.delete()) {
logger.warn("Unable to delete file: {}", errorOut.getAbsolutePath());
}
} catch (SecurityException se) {
logger.warn("Unable to delete file: '{}'", errorOut.getAbsolutePath(), se);
}
logger.error("Could not create external process to run command", e);
throw new ProcessException(e);
}
try (final OutputStream pos = process.getOutputStream();
final InputStream pis = process.getInputStream();
final BufferedInputStream bis = new BufferedInputStream(pis)) {
final BufferedOutputStream bos = new BufferedOutputStream(pos);
FlowFile outputFlowFile = putToAttribute ? inputFlowFile : session.create(inputFlowFile);
ProcessStreamWriterCallback callback = new ProcessStreamWriterCallback(ignoreStdin, bos, bis, logger,
attributeName, session, outputFlowFile, process, putToAttribute, attributeSize);
session.read(inputFlowFile, callback);
outputFlowFile = callback.outputFlowFile;
if (putToAttribute) {
outputFlowFile = session.putAttribute(outputFlowFile, attributeName, new String(callback.outputBuffer, 0, callback.size));
}
int exitCode = callback.exitCode;
logger.debug("Execution complete for command: {}. Exited with code: {}", executeCommand, exitCode);
Map<String, String> attributes = new HashMap<>();
String stdErr = "";
try (final InputStream in = new BufferedInputStream(new LimitingInputStream(new FileInputStream(errorOut), 4000))) {
stdErr = IOUtils.toString(in, Charset.defaultCharset());
} catch (final Exception e) {
stdErr = "Unknown...could not read Process's Std Error due to " + e.getClass().getName() + ": " + e.getMessage();
}
attributes.put("execution.error", stdErr);
final Relationship outputFlowFileRelationship = putToAttribute ? ORIGINAL_RELATIONSHIP : (exitCode != 0) ? NONZERO_STATUS_RELATIONSHIP : OUTPUT_STREAM_RELATIONSHIP;
if (exitCode == 0) {
logger.info("Transferring {} to {}", outputFlowFile, outputFlowFileRelationship.getName());
} else {
logger.error("Transferring {} to {}. Executable command {} returned exitCode {} and error message: {}",
outputFlowFile, outputFlowFileRelationship.getName(), executeCommand, exitCode, stdErr);
}
attributes.put("execution.status", Integer.toString(exitCode));
attributes.put("execution.command", executeCommand);
attributes.put("execution.command.args", commandArguments);
if (context.getProperty(MIME_TYPE).isSet() && !putToAttribute) {
attributes.put(CoreAttributes.MIME_TYPE.key(), context.getProperty(MIME_TYPE).getValue());
}
outputFlowFile = session.putAllAttributes(outputFlowFile, attributes);
if (NONZERO_STATUS_RELATIONSHIP.equals(outputFlowFileRelationship)) {
outputFlowFile = session.penalize(outputFlowFile);
}
// This will transfer the FlowFile that received the stream output to its destined relationship.
// In the event the stream is put to an attribute of the original, it will be transferred here.
session.transfer(outputFlowFile, outputFlowFileRelationship);
if (!putToAttribute) {
logger.info("Transferring {} to original", inputFlowFile);
inputFlowFile = session.putAllAttributes(inputFlowFile, attributes);
session.transfer(inputFlowFile, ORIGINAL_RELATIONSHIP);
}
} catch (final IOException e) {
// could not close Process related streams
logger.warn("Problem terminating Process {}", process, e);
} finally {
FileUtils.deleteQuietly(errorOut);
process.destroy(); // last ditch effort to clean up that process.
}
}