public GraphStageLogic createLogic()

in file/src/main/java/org/apache/pekko/stream/connectors/file/impl/DirectoryChangesSource.java [89:214]


  /**
   * Creates the stage logic that watches {@code directoryPath} through a {@link WatchService} and
   * emits one element per create/modify/delete event, built by applying {@code combiner} to the
   * absolute path and the kind of change.
   *
   * <p>Events are polled on demand and, when none are available, again after {@code pollInterval}.
   * Polled events that cannot be pushed immediately are kept in an in-memory buffer; the stage
   * fails when that buffer grows beyond {@code maxBufferSize} or when the watch service reports an
   * overflow. The stage completes when the watch key can no longer be reset.
   *
   * @param inheritedAttributes attributes handed to the stage; not read by this logic
   * @return the timer-driven logic for one materialization of the stage
   * @throws IOException if the watch service cannot be created or the directory cannot be
   *     registered with it
   * @throws IllegalArgumentException if {@code directoryPath} does not exist or is not a directory
   */
  public GraphStageLogic createLogic(Attributes inheritedAttributes) throws IOException {
    if (!Files.exists(directoryPath))
      throw new IllegalArgumentException("The path: '" + directoryPath + "' does not exist");
    if (!Files.isDirectory(directoryPath))
      throw new IllegalArgumentException("The path '" + directoryPath + "' is not a directory");

    return new TimerGraphStageLogic(shape) {
      // events that were polled but not yet pushed downstream
      private final Queue<T> buffer = new ArrayDeque<>();
      private final WatchService service = directoryPath.getFileSystem().newWatchService();
      // registration is done through a helper so the service is closed if it fails
      private final WatchKey watchKey = registerOrClose(service);

      {
        setHandler(
            out,
            new AbstractOutHandler() {

              @Override
              public void onPull() throws Exception {
                if (!buffer.isEmpty()) {
                  pushHead();
                } else {
                  pollAndEmit();
                }
              }
            });
      }

      @Override
      public void onTimer(Object timerKey) {
        if (!isClosed(out)) {
          pollAndEmit();
        }
      }

      @Override
      public void postStop() {
        try {
          if (watchKey.isValid()) watchKey.cancel();
          service.close();
        } catch (Exception ex) {
          // Remove when #21168 is in a release
          throw new RuntimeException(ex);
        }
      }

      // Registers the directory with the given watch service. postStop never runs for a logic
      // whose construction failed, so the service has to be closed here if registration throws.
      private WatchKey registerOrClose(WatchService watchService) throws IOException {
        try {
          return directoryPath.register(
              watchService,
              new WatchEvent.Kind<?>[] {ENTRY_CREATE, ENTRY_MODIFY, ENTRY_DELETE, OVERFLOW},
              // this is com.sun internal, but the service is useless on OSX without it
              SensitivityWatchEventModifier.HIGH);
        } catch (IOException | RuntimeException ex) {
          try {
            watchService.close();
          } catch (IOException closeEx) {
            ex.addSuppressed(closeEx);
          }
          throw ex;
        }
      }

      // Polls the watch key once, then either pushes the first buffered element or schedules the
      // next poll. Does nothing more if polling failed or completed the stage.
      private void pollAndEmit() {
        doPoll();
        if (isClosed(out)) {
          // doPoll terminated the stage: the outlet must not be pushed to any more and there is
          // no point in scheduling another poll
          return;
        }
        if (!buffer.isEmpty()) {
          pushHead();
        } else {
          schedulePoll();
        }
      }

      // Pushes the oldest buffered element, if there is one.
      private void pushHead() {
        final T head = buffer.poll();
        if (head != null) {
          push(out, head);
        }
      }

      private void schedulePoll() {
        scheduleOnce("poll", pollInterval);
      }

      // Drains the pending events of the watch key into the buffer. Fails the stage on overflow or
      // when the buffer exceeds maxBufferSize, and completes it when the key cannot be reset.
      private void doPoll() {
        try {
          for (WatchEvent<?> event : watchKey.pollEvents()) {
            final WatchEvent.Kind<?> kind = event.kind();

            if (OVERFLOW.equals(kind)) {
              // overflow means that some file system change events may have been missed,
              // that may be ok for some scenarios but to make sure it does not pass unnoticed we
              // fail the stage
              failStage(
                  new RuntimeException("Overflow from watch service: '" + directoryPath + "'"));
              // the stage is done, remaining events are of no use
              return;
            }

            // if it's not an overflow it must be a Path event
            final Path path = (Path) event.context();
            final Path absolutePath = directoryPath.resolve(path);
            final DirectoryChange change = kindToChange(kind);

            buffer.add(combiner.apply(absolutePath, change));
            if (buffer.size() > maxBufferSize) {
              failStage(
                  new RuntimeException(
                      "Max event buffer size " + maxBufferSize + " reached for " + path));
              // stop buffering once the stage has been failed
              return;
            }
          }
        } finally {
          // the key has to be reset after every poll, also on the early returns above
          if (!watchKey.reset()) {
            // directory no longer accessible
            completeStage();
          }
        }
      }

      // convert from the parametrized API to our much nicer API enum
      private DirectoryChange kindToChange(WatchEvent.Kind<?> kind) {
        final DirectoryChange change;
        if (kind.equals(ENTRY_CREATE)) {
          change = DirectoryChange.Creation;
        } else if (kind.equals(ENTRY_DELETE)) {
          change = DirectoryChange.Deletion;
        } else if (kind.equals(ENTRY_MODIFY)) {
          change = DirectoryChange.Modification;
        } else {
          throw new RuntimeException(
              "Unexpected kind of event gotten from watch service for path '"
                  + directoryPath
                  + "': "
                  + kind);
        }
        return change;
      }
    };
  }