in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/fs/FileSystemWatchService.java [50:85]
public void run() {
    // Watches directoryPath for entry create/delete/modify events and dispatches each
    // event to the matching onFileOrDirectory* callback. The loop ends when the thread
    // is interrupted, when the watch key becomes invalid, or when any other exception
    // occurs (which is rethrown wrapped in a RuntimeException).
    try (WatchService watcher = FileSystems.getDefault().newWatchService()) {
        LOG.info("Starting watching path: " + directoryPath);
        // Register the real path rather than the configured one.
        Path realDirectoryPath = Paths.get(directoryPath).toRealPath();
        LOG.info("Path is resolved to real path: " + realDirectoryPath);
        realDirectoryPath.register(watcher, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY);
        onWatchStarted(realDirectoryPath);
        while (true) {
            LOG.debug("Taking watch key");
            // Blocks until a key is signalled; throws InterruptedException on interrupt.
            WatchKey watchKey = watcher.take();
            LOG.debug("Watch key arrived");
            for (WatchEvent<?> watchEvent : watchKey.pollEvents()) {
                LOG.debug("Watch event count: " + watchEvent.count());
                if (watchEvent.kind() == OVERFLOW) {
                    // OVERFLOW carries no path context; it can only be reported.
                    LOG.error("Filesystem events may have been lost or discarded");
                    Thread.yield();
                } else if (watchEvent.kind() == ENTRY_CREATE) {
                    onFileOrDirectoryCreated((Path) watchEvent.context());
                } else if (watchEvent.kind() == ENTRY_DELETE) {
                    onFileOrDirectoryDeleted((Path) watchEvent.context());
                } else if (watchEvent.kind() == ENTRY_MODIFY) {
                    onFileOrDirectoryModified((Path) watchEvent.context());
                } else {
                    throw new IllegalStateException("Invalid event kind: " + watchEvent.kind());
                }
            }
            // reset() returns false once the key is no longer valid. The key would then
            // never be queued again, so a further take() could block forever. Stop
            // instead of hanging silently.
            if (!watchKey.reset()) {
                LOG.error(
                        "Watch key is no longer valid, stopping filesystem watcher for path: "
                                + realDirectoryPath);
                break;
            }
        }
    } catch (InterruptedException e) {
        LOG.info("Filesystem watcher interrupted");
        // Restore the interrupt status so code further up the stack can observe it.
        Thread.currentThread().interrupt();
    } catch (Exception e) {
        // Pass the throwable itself so the stack trace is logged, not just toString().
        LOG.error("Filesystem watcher received exception and stopped", e);
        throw new RuntimeException(e);
    }
}