public void run()

in streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java [51:114]


  /**
   * Reads every eligible entry of {@code reader.status} line by line and forwards each parsed
   * line to {@code reader.write}.
   *
   * <p>An entry is processed when it is a regular file and its name does not start with
   * {@code "_"}. Files whose name ends in {@code ".gz"} are wrapped in a {@link GZIPInputStream};
   * all others are read as-is. Text is decoded with {@code reader.hdfsConfiguration.getEncoding()}.
   *
   * <p>For each non-empty line, {@code reader.countersCurrent} records an attempt followed by
   * either {@code SUCCESS} (a datum was produced and written) or {@code FAIL} (the line processor
   * returned {@code null} or an exception was thrown). Empty lines are skipped and reading
   * continues until end of stream.
   *
   * <p>If a file cannot be opened the error is logged and the method returns immediately, without
   * processing the remaining files. After all files have been handled the method sleeps for 15
   * seconds before returning.
   */
  public void run() {

    LOGGER.info("WebHdfsPersistReaderTask: files to process");

    for ( FileStatus fileStatus : reader.status ) {
      LOGGER.info("    " + fileStatus.getPath().getName());
    }

    for ( FileStatus fileStatus : reader.status ) {
      // Skip directories and marker/metadata entries whose name starts with "_".
      if ( !fileStatus.isFile() || fileStatus.getPath().getName().startsWith("_")) {
        continue;
      }

      HdfsWriterConfiguration.Compression compression = HdfsWriterConfiguration.Compression.NONE;
      if ( fileStatus.getPath().getName().endsWith(".gz")) {
        compression = HdfsWriterConfiguration.Compression.GZIP;
      }
      LOGGER.info("Started Processing: {} Encoding: {} Compression: {}", fileStatus.getPath().getName(), reader.hdfsConfiguration.getEncoding(), compression.toString());

      InputStream inputStream = null;
      BufferedReader bufferedReader;
      try {
        inputStream = reader.client.open(fileStatus.getPath());
        if ( compression.equals(HdfsWriterConfiguration.Compression.GZIP)) {
          inputStream = new GZIPInputStream(inputStream);
        }
        bufferedReader = new BufferedReader(
            new InputStreamReader(inputStream, reader.hdfsConfiguration.getEncoding()));
      } catch (Exception ex) {
        // Pass the throwable as the last argument so the stack trace is logged.
        LOGGER.error("Exception Opening {}", fileStatus.getPath(), ex);
        // A wrapper constructor may have failed after the underlying stream was opened;
        // inputStream still refers to the innermost successfully created stream.
        if ( inputStream != null ) {
          try {
            inputStream.close();
          } catch (Exception closeEx) {
            LOGGER.warn("WebHdfsPersistReader Exception closing stream after failed open", closeEx);
          }
        }
        // NOTE(review): this aborts the whole task, including the remaining files and the final
        // sleep. Kept as in the original; confirm whether skipping just this file is preferable.
        return;
      }

      try {
        while (true) {
          String line;
          try {
            line = bufferedReader.readLine();
          } catch (Exception ex) {
            // A failed read cannot be retried meaningfully; stop instead of looping forever.
            LOGGER.warn("WebHdfsPersistReader readLine Exception", ex);
            reader.countersCurrent.incrementStatus(DatumStatus.FAIL);
            break;
          }
          if ( line == null ) {
            // End of stream.
            break;
          }
          if ( line.isEmpty() ) {
            // Blank lines carry no datum; skip them rather than treating them as end of file.
            continue;
          }
          try {
            reader.countersCurrent.incrementAttempt();
            StreamsDatum entry = reader.lineReaderUtil.processLine(line);
            if ( entry != null ) {
              reader.write(entry);
              reader.countersCurrent.incrementStatus(DatumStatus.SUCCESS);
            } else {
              LOGGER.warn("processLine failed");
              reader.countersCurrent.incrementStatus(DatumStatus.FAIL);
            }
          } catch (Exception ex) {
            // A bad line must not stop the rest of the file from being processed.
            LOGGER.warn("WebHdfsPersistReader processLine Exception", ex);
            reader.countersCurrent.incrementStatus(DatumStatus.FAIL);
          }
        }
        LOGGER.info("Finished Processing " + fileStatus.getPath().getName());
      } finally {
        // Closing the BufferedReader also closes the wrapped streams.
        try {
          bufferedReader.close();
        } catch (Exception ex) {
          LOGGER.error("WebHdfsPersistReader Exception", ex);
        }
      }
    }

    LOGGER.info("WebHdfsPersistReaderTask Finished");

    // NOTE(review): presumably gives downstream consumers time to drain what was written
    // before the task ends — confirm against the caller.
    Uninterruptibles.sleepUninterruptibly(15, TimeUnit.SECONDS);
  }