in file/src/main/java/org/apache/pekko/stream/connectors/file/impl/DirectoryChangesSource.java [89:214]
// Creates the stage logic that watches `directoryPath` through a java.nio WatchService and emits
// one element per file system event, built by `combiner` from the absolute path and the kind of
// change.
//
// Events are fetched by polling the watch key: on every downstream pull when nothing is buffered,
// and otherwise from a one-shot timer re-armed every `pollInterval` while demand is outstanding.
//
// Throws IllegalArgumentException if `directoryPath` does not exist or is not a directory, and
// IOException if the watch service cannot be created or the directory cannot be registered.
public GraphStageLogic createLogic(Attributes inheritedAttributes) throws IOException {
  if (!Files.exists(directoryPath))
    throw new IllegalArgumentException("The path: '" + directoryPath + "' does not exist");
  if (!Files.isDirectory(directoryPath))
    throw new IllegalArgumentException("The path '" + directoryPath + "' is not a directory");

  final WatchService service = directoryPath.getFileSystem().newWatchService();
  final WatchKey watchKey;
  try {
    watchKey =
        directoryPath.register(
            service,
            new WatchEvent.Kind<?>[] {ENTRY_CREATE, ENTRY_MODIFY, ENTRY_DELETE, OVERFLOW},
            // this is com.sun internal, but the service is useless on OSX without it
            SensitivityWatchEventModifier.HIGH);
  } catch (IOException | RuntimeException ex) {
    // postStop() never runs for a logic that was not created, so the service has to be released
    // here or it would leak
    try {
      service.close();
    } catch (IOException | RuntimeException closeEx) {
      ex.addSuppressed(closeEx);
    }
    throw ex;
  }

  return new TimerGraphStageLogic(shape) {
    // events already read from the watch key but not yet pushed downstream
    private final Queue<T> buffer = new ArrayDeque<>();

    {
      setHandler(
          out,
          new AbstractOutHandler() {
            @Override
            public void onPull() throws Exception {
              // only hit the watch key when there is nothing buffered to hand out
              if (buffer.isEmpty()) {
                doPoll();
              }
              pushHeadOrSchedulePoll();
            }
          });
    }

    @Override
    public void onTimer(Object timerKey) {
      if (!isClosed(out)) {
        doPoll();
        pushHeadOrSchedulePoll();
      }
    }

    @Override
    public void postStop() {
      try {
        if (watchKey.isValid()) watchKey.cancel();
        service.close();
      } catch (Exception ex) {
        // Remove when #21168 is in a release
        throw new RuntimeException(ex);
      }
    }

    // Pushes the oldest buffered element if there is one, otherwise arms the poll timer.
    // Does nothing when the outlet is closed: doPoll() may have failed or completed the stage,
    // and pushing to a closed port throws while a timer on a stopped stage is pointless.
    private void pushHeadOrSchedulePoll() {
      if (isClosed(out)) {
        return;
      }
      if (!buffer.isEmpty()) {
        pushHead();
      } else {
        schedulePoll();
      }
    }

    // Pushes the oldest buffered element downstream, if any.
    private void pushHead() {
      final T head = buffer.poll();
      if (head != null) {
        push(out, head);
      }
    }

    // Arms a one-shot timer that triggers the next poll after `pollInterval`.
    private void schedulePoll() {
      scheduleOnce("poll", pollInterval);
    }

    // Drains the pending events of the watch key into the buffer.
    // Fails the stage on OVERFLOW or when the buffer grows beyond `maxBufferSize`, and completes
    // the stage when the key cannot be reset.
    private void doPoll() {
      try {
        for (WatchEvent<?> event : watchKey.pollEvents()) {
          final WatchEvent.Kind<?> kind = event.kind();
          if (OVERFLOW.equals(kind)) {
            // overflow means that some file system change events may have been missed,
            // that may be ok for some scenarios but to make sure it does not pass unnoticed we
            // fail the stage
            failStage(
                new RuntimeException("Overflow from watch service: '" + directoryPath + "'"));
            // the stage is failed, do not keep buffering the remaining events
            return;
          }
          // if it's not an overflow it must be a Path event
          final Path path = (Path) event.context();
          final Path absolutePath = directoryPath.resolve(path);
          final DirectoryChange change = kindToChange(kind);
          buffer.add(combiner.apply(absolutePath, change));
          if (buffer.size() > maxBufferSize) {
            failStage(
                new RuntimeException(
                    "Max event buffer size " + maxBufferSize + " reached for " + path));
            // the stage is failed, do not keep buffering the remaining events
            return;
          }
        }
      } finally {
        // the key must be reset after every poll, also on the early returns above, otherwise it
        // stops queueing events
        if (!watchKey.reset()) {
          // directory no longer accessible
          completeStage();
        }
      }
    }

    // convert from the parametrized API to our much nicer API enum
    private DirectoryChange kindToChange(WatchEvent.Kind<?> kind) {
      if (kind.equals(ENTRY_CREATE)) {
        return DirectoryChange.Creation;
      }
      if (kind.equals(ENTRY_DELETE)) {
        return DirectoryChange.Deletion;
      }
      if (kind.equals(ENTRY_MODIFY)) {
        return DirectoryChange.Modification;
      }
      throw new RuntimeException(
          "Unexpected kind of event gotten from watch service for path '"
              + directoryPath
              + "': "
              + kind);
    }
  };
}