in core/src/main/java/org/apache/stormcrawler/spout/FileSpout.java [133:169]
/**
 * Refills {@code buffer} with the next batch of entries read from the input files.
 *
 * <p>If no reader is currently open, the next path is polled from {@code _inputFiles} and
 * opened as UTF-8 text. If no path is left, the method returns without doing anything.
 *
 * <p>It then reads until {@code BATCH_SIZE} entries have been counted or the file ends:
 *
 * <ul>
 *   <li>Blank lines, and lines whose first non-whitespace character is {@code #}, are
 *       skipped and do not count toward the batch.
 *   <li>An entry is kept only when its position within the batch modulo {@code totalTasks}
 *       equals {@code taskIndex}, or always when {@code totalTasks} is 0.
 *   <li>Kept entries are trimmed and added to {@code buffer} as UTF-8 bytes.
 * </ul>
 *
 * <p>When the end of the current file is reached, its reader is closed and cleared so that
 * the next call moves on to the next file. If the batch fills first, the reader stays open and
 * reading resumes from the same position on the next call.
 *
 * @throws IOException if the next file cannot be opened or the current one cannot be read
 */
protected void populateBuffer() throws IOException {
    if (currentBuffer == null) {
        String file = _inputFiles.poll();
        if (file == null) {
            // no more input files
            return;
        }
        Path inputPath = Paths.get(file);
        currentBuffer =
                new BufferedReader(
                        new InputStreamReader(
                                new FileInputStream(inputPath.toFile()),
                                StandardCharsets.UTF_8));
    }
    String line = null;
    int linesRead = 0;
    while (linesRead < BATCH_SIZE && (line = currentBuffer.readLine()) != null) {
        if (StringUtils.isBlank(line)) {
            continue;
        }
        // Normalise once, then test the comment marker on the trimmed text. Testing the raw
        // line let indented comments such as "  # note" through as the entry "# note".
        // The isEmpty() check drops lines made only of control characters: isBlank() does
        // not treat those as whitespace, but trim() strips them to nothing.
        String entry = line.trim();
        if (entry.isEmpty() || entry.startsWith("#")) {
            continue;
        }
        // Decide whether this entry belongs to this task.
        // totalTasks can be 0 if a subclass forgot to call this class's open();
        // in that case every entry is kept.
        if (totalTasks == 0 || linesRead % totalTasks == taskIndex) {
            LOG.debug(
                    "Adding to buffer for spout {} -> line ({}) {}",
                    taskIndex,
                    linesRead,
                    line);
            buffer.add(entry.getBytes(StandardCharsets.UTF_8));
        }
        linesRead++;
    }
    // line is only null here when readLine() hit the end of the current file
    if (line == null) {
        currentBuffer.close();
        currentBuffer = null;
    }
}