in plugins/transforms/textfile/src/main/java/org/apache/hop/pipeline/transforms/fileinput/text/TextFileInputReader.java [157:415]
/**
 * Consumes one line from the line buffer and, if it is a data line that converts to a row, hands
 * that row to the transform.
 *
 * <p>While the file is not exhausted, the buffer is first topped up via {@code
 * tryToReadLine(true)}. The first buffered line is then removed, counted as an input line and
 * dispatched to either the paged-layout or the plain-layout handling. Header and footer lines are
 * consumed without producing a row.
 *
 * @return {@code false} when the line buffer is empty after the read-ahead, or when the row limit
 *     has been reached (in which case {@code close()} is called first); {@code true} otherwise,
 *     including when the consumed line did not produce a row
 * @throws HopException if reading, converting or forwarding the row fails
 */
public boolean readRow() throws HopException {
  if (!data.doneReading) {
    int repeats = 1;
    if (meta.content.lineWrapped && meta.content.nrWraps > 0) {
      repeats = meta.content.nrWraps;
    }
    if (!data.doneWithHeader && data.headerLinesRead == 0) {
      // We are just starting to read header lines, read them all
      repeats += meta.content.nrHeaderLines + 1;
    }
    // Read a number of lines; every failed attempt grants one extra attempt, so the loop only
    // ends once enough lines were read or the input is exhausted.
    for (int i = 0; i < repeats && !data.doneReading; i++) {
      if (!tryToReadLine(true)) {
        repeats++;
      }
    }
  }

  if (data.lineBuffer.isEmpty()) {
    return false;
  }

  // Take the first line available in the buffer & remove the line from the buffer
  TextFileLine textLine = data.lineBuffer.get(0);
  transform.incrementLinesInput();
  data.lineBuffer.remove(0);

  Object[] row =
      meta.content.layoutPaged ? processPagedLine(textLine) : processPlainLine(textLine);

  if (row != null) {
    fillRepeatedValues(row);
    if (log.isRowLevel()) {
      log.logRowlevel("Putting row: " + data.outputRowMeta.getString(row));
    }
    transform.putRow(data.outputRowMeta, row);
    if (transform.getLinesInput() >= meta.content.rowLimit && meta.content.rowLimit > 0) {
      close();
      return false;
    }
  }

  if (transform.checkFeedback(transform.getLinesInput()) && log.isBasic()) {
    log.logBasic("linenr " + transform.getLinesInput());
  }
  return true;
}

/**
 * Handles one line in paged layout: on each page a header, a number of data lines and a footer.
 *
 * @param textLine the line just removed from the buffer
 * @return the converted row for a data line, or {@code null} for header/footer lines and for data
 *     lines that did not convert to a row
 */
private Object[] processPagedLine(TextFileLine textLine) throws HopException {
  if (!data.doneWithHeader && data.pageLinesRead == 0) {
    // We are reading header lines
    if (log.isRowLevel()) {
      log.logRowlevel("P-HEADER (" + data.headerLinesRead + ") : " + textLine.line);
    }
    data.headerLinesRead++;
    if (data.headerLinesRead >= meta.content.nrHeaderLines) {
      data.doneWithHeader = true;
    }
    return null;
  }

  if (data.pageLinesRead < meta.content.nrLinesPerPage) {
    // A normal data line on a page. Wrapped continuation lines are only taken from what is
    // already buffered here (no read-ahead).
    if (meta.content.lineWrapped) {
      appendWrappedLines(textLine, false);
    }
    if (log.isRowLevel()) {
      log.logRowlevel("P-DATA: " + textLine.line);
    }
    data.pageLinesRead++;
    Object[] row = convertToRow(textLine);

    // Without a footer the page state must be reset BEFORE the next header line is read, so
    // that the line is treated as a header (avoids header and line count being off by 1).
    if (!meta.content.footer && data.pageLinesRead == meta.content.nrLinesPerPage) {
      restartPage();
    }
    return row;
  }

  // Done reading the data lines of this page: skip the footer lines
  if (meta.content.footer && data.footerLinesRead < meta.content.nrFooterLines) {
    if (log.isRowLevel()) {
      log.logRowlevel("P-FOOTER: " + textLine.line);
    }
    data.footerLinesRead++;
  }
  if (!meta.content.footer || data.footerLinesRead >= meta.content.nrFooterLines) {
    // All footer lines were read: start again on the 'next page' with the header
    restartPage();
  }
  return null;
}

/**
 * Handles one line in the non-paged layout, where a line is a header line, a data line or a
 * trailing footer line.
 *
 * @param textLine the line just removed from the buffer
 * @return the converted row, or {@code null} when the line is a header/footer line, is skipped by
 *     the file play list, or did not convert to a row
 */
