in engine/src/main/java/org/apache/hop/pipeline/Pipeline.java [539:1096]
public void prepareExecution() throws HopException {
setPreparing(true);
executionStartDate = new Date();
setRunning(false);
// We create the log channel when we're ready to rock and roll
// Before that it makes little sense. We default to GENERAL there.
// We force the creation of a new log channel ID every time we run this pipeline
//
this.log = new LogChannel(this, parent, isGatheringMetrics(), true);
this.log.setLogLevel(logLevel);
if (this.containerObjectId == null) {
this.containerObjectId = log.getContainerObjectId();
}
if (log.isDebug()) {
log.logDebug(
BaseMessages.getString(
PKG,
"Pipeline.Log.NumberOfTransformsToRun",
String.valueOf(pipelineMeta.nrTransforms()),
String.valueOf(pipelineMeta.nrPipelineHops())));
}
log.snap(Metrics.METRIC_PIPELINE_EXECUTION_START);
log.snap(Metrics.METRIC_PIPELINE_INIT_START);
log.logBasic(
"Executing this pipeline using the Local Pipeline Engine with run configuration '"
+ pipelineRunConfiguration.getName()
+ "'");
ExtensionPointHandler.callExtensionPoint(
log, this, HopExtensionPoint.PipelinePrepareExecution.id, this);
activateParameters(this);
if (pipelineMeta.getName() == null) {
if (pipelineMeta.getFilename() != null) {
log.logBasic(
BaseMessages.getString(
PKG, "Pipeline.Log.ExecutionStartedForFilename", pipelineMeta.getFilename()));
}
} else {
log.logBasic(
BaseMessages.getString(
PKG, "Pipeline.Log.ExecutionStartedForPipeline", pipelineMeta.getName()));
}
if (isSafeModeEnabled()) {
if (log.isDetailed()) {
log.logDetailed(
BaseMessages.getString(PKG, "Pipeline.Log.SafeModeIsEnabled", pipelineMeta.getName()));
}
}
// setInternalHopVariables(this); --> Let's not do this: when running
// without a file, for example remotely, it spoils the fun
// Keep track of all the row sets and allocated transforms
//
transforms = Collections.synchronizedList(new ArrayList<>());
rowsets = new ArrayList<>();
List<TransformMeta> hopTransforms = pipelineMeta.getPipelineHopTransforms(false);
if (log.isDetailed()) {
log.logDetailed(
BaseMessages.getString(
PKG, "Pipeline.Log.FoundDefferentTransforms", String.valueOf(hopTransforms.size())));
log.logDetailed(BaseMessages.getString(PKG, "Pipeline.Log.AllocatingRowsets"));
}
// First allocate all the rowsets required!
// Note that a mapping doesn't receive ANY input or output rowsets...
//
for (int i = 0; i < hopTransforms.size(); i++) {
TransformMeta thisTransform = hopTransforms.get(i);
if (thisTransform.isMapping()) {
continue; // handled and allocated by the mapping transform itself.
}
if (log.isDetailed()) {
log.logDetailed(
BaseMessages.getString(
PKG,
"Pipeline.Log.AllocateingRowsetsForTransform",
String.valueOf(i),
thisTransform.getName()));
}
List<TransformMeta> nextTransforms = pipelineMeta.findNextTransforms(thisTransform);
for (TransformMeta nextTransform : nextTransforms) {
// What's the next transform?
if (nextTransform.isMapping()) {
continue; // handled and allocated by the mapping transform itself.
}
// How many times do we start the source transform?
int thisCopies = thisTransform.getCopies(this);
if (thisCopies < 0) {
// This can only happen if a variable is used that didn't resolve to a positive integer
// value
//
throw new HopException(
BaseMessages.getString(
PKG, "Pipeline.Log.TransformCopiesNotCorrectlyDefined", thisTransform.getName()));
}
// How many times do we start the target transform?
int nextCopies = nextTransform.getCopies(this);
// Are we re-partitioning?
boolean repartitioning;
if (thisTransform.isPartitioned()) {
repartitioning =
!thisTransform
.getTransformPartitioningMeta()
.equals(nextTransform.getTransformPartitioningMeta());
} else {
repartitioning = nextTransform.isPartitioned();
}
int nrCopies;
if (log.isDetailed()) {
log.logDetailed(
BaseMessages.getString(
PKG,
"Pipeline.Log.copiesInfo",
String.valueOf(thisCopies),
String.valueOf(nextCopies)));
}
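// A worked example of the dispatch matrix below (transform names A and B
// are illustrative): if A has 1 copy and B has 3, we get TYPE_DISP_1_N and
// allocate 3 rowsets (A.0 -> B.0, A.0 -> B.1, A.0 -> B.2). With 2 copies
// on both sides and no repartitioning we get TYPE_DISP_N_N and 2 rowsets
// (A.0 -> B.0, A.1 -> B.1). Only TYPE_DISP_N_M, used when the copy counts
// differ (or we repartition), wires every source copy to every target copy.
//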
int dispatchType;
if (thisCopies == 1 && nextCopies == 1) {
dispatchType = TYPE_DISP_1_1;
nrCopies = 1;
} else if (thisCopies == 1 && nextCopies > 1) {
dispatchType = TYPE_DISP_1_N;
nrCopies = nextCopies;
} else if (thisCopies > 1 && nextCopies == 1) {
dispatchType = TYPE_DISP_N_1;
nrCopies = thisCopies;
} else if (thisCopies == nextCopies && !repartitioning) {
dispatchType = TYPE_DISP_N_N;
nrCopies = nextCopies;
} else {
// Both sides > 1 with differing copy counts, or repartitioning
dispatchType = TYPE_DISP_N_M;
nrCopies = nextCopies;
}
// Allocate the rowsets: one for each destination transform copy.
//
if (dispatchType != TYPE_DISP_N_M) {
for (int c = 0; c < nrCopies; c++) {
IRowSet rowSet;
switch (pipelineMeta.getPipelineType()) {
case Normal:
// This is a temporary patch until the batching rowset has proven
// to be working in all situations.
// Currently there are stalling problems when dealing with small
// amounts of rows.
//
Boolean batchingRowSet =
ValueMetaBase.convertStringToBoolean(
System.getProperty(Const.HOP_BATCHING_ROWSET));
if (batchingRowSet != null && batchingRowSet.booleanValue()) {
rowSet = new BlockingBatchingRowSet(rowSetSize);
} else {
rowSet = new BlockingRowSet(rowSetSize);
}
break;
case SingleThreaded:
rowSet = new QueueRowSet();
break;
default:
throw new HopException(
"Unhandled pipeline type: " + pipelineMeta.getPipelineType());
}
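// Note: Const.HOP_BATCHING_ROWSET is read as a JVM system property here,
// so batching could presumably be enabled with something like
// -DHOP_BATCHING_ROWSET=Y on the command line (the exact property name is
// whatever the Const.HOP_BATCHING_ROWSET constant resolves to).
//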
switch (dispatchType) {
case TYPE_DISP_1_1:
rowSet.setThreadNameFromToCopy(
thisTransform.getName(), 0, nextTransform.getName(), 0);
break;
case TYPE_DISP_1_N:
rowSet.setThreadNameFromToCopy(
thisTransform.getName(), 0, nextTransform.getName(), c);
break;
case TYPE_DISP_N_1:
rowSet.setThreadNameFromToCopy(
thisTransform.getName(), c, nextTransform.getName(), 0);
break;
case TYPE_DISP_N_N:
rowSet.setThreadNameFromToCopy(
thisTransform.getName(), c, nextTransform.getName(), c);
break;
default:
break;
}
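// The from/to names and copy numbers set above also determine the rowset's
// display name, typically something like "A.0 - B.1" for source copy 0 of
// transform A feeding target copy 1 of transform B (illustrative names).
//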
rowsets.add(rowSet);
if (log.isDetailed()) {
log.logDetailed(
BaseMessages.getString(
PKG, "Pipeline.PipelineAllocatedNewRowset", rowSet.toString()));
}
}
} else {
// For N source transform copies we have M target transform copies.
//
// Every input copy gets a rowset to every output copy.
// This allows maximum flexibility for re-partitioning,
// distribution...
for (int s = 0; s < thisCopies; s++) {
for (int t = 0; t < nextCopies; t++) {
BlockingRowSet rowSet = new BlockingRowSet(rowSetSize);
rowSet.setThreadNameFromToCopy(
thisTransform.getName(), s, nextTransform.getName(), t);
rowsets.add(rowSet);
if (log.isDetailed()) {
log.logDetailed(
BaseMessages.getString(
PKG, "Pipeline.PipelineAllocatedNewRowset", rowSet.toString()));
}
}
}
}
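// Worked example: with thisCopies = 3 and nextCopies = 2 this N_M branch
// allocates 3 x 2 = 6 rowsets, whereas the N_N case above would only
// allocate one rowset per copy pair.
//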
}
log.logDetailed(
BaseMessages.getString(
PKG,
"Pipeline.Log.AllocatedRowsets",
String.valueOf(rowsets.size()),
String.valueOf(i),
thisTransform.getName())
+ " ");
}
if (log.isDetailed()) {
log.logDetailed(
BaseMessages.getString(PKG, "Pipeline.Log.AllocatingTransformsAndTransformData"));
}
// Allocate the transforms & the data...
//
for (TransformMeta transformMeta : hopTransforms) {
String transformid = transformMeta.getTransformPluginId();
if (log.isDetailed()) {
log.logDetailed(
BaseMessages.getString(
PKG,
"Pipeline.Log.PipelineIsToAllocateTransform",
transformMeta.getName(),
transformid));
}
// How many copies are launched of this transform?
int nrCopies = transformMeta.getCopies(this);
if (log.isDebug()) {
log.logDebug(
BaseMessages.getString(
PKG, "Pipeline.Log.TransformHasNumberRowCopies", String.valueOf(nrCopies)));
}
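// The copies setting is resolved through this pipeline's variables, so a
// transform configured with e.g. "${TRANSFORM_COPIES}" (hypothetical
// variable name) runs with whatever that variable resolves to at runtime;
// an unresolved or non-positive value is caught as an error earlier on.
//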
// At least run once...
for (int c = 0; c < nrCopies; c++) {
// Make sure we haven't started it yet!
if (!hasTransformStarted(transformMeta.getName(), c)) {
TransformMetaDataCombi combi = new TransformMetaDataCombi();
combi.transformName = transformMeta.getName();
combi.copy = c;
// The meta-data
combi.transformMeta = transformMeta;
combi.meta = transformMeta.getTransform();
// Allocate the transform data
ITransformData data = combi.meta.createTransformData();
combi.data = data;
// Allocate the transform
ITransform transform =
combi.meta.createTransform(transformMeta, data, c, pipelineMeta, this);
// Copy the variables of the pipeline to the transform...
// don't share. Each copy of the transform has its own variables.
//
transform.initializeFrom(this);
// Pass the metadataProvider to the transforms runtime
//
transform.setMetadataProvider(metadataProvider);
// If the transform is partitioned, set the partitioning ID and some other
// things as well...
if (transformMeta.isPartitioned()) {
List<String> partitionIDs =
transformMeta
.getTransformPartitioningMeta()
.getPartitionSchema()
.calculatePartitionIds(this);
if (partitionIDs != null && !partitionIDs.isEmpty()) {
transform.setPartitionId(partitionIDs.get(c)); // Pass the partition ID
// to the transform
}
}
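// For example, a partition schema that calculates IDs [P1, P2, P3, P4]
// hands P1 to copy 0, P2 to copy 1, and so on; this assumes the number of
// copies matches the number of calculated partition IDs.
//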
// Save the transform too
combi.transform = transform;
// Pass logging level and metrics gathering down to the transform level.
//
if (combi.transform instanceof ILoggingObject) {
ILogChannel logChannel = combi.transform.getLogChannel();
logChannel.setLogLevel(logLevel);
logChannel.setGatheringMetrics(log.isGatheringMetrics());
}
// Add to the bunch...
transforms.add(combi);
if (log.isDetailed()) {
log.logDetailed(
BaseMessages.getString(
PKG,
"Pipeline.Log.PipelineHasAllocatedANewTransform",
transformMeta.getName(),
String.valueOf(c)));
}
}
}
}
// Now we need to verify if certain rowsets are not meant to be for error
// handling...
// Loop over the transforms and for every transform verify the output rowsets.
// If a rowset is going to a target transform listed in the transform's error
// handling metadata, mark it as the errorRowSet.
// The input rowsets are already in place, so the next transform just accepts
// the rows.
// Metadata-wise we need to do the same trick in PipelineMeta
//
for (TransformMetaDataCombi combi : transforms) {
if (combi.transformMeta.isDoingErrorHandling()) {
combi.transform.identifyErrorOutput();
}
}
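// Example: if transform A sends its error rows to transform ErrRows
// (illustrative names), the rowset between A and ErrRows is identified here
// as the error rowset, so normal row distribution skips it and only
// rejected rows travel over it.
//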
// Now (optionally) write start log record!
// Make sure we synchronize appropriately to avoid duplicate batch IDs.
//
Object syncObject = this;
if (parentWorkflow != null) {
syncObject = parentWorkflow; // parallel execution in a workflow
}
if (parentPipeline != null) {
syncObject = parentPipeline; // multiple sub-pipelines
}
synchronized (syncObject) {
calculateBatchIdAndDateRange();
beginProcessing();
}
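// Example scenario: a workflow starts several copies of this pipeline in
// parallel. They all synchronize on the shared parentWorkflow, so
// calculateBatchIdAndDateRange() can never hand out the same batch ID
// twice.
//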
// Set the partition-to-rowset mapping
//
for (TransformMetaDataCombi sid : transforms) {
TransformMeta transformMeta = sid.transformMeta;
ITransform baseTransform = sid.transform;
baseTransform.setPartitioned(transformMeta.isPartitioned());
// Now let's take a look at the source and target relation
//
// If this source transform is not partitioned, and the target transform is: it
// means we need to re-partition the incoming data.
// If both transforms are partitioned on the same method and schema, we don't
// need to re-partition
// If both transforms are partitioned on a different method or schema, we need
// to re-partition as well.
// If both transforms are not partitioned, we don't need to re-partition
//
boolean isThisPartitioned = transformMeta.isPartitioned();
PartitionSchema thisPartitionSchema = null;
if (isThisPartitioned) {
thisPartitionSchema = transformMeta.getTransformPartitioningMeta().getPartitionSchema();
}
boolean isNextPartitioned = false;
TransformPartitioningMeta nextTransformPartitioningMeta = null;
PartitionSchema nextPartitionSchema = null;
List<TransformMeta> nextTransforms = pipelineMeta.findNextTransforms(transformMeta);
for (TransformMeta nextTransform : nextTransforms) {
if (nextTransform.isPartitioned()) {
isNextPartitioned = true;
nextTransformPartitioningMeta = nextTransform.getTransformPartitioningMeta();
nextPartitionSchema = nextTransformPartitioningMeta.getPartitionSchema();
}
}
baseTransform.setRepartitioning(TransformPartitioningMeta.PARTITIONING_METHOD_NONE);
// If the next transform is partitioned differently, set re-partitioning, when
// running locally.
//
if ((!isThisPartitioned && isNextPartitioned)
|| (isThisPartitioned
&& isNextPartitioned
&& !thisPartitionSchema.equals(nextPartitionSchema))) {
baseTransform.setRepartitioning(nextTransformPartitioningMeta.getMethodType());
}
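// Decision sketch: source unpartitioned + target partitioned => repartition;
// both partitioned on the same schema => no repartitioning; both
// partitioned on different schemas => repartition; neither partitioned =>
// nothing to do.
//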
// For partitioning to a set of remote transforms (repartitioning from a
// master to a set of remote output transforms)
//
TransformPartitioningMeta targetTransformPartitioningMeta =
baseTransform.getTransformMeta().getTargetTransformPartitioningMeta();
if (targetTransformPartitioningMeta != null) {
baseTransform.setRepartitioning(targetTransformPartitioningMeta.getMethodType());
}
}
setPreparing(false);
setInitializing(true);
// Do a topology sort... Above 150 transform copies this might slow things down too much.
//
if (isSortingTransformsTopologically() && transforms.size() < 150) {
doTopologySortOfTransforms();
}
if (log.isDetailed()) {
log.logDetailed(
BaseMessages.getString(
PKG, "Pipeline.Log.InitialisingTransforms", String.valueOf(transforms.size())));
}
TransformInitThread[] initThreads = new TransformInitThread[transforms.size()];
Thread[] threads = new Thread[transforms.size()];
// Initialize all the threads...
//
for (int i = 0; i < transforms.size(); i++) {
final TransformMetaDataCombi sid = transforms.get(i);
// Do the init code in the background!
// Init all transforms at once, but ALL transforms need to finish before we can
// continue properly!
//
initThreads[i] = new TransformInitThread(sid, this, log);
// Put it in a separate thread!
//
threads[i] = new Thread(initThreads[i]);
threads[i].setName(
"init of " + sid.transformName + "." + sid.copy + " (" + threads[i].getName() + ")");
ExtensionPointHandler.callExtensionPoint(
log, this, HopExtensionPoint.TransformBeforeInitialize.id, initThreads[i]);
threads[i].start();
}
for (int i = 0; i < threads.length; i++) {
try {
threads[i].join();
ExtensionPointHandler.callExtensionPoint(
log, this, HopExtensionPoint.TransformAfterInitialize.id, initThreads[i]);
} catch (Exception ex) {
log.logError("Error with init thread: " + ex.getMessage(), ex.getMessage());
log.logError(Const.getStackTracker(ex));
}
}
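// This is a simple fork/join barrier: all init threads run concurrently
// and we block until every one has finished (successfully or not) before
// inspecting the results below.
//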
setInitializing(false);
boolean ok = true;
// All transforms are initialized now: see if there was one that didn't do it
// correctly!
//
for (TransformInitThread thread : initThreads) {
TransformMetaDataCombi combi = thread.getCombi();
if (!thread.isOk()) {
log.logError(
BaseMessages.getString(
PKG, "Pipeline.Log.TransformFailedToInit", combi.transformName + "." + combi.copy));
combi.data.setStatus(ComponentExecutionStatus.STATUS_STOPPED);
ok = false;
} else {
combi.data.setStatus(ComponentExecutionStatus.STATUS_IDLE);
if (log.isDetailed()) {
log.logDetailed(
BaseMessages.getString(
PKG,
"Pipeline.Log.TransformInitialized",
combi.transformName + "." + combi.copy));
}
}
}
if (!ok) {
// Halt the other threads as well, signal end-of-the-line to the outside
// world...
// Also explicitly call dispose() to clean up resources opened during
// init()
//
for (TransformInitThread initThread : initThreads) {
TransformMetaDataCombi combi = initThread.getCombi();
// Dispose will overwrite the status, but we set it back right after
// this.
combi.transform.dispose();
if (initThread.isOk()) {
combi.data.setStatus(ComponentExecutionStatus.STATUS_HALTED);
} else {
combi.data.setStatus(ComponentExecutionStatus.STATUS_STOPPED);
}
}
// Just for safety, fire the pipeline finished listeners...
try {
fireExecutionFinishedListeners();
} catch (HopException e) {
// a listener produced errors
log.logError(BaseMessages.getString(PKG, "Pipeline.FinishListeners.Exception"));
// we will not pass this exception up to the prepareExecution() entry point.
} finally {
// Flag the pipeline as finished even if exception was thrown
setFinished(true);
}
// Pass along the log during preview. Otherwise it becomes hard to see
// what went wrong.
//
if (preview) {
String logText = HopLogStore.getAppender().getBuffer(getLogChannelId(), true).toString();
throw new HopException(
BaseMessages.getString(PKG, "Pipeline.Log.FailToInitializeAtLeastOneTransform")
+ Const.CR
+ logText);
} else {
throw new HopException(
BaseMessages.getString(PKG, "Pipeline.Log.FailToInitializeAtLeastOneTransform")
+ Const.CR);
}
}
log.snap(Metrics.METRIC_PIPELINE_INIT_STOP);
setReadyToStart(true);
}