public boolean processRow()

in plugins/transforms/textfile/src/main/java/org/apache/hop/pipeline/transforms/fileinput/TextFileInput.java [1040:1415]


  public boolean processRow() throws HopException {
    Object[] r = null;
    boolean retval = true;
    boolean putrow = false;

    if (first) { // we just got started

      first = false;

      data.outputRowMeta = new RowMeta();
      IRowMeta[] infoTransform = null;

      if (meta.isAcceptingFilenames()) {
        // Read the files from the specified input stream...
        //
        data.getFiles().getFiles().clear();

        int idx = -1;
        data.rowSet = findInputRowSet(meta.getAcceptingTransformName());

        Object[] fileRow = getRowFrom(data.rowSet);
        while (fileRow != null) {
          IRowMeta prevInfoFields = data.rowSet.getRowMeta();
          if (idx < 0) {
            if (meta.isPassingThruFields()) {
              data.passThruFields = new HashMap<>();
              infoTransform = new IRowMeta[] {prevInfoFields};
              data.nrPassThruFields = prevInfoFields.size();
            }
            idx = prevInfoFields.indexOfValue(meta.getAcceptingField());
            if (idx < 0) {
              logError(
                  BaseMessages.getString(
                      PKG,
                      "TextFileInput.Log.Error.UnableToFindFilenameField",
                      meta.getAcceptingField()));
              setErrors(getErrors() + 1);
              stopAll();
              return false;
            }
          }
          String fileValue = prevInfoFields.getString(fileRow, idx);
          try {
            FileObject fileObject = HopVfs.getFileObject(fileValue, variables);
            data.getFiles().addFile(fileObject);
            if (meta.isPassingThruFields()) {
              data.passThruFields.put(fileObject, fileRow);
            }
          } catch (HopFileException e) {
            logError(
                BaseMessages.getString(
                    PKG, "TextFileInput.Log.Error.UnableToCreateFileObject", fileValue),
                e);
          }

          // Grab another row
          fileRow = getRowFrom(data.rowSet);
        }

        if (data.getFiles().nrOfFiles() == 0) {
          if (isDetailed()) {
            logDetailed(BaseMessages.getString(PKG, "TextFileInput.Log.Error.NoFilesSpecified"));
          }
          setOutputDone();
          return false;
        }
      }

      // // get the metadata populated. Simple and easy.
      meta.getFields(
          data.outputRowMeta, getTransformName(), infoTransform, null, this, metadataProvider);
      // Create convert meta-data objects that will contain Date & Number formatters
      //
      data.convertRowMeta = data.outputRowMeta.cloneToType(IValueMeta.TYPE_STRING);

      handleMissingFiles();

      // Open the first file & read the required rows in the buffer, stop
      // if it fails and not set to skip bad files...
      if (!openNextFile()) {
        if (failAfterBadFile(null)) {
          closeLastFile();
          setOutputDone();
          return false;
        }
      }

      // Count the number of repeat fields...
      for (int i = 0; i < meta.getInputFields().length; i++) {
        if (meta.getInputFields()[i].isRepeated()) {
          data.nr_repeats++;
        }
      }
    } else {
      if (!data.doneReading) {
        int repeats = 1;
        if (meta.isLineWrapped()) {
          repeats = meta.getNrWraps() > 0 ? meta.getNrWraps() : repeats;
        }

        if (!data.doneWithHeader && data.headerLinesRead == 0) {
          // We are just starting to read header lines, read them all
          repeats += meta.getNrHeaderLines() + 1;
        }

        // Read a number of lines...
        for (int i = 0; i < repeats && !data.doneReading; i++) {
          if (!tryToReadLine(true)) {
            repeats++;
          }
        }
      }
    }

    /*
     * If the buffer is empty: open the next file. (if nothing in there, open the next, etc.)
     */
    while (data.lineBuffer.isEmpty()) {
      if (!openNextFile()) {
        // Open fails: done processing unless set to skip bad files
        if (failAfterBadFile(null)) {
          closeLastFile();
          setOutputDone(); // signal end to receiver(s)
          return false;
        } // else will continue until can open
      }
    }

    /*
     * Take the first line available in the buffer & remove the line from the buffer
     */
    TextFileLine textLine = data.lineBuffer.get(0);
    incrementLinesInput();
    lineNumberInFile++;

    data.lineBuffer.remove(0);

    if (meta.isLayoutPaged()) {
      /*
       * Different rules apply: on each page: a header a number of data lines a footer
       */
      if (!data.doneWithHeader && data.pageLinesRead == 0) {
        // We are reading header lines
        if (isRowLevel()) {
          logRowlevel("P-HEADER (" + data.headerLinesRead + ") : " + textLine.line);
        }
        data.headerLinesRead++;
        if (data.headerLinesRead >= meta.getNrHeaderLines()) {
          data.doneWithHeader = true;
        }
      } else {
        // data lines or footer on a page

        if (data.pageLinesRead < meta.getNrLinesPerPage()) {
          // See if we are dealing with wrapped lines:
          if (meta.isLineWrapped()) {
            for (int i = 0; i < meta.getNrWraps(); i++) {
              String extra = "";
              if (!data.lineBuffer.isEmpty()) {
                extra = data.lineBuffer.get(0).line;
                data.lineBuffer.remove(0);
              }
              textLine.line += extra;
            }
          }

          if (isRowLevel()) {
            logRowlevel("P-DATA: " + textLine.line);
          }
          // Read a normal line on a page of data.
          data.pageLinesRead++;
          data.lineInFile++;
          long useNumber = meta.isRowNumberByFile() ? data.lineInFile : getLinesWritten() + 1;
          r =
              convertLineToRow(
                  getLogChannel(),
                  textLine,
                  meta,
                  data.currentPassThruFieldsRow,
                  data.nrPassThruFields,
                  data.outputRowMeta,
                  data.convertRowMeta,
                  data.filename,
                  useNumber,
                  data.separator,
                  data.enclosure,
                  data.escapeCharacter,
                  data.dataErrorLineHandler,
                  data.addShortFilename,
                  data.addExtension,
                  data.addPath,
                  data.addSize,
                  data.addIsHidden,
                  data.addLastModificationDate,
                  data.addUri,
                  data.addRootUri,
                  data.shortFilename,
                  data.path,
                  data.hidden,
                  data.lastModificationDateTime,
                  data.uriName,
                  data.rootUriName,
                  data.extension,
                  data.size);
          if (r != null) {
            putrow = true;
          }

          // Possible fix for bug - paged layout header and line count off by 1
          // We need to reset these BEFORE the next header line is read, so that it
          // is treated as a header ... obviously, only if there is no footer, and we are
          // done reading data.
          if (!meta.hasFooter() && (data.pageLinesRead == meta.getNrLinesPerPage())) {
            /*
             * OK, we are done reading the footer lines, start again on 'next page' with the header
             */
            data.doneWithHeader = false;
            data.headerLinesRead = 0;
            data.pageLinesRead = 0;
            data.footerLinesRead = 0;
            if (isRowLevel()) {
              logRowlevel("RESTART PAGE");
            }
          }
        } else {
          // done reading the data lines, skip the footer lines

          if (meta.hasFooter() && data.footerLinesRead < meta.getNrFooterLines()) {
            if (isRowLevel()) {
              logRowlevel("P-FOOTER: " + textLine.line);
            }
            data.footerLinesRead++;
          }

          if (!meta.hasFooter() || data.footerLinesRead >= meta.getNrFooterLines()) {
            /*
             * OK, we are done reading the footer lines, start again on 'next page' with the header
             */
            data.doneWithHeader = false;
            data.headerLinesRead = 0;
            data.pageLinesRead = 0;
            data.footerLinesRead = 0;
            if (isRowLevel()) {
              logRowlevel("RESTART PAGE");
            }
          }
        }
      }
    } else {
      // A normal data line, can also be a header or a footer line

      if (!data.doneWithHeader) { // We are reading header lines

        data.headerLinesRead++;
        if (data.headerLinesRead >= meta.getNrHeaderLines()) {
          data.doneWithHeader = true;
        }
      } else {
        /*
         * IF we are done reading and we have a footer AND the number of lines in the buffer is smaller then the number
         * of footer lines THEN we can remove the remaining rows from the buffer: they are all footer rows.
         */
        if (data.doneReading
            && meta.hasFooter()
            && data.lineBuffer.size() < meta.getNrFooterLines()) {
          data.lineBuffer.clear();
        } else {
          // Not yet a footer line: it's a normal data line.

          // See if we are dealing with wrapped lines:
          if (meta.isLineWrapped()) {
            for (int i = 0; i < meta.getNrWraps(); i++) {
              String extra = "";
              if (!data.lineBuffer.isEmpty()) {
                extra = data.lineBuffer.get(0).line;
                data.lineBuffer.remove(0);
              } else {
                tryToReadLine(true);
                if (!data.lineBuffer.isEmpty()) {
                  extra = data.lineBuffer.remove(0).line;
                }
              }
              textLine.line += extra;
            }
          }
          if (data.filePlayList.isProcessingNeeded(
              textLine.file, textLine.lineNumber, AbstractFileErrorHandler.NO_PARTS)) {
            data.lineInFile++;
            long useNumber = meta.isRowNumberByFile() ? data.lineInFile : getLinesWritten() + 1;
            r =
                convertLineToRow(
                    getLogChannel(),
                    textLine,
                    meta,
                    data.currentPassThruFieldsRow,
                    data.nrPassThruFields,
                    data.outputRowMeta,
                    data.convertRowMeta,
                    data.filename,
                    useNumber,
                    data.separator,
                    data.enclosure,
                    data.escapeCharacter,
                    data.dataErrorLineHandler,
                    data.addShortFilename,
                    data.addExtension,
                    data.addPath,
                    data.addSize,
                    data.addIsHidden,
                    data.addLastModificationDate,
                    data.addUri,
                    data.addRootUri,
                    data.shortFilename,
                    data.path,
                    data.hidden,
                    data.lastModificationDateTime,
                    data.uriName,
                    data.rootUriName,
                    data.extension,
                    data.size);
            if (r != null) {
              if (isRowLevel()) {
                logRowlevel("Found data row: " + data.outputRowMeta.getString(r));
              }
              putrow = true;
            }
          } else {
            putrow = false;
          }
        }
      }
    }

    if (putrow && r != null) {
      // See if the previous values need to be repeated!
      if (data.nr_repeats > 0) {
        if (data.previousRow == null) { // First invocation...

          data.previousRow = data.outputRowMeta.cloneRow(r);
        } else {
          for (int i = 0; i < meta.getInputFields().length; i++) {
            if (meta.getInputFields()[i].isRepeated()) {
              if (r[i] == null) {
                // if it is empty: take the previous value!

                r[i] = data.previousRow[i];
              } else {
                // not empty: change the previous_row entry!

                data.previousRow[i] = r[i];
              }
            }
          }
        }
      }

      if (isRowLevel()) {
        logRowlevel("Putting row: " + data.outputRowMeta.getString(r));
      }
      putRow(data.outputRowMeta, r);

      if (getLinesInput() >= meta.getRowLimit() && meta.getRowLimit() > 0) {
        closeLastFile();
        setOutputDone(); // signal end to receiver(s)
        return false;
      }
    }

    if (checkFeedback(getLinesInput())) {
      if (isBasic()) {
        logBasic("linenr " + getLinesInput());
      }
    }

    return retval;
  }