in engine/src/main/java/org/apache/hop/pipeline/transform/BaseTransform.java [2253:2490]
/**
 * Wires this transform copy to the row sets it reads from and writes to.
 *
 * <p>The method looks up this transform's metadata by name, determines the previous and next
 * transforms in the pipeline and, for every such neighbour, selects a dispatch type
 * (1-1, 1-N, N-1, N-N or N-M) from the number of copies on both sides of the hop and from whether
 * the partitioning differs between the two sides. The matching row sets are then looked up with
 * {@code pipeline.findRowSet(...)} and collected in the input and output row set lists.
 *
 * <p>All dispatch state (row set lists, error row set, previous/next transform arrays and the
 * current input row set index) is reset while both row set write locks are held, so calling this
 * method again starts from a clean slate instead of appending to earlier results.
 *
 * <p>If a row set cannot be found and neither side of the hop is a mapping, an error is logged,
 * the error count is set to 1, {@code stopAll()} is called and the method returns early. When no
 * pipeline metadata is available (preview) nothing is dispatched at all.
 */
public void dispatch() {
  if (pipelineMeta == null) { // for preview reasons, no dispatching is done!
    return;
  }
  TransformMeta transformMeta = pipelineMeta.findTransform(transformName);
  if (log.isDetailed()) {
    logDetailed(BaseMessages.getString(PKG, "BaseTransform.Log.StartingBuffersAllocation"));
  }
  // How many transforms do we receive input from, and how many do we send output to?
  List<TransformMeta> previousTransforms =
      pipelineMeta.findPreviousTransforms(transformMeta, true);
  List<TransformMeta> succeedingTransforms = pipelineMeta.findNextTransforms(transformMeta);
  int nrInput = previousTransforms.size();
  int nrOutput = succeedingTransforms.size();
  inputRowSetsLock.writeLock().lock();
  outputRowSetsLock.writeLock().lock();
  try {
    // Start from empty lists: without this reset a repeated dispatch() would append duplicate
    // row sets, and a never-initialised list would fail on the first add().
    inputRowSets = new java.util.ArrayList<>();
    outputRowSets = new java.util.ArrayList<>();
    errorRowSet = null;
    prevTransforms = new TransformMeta[nrInput];
    nextTransforms = new TransformMeta[nrOutput];
    currentInputRowSetNr = 0; // we start with input[0]
    if (log.isDetailed()) {
      logDetailed(
          BaseMessages.getString(
              PKG,
              "BaseTransform.Log.TransformMeta",
              String.valueOf(nrInput),
              String.valueOf(nrOutput)));
    }
    // populate input rowsets.
    for (int i = 0; i < previousTransforms.size(); i++) {
      prevTransforms[i] = previousTransforms.get(i);
      if (log.isDetailed()) {
        logDetailed(
            BaseMessages.getString(
                PKG,
                "BaseTransform.Log.GotPreviousTransform",
                transformName,
                String.valueOf(i),
                prevTransforms[i].getName()));
      }
      // Looking at the previous transform, you can have either one rowset to look at or more
      // than one.
      int prevCopies = prevTransforms[i].getCopies(this);
      int nextCopies = transformMeta.getCopies(this);
      if (log.isDetailed()) {
        logDetailed(
            BaseMessages.getString(
                PKG,
                "BaseTransform.Log.InputRowInfo",
                String.valueOf(prevCopies),
                String.valueOf(nextCopies)));
      }
      int nrCopies;
      int dispatchType;
      // Repartitioning: the partitioning metadata differs between the two sides of the hop, or
      // only the receiving side (this transform) is partitioned.
      boolean repartitioning;
      if (prevTransforms[i].isPartitioned()) {
        repartitioning =
            !prevTransforms[i]
                .getTransformPartitioningMeta()
                .equals(transformMeta.getTransformPartitioningMeta());
      } else {
        repartitioning = transformMeta.isPartitioned();
      }
      // On the input side nrCopies is the number of source copies this copy reads from.
      if (prevCopies == 1 && nextCopies == 1) {
        // normal hop
        dispatchType = Pipeline.TYPE_DISP_1_1;
        nrCopies = 1;
      } else if (prevCopies == 1 && nextCopies > 1) {
        // one to many hop
        dispatchType = Pipeline.TYPE_DISP_1_N;
        nrCopies = 1;
      } else if (prevCopies > 1 && nextCopies == 1) {
        // from many to one hop
        dispatchType = Pipeline.TYPE_DISP_N_1;
        nrCopies = prevCopies;
      } else if (prevCopies == nextCopies && !repartitioning) {
        // this may be many-to-many or swim-lanes hop
        dispatchType = Pipeline.TYPE_DISP_N_N;
        nrCopies = 1;
      } else { // > 1!
        dispatchType = Pipeline.TYPE_DISP_N_M;
        nrCopies = prevCopies;
      }
      for (int c = 0; c < nrCopies; c++) {
        IRowSet rowSet = null;
        switch (dispatchType) {
          case Pipeline.TYPE_DISP_1_1:
            rowSet = pipeline.findRowSet(prevTransforms[i].getName(), 0, transformName, 0);
            break;
          case Pipeline.TYPE_DISP_1_N:
            rowSet =
                pipeline.findRowSet(prevTransforms[i].getName(), 0, transformName, getCopy());
            break;
          case Pipeline.TYPE_DISP_N_1:
            rowSet = pipeline.findRowSet(prevTransforms[i].getName(), c, transformName, 0);
            break;
          case Pipeline.TYPE_DISP_N_N:
            rowSet =
                pipeline.findRowSet(
                    prevTransforms[i].getName(), getCopy(), transformName, getCopy());
            break;
          case Pipeline.TYPE_DISP_N_M:
            rowSet =
                pipeline.findRowSet(prevTransforms[i].getName(), c, transformName, getCopy());
            break;
          default:
            break;
        }
        if (rowSet != null) {
          inputRowSets.add(rowSet);
          if (log.isDetailed()) {
            logDetailed(
                BaseMessages.getString(
                    PKG, "BaseTransform.Log.FoundInputRowset", rowSet.getName()));
          }
        } else {
          // A missing row set is only tolerated when one side of the hop is a mapping.
          if (!prevTransforms[i].isMapping() && !transformMeta.isMapping()) {
            logError(BaseMessages.getString(PKG, "BaseTransform.Log.UnableToFindInputRowset"));
            setErrors(1);
            stopAll();
            return;
          }
        }
      }
    }
    // And now the output part!
    for (int i = 0; i < nrOutput; i++) {
      nextTransforms[i] = succeedingTransforms.get(i);
      int prevCopies = transformMeta.getCopies(this);
      int nextCopies = nextTransforms[i].getCopies(this);
      if (log.isDetailed()) {
        logDetailed(
            BaseMessages.getString(
                PKG,
                "BaseTransform.Log.OutputRowInfo",
                String.valueOf(prevCopies),
                String.valueOf(nextCopies)));
      }
      int nrCopies;
      int dispatchType;
      // Same repartitioning rule as for the input side, now with this transform as the source.
      boolean repartitioning;
      if (transformMeta.isPartitioned()) {
        repartitioning =
            !transformMeta
                .getTransformPartitioningMeta()
                .equals(nextTransforms[i].getTransformPartitioningMeta());
      } else {
        repartitioning = nextTransforms[i].isPartitioned();
      }
      // On the output side nrCopies is the number of target copies this copy writes to.
      if (prevCopies == 1 && nextCopies == 1) {
        dispatchType = Pipeline.TYPE_DISP_1_1;
        nrCopies = 1;
      } else if (prevCopies == 1 && nextCopies > 1) {
        dispatchType = Pipeline.TYPE_DISP_1_N;
        nrCopies = nextCopies;
      } else if (prevCopies > 1 && nextCopies == 1) {
        dispatchType = Pipeline.TYPE_DISP_N_1;
        nrCopies = 1;
      } else if (prevCopies == nextCopies && !repartitioning) {
        dispatchType = Pipeline.TYPE_DISP_N_N;
        nrCopies = 1;
      } else { // > 1!
        dispatchType = Pipeline.TYPE_DISP_N_M;
        nrCopies = nextCopies;
      }
      for (int c = 0; c < nrCopies; c++) {
        IRowSet rowSet = null;
        switch (dispatchType) {
          case Pipeline.TYPE_DISP_1_1:
            rowSet = pipeline.findRowSet(transformName, 0, nextTransforms[i].getName(), 0);
            break;
          case Pipeline.TYPE_DISP_1_N:
            rowSet = pipeline.findRowSet(transformName, 0, nextTransforms[i].getName(), c);
            break;
          case Pipeline.TYPE_DISP_N_1:
            rowSet =
                pipeline.findRowSet(transformName, getCopy(), nextTransforms[i].getName(), 0);
            break;
          case Pipeline.TYPE_DISP_N_N:
            rowSet =
                pipeline.findRowSet(
                    transformName, getCopy(), nextTransforms[i].getName(), getCopy());
            break;
          case Pipeline.TYPE_DISP_N_M:
            rowSet =
                pipeline.findRowSet(transformName, getCopy(), nextTransforms[i].getName(), c);
            break;
          default:
            break;
        }
        if (rowSet != null) {
          outputRowSets.add(rowSet);
          if (log.isDetailed()) {
            logDetailed(
                BaseMessages.getString(
                    PKG, "BaseTransform.Log.FoundOutputRowset", rowSet.getName()));
          }
        } else {
          // A missing row set is only tolerated when one side of the hop is a mapping.
          if (!transformMeta.isMapping() && !nextTransforms[i].isMapping()) {
            logError(BaseMessages.getString(PKG, "BaseTransform.Log.UnableToFindOutputRowset"));
            setErrors(1);
            stopAll();
            return;
          }
        }
      }
    }
  } finally {
    // Runs on the early error returns above as well, so the locks are always released.
    inputRowSetsLock.writeLock().unlock();
    outputRowSetsLock.writeLock().unlock();
  }
  if (transformMeta.getTargetTransformPartitioningMeta() != null) {
    nextTransformPartitioningMeta = transformMeta.getTargetTransformPartitioningMeta();
  }
  if (log.isDetailed()) {
    logDetailed(BaseMessages.getString(PKG, "BaseTransform.Log.FinishedDispatching"));
  }
}