in plugins/transforms/textfile/src/main/java/org/apache/hop/pipeline/transforms/fileinput/TextFileInput.java [1040:1415]
/**
 * Performs one iteration of the text file read loop and emits at most one output row.
 *
 * <p>On the first call ({@code first == true}) the method:
 *
 * <ul>
 *   <li>creates the output row metadata;
 *   <li>when {@code meta.isAcceptingFilenames()} is set, drains the row set of the accepting
 *       transform, turning the value of the accepting field of every row into a file object
 *       (and optionally remembering the row as pass-through fields for that file);
 *   <li>lets {@code meta.getFields(...)} populate the output metadata and derives an all-String
 *       copy of it for conversion;
 *   <li>opens the first file and counts the input fields flagged as "repeated".
 * </ul>
 *
 * <p>On every later call it first tries to read additional lines into {@code data.lineBuffer}.
 *
 * <p>In both cases it then takes the first line from the buffer (opening further files while
 * the buffer is empty), classifies it as header, data or footer line - using the paged or the
 * non-paged rules depending on {@code meta.isLayoutPaged()} - converts data lines with
 * {@code convertLineToRow(...)}, fills in repeated values and passes the row on with
 * {@code putRow(...)}.
 *
 * @return {@code false} when processing ends (after {@code setOutputDone()} or, for a missing
 *     filename field, after {@code stopAll()}); {@code true} otherwise
 * @throws HopException declared by the signature; presumably raised by the row and file helpers
 *     called from here - confirm against their definitions
 */
