in streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java [51:114]
/**
 * Reads every eligible entry of {@code reader.status} line by line and hands each
 * successfully parsed line to {@code reader.write}.
 *
 * <p>An entry is eligible when it is a file whose name does not start with {@code "_"}.
 * Files whose name ends with {@code ".gz"} are wrapped in a {@link GZIPInputStream}.
 * Each non-empty line increments the attempt counter and is then counted as
 * {@code SUCCESS} or {@code FAIL} on {@code reader.countersCurrent}.
 *
 * <p>A file is read until end of stream; empty lines are skipped rather than treated as
 * the end of the file. A failure while reading from the stream abandons the current file,
 * whereas a failure while processing a single line only fails that line.
 *
 * <p>If a file cannot be opened the task logs the error and returns immediately, without
 * processing the remaining files.
 */
public void run() {

  LOGGER.info("WebHdfsPersistReaderTask: files to process");

  for ( FileStatus fileStatus : reader.status ) {
    LOGGER.info(" " + fileStatus.getPath().getName());
  }

  for ( FileStatus fileStatus : reader.status ) {

    String fileName = fileStatus.getPath().getName();

    if ( !fileStatus.isFile() || fileName.startsWith("_") ) {
      continue;
    }

    HdfsWriterConfiguration.Compression compression = HdfsWriterConfiguration.Compression.NONE;
    if ( fileName.endsWith(".gz") ) {
      compression = HdfsWriterConfiguration.Compression.GZIP;
    }

    LOGGER.info("Started Processing: {} Encoding: {} Compression: {}", fileName, reader.hdfsConfiguration.getEncoding(), compression.toString());

    InputStream inputStream = null;
    BufferedReader bufferedReader;
    try {
      inputStream = reader.client.open(fileStatus.getPath());
      if ( compression.equals(HdfsWriterConfiguration.Compression.GZIP) ) {
        inputStream = new GZIPInputStream(inputStream);
      }
      bufferedReader = new BufferedReader(new InputStreamReader(inputStream, reader.hdfsConfiguration.getEncoding()));
    } catch (Exception ex) {
      // Pass the exception itself so the message and stack trace are not lost.
      LOGGER.error("Exception Opening " + fileStatus.getPath(), ex);
      // The stream may already be open when a wrapper constructor fails; release it.
      if ( inputStream != null ) {
        try {
          inputStream.close();
        } catch (Exception closeEx) {
          LOGGER.warn("WebHdfsPersistReader Exception", closeEx);
        }
      }
      // Existing behavior: an unreadable file aborts the whole task.
      return;
    }

    try {
      while ( true ) {

        String line;
        try {
          line = bufferedReader.readLine();
        } catch (Exception ex) {
          // A broken stream will not recover; retrying would loop forever.
          LOGGER.warn("WebHdfsPersistReader readLine Exception", ex);
          reader.countersCurrent.incrementStatus(DatumStatus.FAIL);
          break;
        }

        if ( line == null ) {
          // readLine() returns null only at end of stream.
          break;
        }
        if ( line.isEmpty() ) {
          // Blank lines carry no record but do not terminate the file.
          continue;
        }

        try {
          reader.countersCurrent.incrementAttempt();
          StreamsDatum entry = reader.lineReaderUtil.processLine(line);
          if ( entry != null ) {
            reader.write(entry);
            reader.countersCurrent.incrementStatus(DatumStatus.SUCCESS);
          } else {
            LOGGER.warn("processLine failed");
            reader.countersCurrent.incrementStatus(DatumStatus.FAIL);
          }
        } catch (Exception ex) {
          // Only this line failed; keep reading the rest of the file.
          LOGGER.warn("WebHdfsPersistReader processLine Exception", ex);
          reader.countersCurrent.incrementStatus(DatumStatus.FAIL);
        }
      }
      LOGGER.info("Finished Processing " + fileName);
    } finally {
      // Always release the reader (and the underlying stream), even on unexpected errors.
      try {
        bufferedReader.close();
      } catch (Exception ex) {
        LOGGER.error("WebHdfsPersistReader Exception", ex);
      }
    }
  }

  LOGGER.info("WebHdfsPersistReaderTask Finished");

  Uninterruptibles.sleepUninterruptibly(15, TimeUnit.SECONDS);
}