in plugins/actions/pipeline/src/main/java/org/apache/hop/workflow/actions/pipeline/ActionPipeline.java [287:612]
public Result execute(Result result, int nr) throws HopException {
result.setEntryNr(nr);
LogChannelFileWriter logChannelFileWriter = null;
LogLevel pipelineLogLevel = parentWorkflow.getLogLevel();
String realLogFilename = "";
if (setLogfile) {
pipelineLogLevel = logFileLevel;
realLogFilename = resolve(getLogFilename());
// We need to check here the log filename
// if we do not have one, we must fail
if (Utils.isEmpty(realLogFilename)) {
logError(BaseMessages.getString(PKG, "ActionPipeline.Exception.LogFilenameMissing"));
result.setNrErrors(1);
result.setResult(false);
return result;
}
// create parent folder?
if (!FileUtil.createParentFolder(
PKG, realLogFilename, createParentFolder, this.getLogChannel())) {
result.setNrErrors(1);
result.setResult(false);
return result;
}
try {
logChannelFileWriter =
new LogChannelFileWriter(
this.getLogChannelId(), HopVfs.getFileObject(realLogFilename), setAppendLogfile);
logChannelFileWriter.startLogging();
} catch (HopException e) {
logError(
BaseMessages.getString(
PKG, "ActionPipeline.Error.UnableOpenAppender", realLogFilename, e.toString()));
logError(Const.getStackTracker(e));
result.setNrErrors(1);
result.setResult(false);
return result;
}
}
logDetailed(
BaseMessages.getString(PKG, "ActionPipeline.Log.OpeningPipeline", resolve(getFilename())));
// Load the pipeline only once for the complete loop!
// Throws an exception if it was not possible to load the pipeline, for example if the XML file
// doesn't exist.
// Log the stack trace and return an error condition from this
//
PipelineMeta pipelineMeta = null;
try {
pipelineMeta = getPipelineMeta(getMetadataProvider(), this);
} catch (HopException e) {
logError(
BaseMessages.getString(
PKG,
"ActionPipeline.Exception.UnableToRunWorkflow",
parentWorkflowMeta.getName(),
getName(),
StringUtils.trim(e.getMessage())),
e);
result.setNrErrors(1);
result.setResult(false);
return result;
}
int iteration = 0;
RowMetaAndData resultRow = null;
boolean first = true;
List<RowMetaAndData> rows = new ArrayList<>(result.getRows());
while ((first && !execPerRow)
|| (execPerRow && rows != null && iteration < rows.size() && result.getNrErrors() == 0)
&& !parentWorkflow.isStopped()) {
// Clear the result rows of the result
// Otherwise we double the amount of rows every iteration in the simple cases.
//
if (execPerRow) {
result.getRows().clear();
}
if (rows != null && execPerRow) {
resultRow = rows.get(iteration);
} else {
resultRow = null;
}
INamedParameters namedParam = new NamedParameters();
for (Parameter parameter : parameterDefinition.getParameters()) {
if (!Utils.isEmpty(parameter.getName())) {
// We have a parameter
//
namedParam.addParameterDefinition(parameter.getName(), "", "Action runtime");
if (Utils.isEmpty(Const.trim(parameter.getField()))) {
// There is no field name specified.
//
String value = Const.NVL(resolve(parameter.getValue()), "");
namedParam.setParameterValue(parameter.getName(), value);
} else {
// something filled in, in the field column...
//
String value = "";
if (resultRow != null) {
value = resultRow.getString(parameter.getField(), "");
}
namedParam.setParameterValue(parameter.getName(), value);
}
}
}
first = false;
Result previousResult = result;
try {
if (isDetailed()) {
logDetailed(
BaseMessages.getString(
PKG,
"ActionPipeline.StartingPipeline",
getFilename(),
getName(),
getDescription()));
}
if (clearResultRows) {
previousResult.setRows(new ArrayList<>());
}
if (clearResultFiles) {
previousResult.getResultFiles().clear();
}
/*
* Set one or more "result" rows on the pipeline...
*/
if (execPerRow) {
// Execute for each input row
// Just pass a single row
List<RowMetaAndData> newList = new ArrayList<>();
newList.add(resultRow);
// This previous result rows list can be either empty or not.
// Depending on the checkbox "clear result rows"
// In this case, it would execute the pipeline with one extra row each time
// Can't figure out a real use-case for it, but hey, who am I to decide that, right?
// :-)
//
previousResult.getRows().addAll(newList);
if (paramsFromPrevious) { // Copy the input the parameters
for (Parameter parameter : parameterDefinition.getParameters()) {
if (!Utils.isEmpty(parameter.getName())) {
// We have a parameter
if (Utils.isEmpty(Const.trim(parameter.getField()))) {
namedParam.setParameterValue(
parameter.getName(), Const.NVL(resolve(parameter.getValue()), ""));
} else {
String fieldValue = "";
if (resultRow != null) {
fieldValue = resultRow.getString(parameter.getField(), "");
}
// Get the value from the input stream
namedParam.setParameterValue(parameter.getName(), Const.NVL(fieldValue, ""));
}
}
}
}
} else {
if (paramsFromPrevious) {
// Copy the input the parameters
for (Parameter parameter : parameterDefinition.getParameters()) {
if (!Utils.isEmpty(parameter.getName())) {
// We have a parameter
if (Utils.isEmpty(Const.trim(parameter.getField()))) {
namedParam.setParameterValue(
parameter.getName(), Const.NVL(resolve(parameter.getValue()), ""));
} else {
String fieldValue = "";
if (resultRow != null) {
fieldValue = resultRow.getString(parameter.getField(), "");
}
// Get the value from the input stream
namedParam.setParameterValue(parameter.getName(), Const.NVL(fieldValue, ""));
}
}
}
}
}
// Handle the parameters...
//
String[] parameterNames = pipelineMeta.listParameters();
prepareFieldNamesParameters(parameterDefinition.getParameters(), namedParam, this);
if (StringUtils.isEmpty(runConfiguration)) {
throw new HopException(
"This action needs a run configuration to use to execute the specified pipeline");
}
runConfiguration = resolve(runConfiguration);
logBasic(BaseMessages.getString(PKG, "ActionPipeline.RunConfig.Message", runConfiguration));
// Create the pipeline from meta-data
//
pipeline =
PipelineEngineFactory.createPipelineEngine(
this, runConfiguration, getMetadataProvider(), pipelineMeta);
pipeline.setParent(this);
// set the parent workflow on the pipeline, variables are taken from here...
//
pipeline.setParentWorkflow(parentWorkflow);
pipeline.setParentVariables(parentWorkflow);
pipeline.setLogLevel(pipelineLogLevel);
pipeline.setPreviousResult(previousResult);
// inject the metadataProvider
pipeline.setMetadataProvider(getMetadataProvider());
// Handle parameters...
//
pipeline.initializeFrom(null);
pipeline.copyParametersFromDefinitions(pipelineMeta);
// Pass the parameter values and activate...
//
TransformWithMappingMeta.activateParams(
pipeline,
pipeline,
this,
parameterNames,
parameterDefinition.getNames(),
parameterDefinition.getValues(),
parameterDefinition.isPassingAllParameters());
// First get the root workflow
//
IWorkflowEngine<WorkflowMeta> rootWorkflow = parentWorkflow;
while (rootWorkflow.getParentWorkflow() != null) {
rootWorkflow = rootWorkflow.getParentWorkflow();
}
try {
// Start execution...
//
pipeline.execute();
// Wait until we're done with this pipeline
//
if (isWaitingToFinish()) {
pipeline.waitUntilFinished();
if (parentWorkflow.isStopped() || pipeline.getErrors() != 0) {
pipeline.stopAll();
result.setNrErrors(1);
}
updateResult(result);
}
if (setLogfile) {
ResultFile resultFile =
new ResultFile(
ResultFile.FILE_TYPE_LOG,
HopVfs.getFileObject(realLogFilename),
parentWorkflow.getWorkflowName(),
toString());
result.getResultFiles().put(resultFile.getFile().toString(), resultFile);
}
} catch (HopException e) {
logError(BaseMessages.getString(PKG, "ActionPipeline.Error.UnablePrepareExec"), e);
result.setNrErrors(1);
}
} catch (Exception e) {
logError(
BaseMessages.getString(PKG, "ActionPipeline.ErrorUnableOpenPipeline", e.getMessage()));
logError(Const.getStackTracker(e));
result.setNrErrors(1);
}
iteration++;
}
if (setLogfile) {
if (logChannelFileWriter != null) {
logChannelFileWriter.stopLogging();
ResultFile resultFile =
new ResultFile(
ResultFile.FILE_TYPE_LOG,
logChannelFileWriter.getLogFile(),
parentWorkflow.getWorkflowName(),
getName());
result.getResultFiles().put(resultFile.getFile().toString(), resultFile);
// See if anything went wrong during file writing...
//
if (logChannelFileWriter.getException() != null) {
logError("Unable to open log file [" + getLogFilename() + "] : ");
logError(Const.getStackTracker(logChannelFileWriter.getException()));
result.setNrErrors(1);
result.setResult(false);
return result;
}
}
}
if (result.getNrErrors() == 0) {
result.setResult(true);
} else {
result.setResult(false);
}
return result;
}