private Object[] processPlainLine(TextFileLine textLine) throws HopException {
  if (!data.doneWithHeader) {
    // We are reading header lines
    data.headerLinesRead++;
    if (data.headerLinesRead >= meta.content.nrHeaderLines) {
      data.doneWithHeader = true;
    }
    return null;
  }

  // IF we are done reading and we have a footer AND the number of lines left in the buffer is
  // smaller than the number of footer lines THEN this line and the remaining buffered lines are
  // all footer rows and can be dropped.
  if (data.doneReading
      && meta.content.footer
      && data.lineBuffer.size() < meta.content.nrFooterLines) {
    data.lineBuffer.clear();
    return null;
  }

  // Not yet a footer line: it's a normal data line.
  if (meta.content.lineWrapped) {
    appendWrappedLines(textLine, true);
  }
  if (!data.filePlayList.isProcessingNeeded(
      textLine.file, textLine.lineNumber, AbstractFileErrorHandler.NO_PARTS)) {
    return null;
  }

  Object[] row = convertToRow(textLine);
  if (row != null && log.isRowLevel()) {
    log.logRowlevel("Found data row: " + data.outputRowMeta.getString(row));
  }
  return row;
}

/**
 * Appends {@code meta.content.nrWraps} continuation lines from the line buffer to the given line.
 * When no continuation line is available an empty string is appended instead.
 *
 * @param textLine the line whose text is extended in place
 * @param readAhead when {@code true} and the buffer is empty, one attempt is made to read another
 *     line before giving up on that continuation
 */
private void appendWrappedLines(TextFileLine textLine, boolean readAhead) throws HopException {
  for (int i = 0; i < meta.content.nrWraps; i++) {
    if (readAhead && data.lineBuffer.isEmpty()) {
      tryToReadLine(true);
    }
    String extra = data.lineBuffer.isEmpty() ? "" : data.lineBuffer.remove(0).line;
    textLine.line += extra;
  }
}

/**
 * Advances the per-file line counter and converts the given line to an output row.
 *
 * @param textLine the (possibly unwrapped) data line
 * @return the result of {@code TextFileInputUtils.convertLineToRow}, which may be {@code null}
 */
private Object[] convertToRow(TextFileLine textLine) throws HopException {
  lineInFile++;
  // Row number is either the position within this file or the overall output position.
  long useNumber = meta.content.rowNumberByFile ? lineInFile : transform.getLinesWritten() + 1;
  return TextFileInputUtils.convertLineToRow(
      log,
      textLine,
      meta,
      data.currentPassThruFieldsRow,
      data.nrPassThruFields,
      data.outputRowMeta,
      data.convertRowMeta,
      data.filename,
      useNumber,
      data.separator,
      data.enclosure,
      data.escapeCharacter,
      data.dataErrorLineHandler,
      meta.additionalOutputFields,
      data.shortFilename,
      data.path,
      data.hidden,
      data.lastModificationDateTime,
      data.uriName,
      data.rootUriName,
      data.extension,
      data.size);
}

/** Resets the per-page counters so that the next line is handled as the start of a new page. */
private void restartPage() {
  // NOTE(review): doneWithHeader is reset unconditionally, so the first line of every following
  // page goes through the header branch (and is consumed without producing a row) even when
  // nrHeaderLines is 0 - confirm this is intended for paged files without a header.
  data.doneWithHeader = false;
  data.headerLinesRead = 0;
  data.pageLinesRead = 0;
  data.footerLinesRead = 0;
  if (log.isRowLevel()) {
    log.logRowlevel("RESTART PAGE");
  }
}

/**
 * Applies the "repeat previous value" option: for every input field flagged as repeated, a null
 * value in the row is replaced by the last non-null value seen, and a non-null value becomes the
 * new remembered value. Does nothing when {@code data.nr_repeats} is not positive.
 *
 * @param row the row about to be emitted; modified in place
 */
private void fillRepeatedValues(Object[] row) throws HopException {
  if (data.nr_repeats <= 0) {
    return;
  }
  if (data.previousRow == null) {
    // First invocation: just remember this row.
    data.previousRow = data.outputRowMeta.cloneRow(row);
    return;
  }
  for (int i = 0; i < meta.inputFields.length; i++) {
    if (!meta.inputFields[i].isRepeated()) {
      continue;
    }
    if (row[i] == null) {
      // if it is empty: take the previous value!
      row[i] = data.previousRow[i];
    } else {
      // not empty: change the previous_row entry!
      data.previousRow[i] = row[i];
    }
  }
}