public boolean processRow() throws HopException {
// The converted output row for the current line, or null when the line produces no row.
Object[] r = null;
// Never reassigned in this method: the final return is always true.
boolean retval = true;
// Set to true once a data line has been converted into a non-null row.
boolean putrow = false;
if (first) { // we just got started
first = false;
data.outputRowMeta = new RowMeta();
// Stays null unless file names are accepted from a previous transform AND pass-through is on.
IRowMeta[] infoTransform = null;
if (meta.isAcceptingFilenames()) {
// Read the files from the specified input stream...
//
data.getFiles().getFiles().clear();
// Index of the filename field in the incoming rows; resolved on the first row only.
int idx = -1;
data.rowSet = findInputRowSet(meta.getAcceptingTransformName());
Object[] fileRow = getRowFrom(data.rowSet);
while (fileRow != null) {
IRowMeta prevInfoFields = data.rowSet.getRowMeta();
if (idx < 0) {
if (meta.isPassingThruFields()) {
data.passThruFields = new HashMap<>();
infoTransform = new IRowMeta[] {prevInfoFields};
data.nrPassThruFields = prevInfoFields.size();
}
idx = prevInfoFields.indexOfValue(meta.getAcceptingField());
if (idx < 0) {
// The configured filename field does not exist in the incoming rows: log it, count an
// error and stop.
logError(
BaseMessages.getString(
PKG,
"TextFileInput.Log.Error.UnableToFindFilenameField",
meta.getAcceptingField()));
setErrors(getErrors() + 1);
stopAll();
return false;
}
}
String fileValue = prevInfoFields.getString(fileRow, idx);
try {
FileObject fileObject = HopVfs.getFileObject(fileValue, variables);
data.getFiles().addFile(fileObject);
if (meta.isPassingThruFields()) {
// Remember the complete incoming row, keyed by the file it refers to.
data.passThruFields.put(fileObject, fileRow);
}
} catch (HopFileException e) {
// The failure is only logged: the error counter is not touched and the loop carries on
// with the next incoming row.
logError(
BaseMessages.getString(
PKG, "TextFileInput.Log.Error.UnableToCreateFileObject", fileValue),
e);
}
// Grab another row
fileRow = getRowFrom(data.rowSet);
}
if (data.getFiles().nrOfFiles() == 0) {
// Nothing to read at all: finish without producing any rows.
if (isDetailed()) {
logDetailed(BaseMessages.getString(PKG, "TextFileInput.Log.Error.NoFilesSpecified"));
}
setOutputDone();
return false;
}
}
// Get the metadata populated. Simple and easy.
meta.getFields(
data.outputRowMeta, getTransformName(), infoTransform, null, this, metadataProvider);
// Create convert meta-data objects that will contain Date & Number formatters
//
data.convertRowMeta = data.outputRowMeta.cloneToType(IValueMeta.TYPE_STRING);
handleMissingFiles();
// Open the first file & read the required rows in the buffer, stop
// if it fails and not set to skip bad files...
if (!openNextFile()) {
if (failAfterBadFile(null)) {
closeLastFile();
setOutputDone();
return false;
}
}
// Count the number of repeat fields...
for (int i = 0; i < meta.getInputFields().length; i++) {
if (meta.getInputFields()[i].isRepeated()) {
data.nr_repeats++;
}
}
} else {
if (!data.doneReading) {
// Number of lines to try to read during this call.
int repeats = 1;
if (meta.isLineWrapped()) {
// With wrapped lines, use the configured wrap count when it is positive.
repeats = meta.getNrWraps() > 0 ? meta.getNrWraps() : repeats;
}
if (!data.doneWithHeader && data.headerLinesRead == 0) {
// We are just starting to read header lines, read them all
// (the number of header lines plus one extra line).
repeats += meta.getNrHeaderLines() + 1;
}
// Read a number of lines...
// Whenever tryToReadLine returns false one more attempt is granted; the loop still ends as
// soon as data.doneReading is set.
for (int i = 0; i < repeats && !data.doneReading; i++) {
if (!tryToReadLine(true)) {
repeats++;
}
}
}
}
/*
 * If the buffer is empty: open the next file. (if nothing in there, open the next, etc.)
 */
while (data.lineBuffer.isEmpty()) {
if (!openNextFile()) {
// Open fails: done processing unless set to skip bad files
if (failAfterBadFile(null)) {
closeLastFile();
setOutputDone(); // signal end to receiver(s)
return false;
} // else will continue until can open
}
}
/*
 * Take the first line available in the buffer & remove the line from the buffer
 */
TextFileLine textLine = data.lineBuffer.get(0);
// These counters are bumped for every line taken from the buffer, before it is known whether the
// line is a header, data or footer line.
incrementLinesInput();
lineNumberInFile++;
data.lineBuffer.remove(0);
if (meta.isLayoutPaged()) {
/*
 * Different rules apply: on each page: a header a number of data lines a footer
 */
if (!data.doneWithHeader && data.pageLinesRead == 0) {
// We are reading header lines
if (isRowLevel()) {
logRowlevel("P-HEADER (" + data.headerLinesRead + ") : " + textLine.line);
}
data.headerLinesRead++;
if (data.headerLinesRead >= meta.getNrHeaderLines()) {
data.doneWithHeader = true;
}
} else {
// data lines or footer on a page
if (data.pageLinesRead < meta.getNrLinesPerPage()) {
// See if we are dealing with wrapped lines:
if (meta.isLineWrapped()) {
// Append up to getNrWraps() continuation lines that are already buffered. Unlike the
// non-paged branch further down, no extra line is read when the buffer is empty.
// NOTE(review): confirm that this difference is intended.
for (int i = 0; i < meta.getNrWraps(); i++) {
String extra = "";
if (!data.lineBuffer.isEmpty()) {
extra = data.lineBuffer.get(0).line;
data.lineBuffer.remove(0);
}
textLine.line += extra;
}
}
if (isRowLevel()) {
logRowlevel("P-DATA: " + textLine.line);
}
// Read a normal line on a page of data.
data.pageLinesRead++;
data.lineInFile++;
// Row number handed to the conversion: the per-file line counter when row numbers are
// counted by file, otherwise the number of lines written so far plus one.
long useNumber = meta.isRowNumberByFile() ? data.lineInFile : getLinesWritten() + 1;
r =
convertLineToRow(
getLogChannel(),
textLine,
meta,
data.currentPassThruFieldsRow,
data.nrPassThruFields,
data.outputRowMeta,
data.convertRowMeta,
data.filename,
useNumber,
data.separator,
data.enclosure,
data.escapeCharacter,
data.dataErrorLineHandler,
data.addShortFilename,
data.addExtension,
data.addPath,
data.addSize,
data.addIsHidden,
data.addLastModificationDate,
data.addUri,
data.addRootUri,
data.shortFilename,
data.path,
data.hidden,
data.lastModificationDateTime,
data.uriName,
data.rootUriName,
data.extension,
data.size);
if (r != null) {
putrow = true;
}
// Possible fix for bug - paged layout header and line count off by 1
// We need to reset these BEFORE the next header line is read, so that it
// is treated as a header ... obviously, only if there is no footer, and we are
// done reading data.
if (!meta.hasFooter() && (data.pageLinesRead == meta.getNrLinesPerPage())) {
/*
 * There is no footer and all data lines of this page are read: start again on the 'next
 * page' with the header
 */
data.doneWithHeader = false;
data.headerLinesRead = 0;
data.pageLinesRead = 0;
data.footerLinesRead = 0;
if (isRowLevel()) {
logRowlevel("RESTART PAGE");
}
}
} else {
// done reading the data lines, skip the footer lines
if (meta.hasFooter() && data.footerLinesRead < meta.getNrFooterLines()) {
if (isRowLevel()) {
logRowlevel("P-FOOTER: " + textLine.line);
}
data.footerLinesRead++;
}
if (!meta.hasFooter() || data.footerLinesRead >= meta.getNrFooterLines()) {
/*
 * OK, we are done reading the footer lines, start again on 'next page' with the header
 */
data.doneWithHeader = false;
data.headerLinesRead = 0;
data.pageLinesRead = 0;
data.footerLinesRead = 0;
if (isRowLevel()) {
logRowlevel("RESTART PAGE");
}
}
}
}
} else {
// A normal data line, can also be a header or a footer line
if (!data.doneWithHeader) { // We are reading header lines
// Header lines are only counted; no row is produced for them.
data.headerLinesRead++;
if (data.headerLinesRead >= meta.getNrHeaderLines()) {
data.doneWithHeader = true;
}
} else {
/*
 * IF we are done reading and we have a footer AND the number of lines in the buffer is smaller than the number
 * of footer lines THEN we can remove the remaining rows from the buffer: they are all footer rows.
 */
if (data.doneReading
&& meta.hasFooter()
&& data.lineBuffer.size() < meta.getNrFooterLines()) {
data.lineBuffer.clear();
} else {
// Not yet a footer line: it's a normal data line.
// See if we are dealing with wrapped lines:
if (meta.isLineWrapped()) {
for (int i = 0; i < meta.getNrWraps(); i++) {
String extra = "";
if (!data.lineBuffer.isEmpty()) {
extra = data.lineBuffer.get(0).line;
data.lineBuffer.remove(0);
} else {
// Nothing buffered: try to read one more line and use it as the continuation when
// one became available.
tryToReadLine(true);
if (!data.lineBuffer.isEmpty()) {
extra = data.lineBuffer.remove(0).line;
}
}
textLine.line += extra;
}
}
// Only convert the line when the file play list says this line needs processing.
if (data.filePlayList.isProcessingNeeded(
textLine.file, textLine.lineNumber, AbstractFileErrorHandler.NO_PARTS)) {
data.lineInFile++;
// Same row numbering rule as in the paged branch above.
long useNumber = meta.isRowNumberByFile() ? data.lineInFile : getLinesWritten() + 1;
r =
convertLineToRow(
getLogChannel(),
textLine,
meta,
data.currentPassThruFieldsRow,
data.nrPassThruFields,
data.outputRowMeta,
data.convertRowMeta,
data.filename,
useNumber,
data.separator,
data.enclosure,
data.escapeCharacter,
data.dataErrorLineHandler,
data.addShortFilename,
data.addExtension,
data.addPath,
data.addSize,
data.addIsHidden,
data.addLastModificationDate,
data.addUri,
data.addRootUri,
data.shortFilename,
data.path,
data.hidden,
data.lastModificationDateTime,
data.uriName,
data.rootUriName,
data.extension,
data.size);
if (r != null) {
if (isRowLevel()) {
logRowlevel("Found data row: " + data.outputRowMeta.getString(r));
}
putrow = true;
}
} else {
// The line is skipped; putrow already is false at this point.
putrow = false;
}
}
}
}
if (putrow && r != null) {
// See if the previous values need to be repeated!
if (data.nr_repeats > 0) {
if (data.previousRow == null) { // First invocation...
// Keep a copy of the first row as the source of values to repeat later on.
data.previousRow = data.outputRowMeta.cloneRow(r);
} else {
for (int i = 0; i < meta.getInputFields().length; i++) {
if (meta.getInputFields()[i].isRepeated()) {
if (r[i] == null) {
// if it is empty: take the previous value!
r[i] = data.previousRow[i];
} else {
// not empty: change the previous_row entry!
data.previousRow[i] = r[i];
}
}
}
}
}
if (isRowLevel()) {
logRowlevel("Putting row: " + data.outputRowMeta.getString(r));
}
putRow(data.outputRowMeta, r);
// Stop once a positive row limit is reached. The comparison uses getLinesInput().
// NOTE(review): incrementLinesInput() is called above for every buffered line, so header and
// footer lines presumably count towards the limit as well - confirm that this is intended.
if (getLinesInput() >= meta.getRowLimit() && meta.getRowLimit() > 0) {
closeLastFile();
setOutputDone(); // signal end to receiver(s)
return false;
}
}
// Progress feedback: log the current line number at basic level when checkFeedback asks for it.
if (checkFeedback(getLinesInput())) {
if (isBasic()) {
logBasic("linenr " + getLinesInput());
}
}
return retval;
}