in pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileListingThread.java [70:104]
/**
 * Periodically lists the input directory and enqueues newly discovered files.
 *
 * <p>On each pass, if the queue was last updated more than {@code pollingInterval}
 * milliseconds ago and the listing lock can be acquired without blocking, the
 * directory is listed, files that are in process (and, unless {@code keepOriginal}
 * is set, files that were recently processed) are dropped from the listing, and
 * the remaining files that are not already queued are offered to the work queue.
 * The thread then sleeps for roughly one polling interval before the next pass.
 *
 * <p>The loop ends when the thread is interrupted; the interrupt status is
 * restored before returning so that callers can still observe it.
 */
public void run() {
    while (true) {
        // tryLock() never blocks: if the lock is held elsewhere, skip this pass.
        if ((queueLastUpdated.get() < System.currentTimeMillis() - pollingInterval) && listingLock.tryLock()) {
            try {
                final File directory = new File(inputDir);
                final Set<File> listing = performListing(directory, fileFilterRef.get(), recurseDirs);
                if (listing != null && !listing.isEmpty()) {
                    // Remove any files that have been or are currently being processed.
                    listing.removeAll(inProcess);
                    if (!keepOriginal) {
                        listing.removeAll(recentlyProcessed);
                    }
                    for (File f : listing) {
                        // Avoid queueing the same file twice.
                        // NOTE(review): contains() followed by offer() is not atomic; this
                        // presumably relies on this thread being the only producer - confirm.
                        if (!workQueue.contains(f)) {
                            workQueue.offer(f);
                        }
                    }
                    // The timestamp is only refreshed when the listing was non-empty,
                    // so an empty directory is re-listed on the next pass.
                    queueLastUpdated.set(System.currentTimeMillis());
                }
            } finally {
                listingLock.unlock();
            }
        }
        try {
            // Sleep just under one interval so the staleness check above passes on
            // the next iteration. Clamp at zero: a negative timeout makes sleep()
            // throw IllegalArgumentException, which would kill this thread.
            sleep(Math.max(0L, pollingInterval - 1));
        } catch (InterruptedException e) {
            // Interruption is the only way to ask this loop to stop. Restore the
            // flag (sleep() cleared it) and exit instead of spinning forever.
            Thread.currentThread().interrupt();
            return;
        }
    }
}