private void postEvents()

in src/com/facebook/buck/io/watchman/WatchmanWatcher.java [264:444]


  private void postEvents(
      BuckEventBus buckEventBus,
      FreshInstanceAction freshInstanceAction,
      AbsPath cellPath,
      WatchmanClient client,
      WatchmanQuery query,
      WatchmanCursor cursor,
      AtomicBoolean filesHaveChanged,
      SimplePerfEvent.Scope perfEvent)
      throws IOException, InterruptedException {
    try {
      Optional<? extends Map<String, ? extends Object>> queryResponse;
      try (SimplePerfEvent.Scope ignored = SimplePerfEvent.scope(buckEventBus, "query")) {
        queryResponse =
            client.queryWithTimeout(
                TimeUnit.MILLISECONDS.toNanos(timeoutMillis), query.toList(cursor.get()).toArray());
      }

      try (SimplePerfEvent.Scope ignored =
          SimplePerfEvent.scope(buckEventBus, "process_response")) {
        if (!queryResponse.isPresent()) {
          LOG.warn(
              "Could not get response from Watchman for query %s within %d ms",
              query, timeoutMillis);
          postWatchEvent(
              buckEventBus,
              ImmutableWatchmanOverflowEvent.of(
                  cellPath,
                  "Timed out after "
                      + TimeUnit.MILLISECONDS.toSeconds(timeoutMillis)
                      + " sec waiting for watchman query."));
          filesHaveChanged.set(true);
          return;
        }

        Map<String, ? extends Object> response = queryResponse.get();
        String error = (String) response.get("error");
        if (error != null) {
          // This message is not de-duplicated via WatchmanDiagnostic.
          WatchmanWatcherException e = new WatchmanWatcherException(error);
          LOG.debug(e, "Error in Watchman output. Posting an overflow event to flush the caches");
          postWatchEvent(
              buckEventBus,
              ImmutableWatchmanOverflowEvent.of(
                  cellPath, "Watchman error occurred: " + e.getMessage()));
          throw e;
        }

        if (cursor.get().startsWith("c:")) {
          // Update the clockId
          String newCursor =
              Optional.ofNullable((String) response.get("clock"))
                  .orElse(WatchmanFactory.NULL_CLOCK);
          LOG.debug("Updating Watchman Cursor from %s to %s", cursor.get(), newCursor);
          cursor.set(newCursor);
        }

        String warning = (String) response.get("warning");
        if (warning != null) {
          buckEventBus.post(
              new WatchmanDiagnosticEvent(
                  WatchmanDiagnostic.of(WatchmanDiagnostic.Level.WARNING, warning)));
        }

        Boolean isFreshInstance = (Boolean) response.get("is_fresh_instance");
        if (isFreshInstance != null && isFreshInstance) {
          LOG.debug(
              "Watchman indicated a fresh instance (fresh instance action %s)",
              freshInstanceAction);
          switch (freshInstanceAction) {
            case NONE:
              break;
            case POST_OVERFLOW_EVENT:
              postWatchEvent(
                  buckEventBus,
                  ImmutableWatchmanOverflowEvent.of(
                      cellPath, "Watchman has been initialized recently."));
              break;
          }
          filesHaveChanged.set(true);
          return;
        }

        List<Map<String, Object>> files = (List<Map<String, Object>>) response.get("files");
        if (files == null) {
          if (freshInstanceAction == FreshInstanceAction.NONE) {
            filesHaveChanged.set(true);
          }
          return;
        }
        LOG.debug("Watchman indicated %d changes", files.size());
        if (files.size() > OVERFLOW_THRESHOLD) {
          LOG.warn(
              "Posting overflow event: too many files changed: %d > %d",
              files.size(), OVERFLOW_THRESHOLD);
          postWatchEvent(
              buckEventBus, ImmutableWatchmanOverflowEvent.of(cellPath, "Too many files changed."));
          filesHaveChanged.set(true);
          return;
        }
        if (files.size() < TRACE_CHANGES_THRESHOLD) {
          perfEvent.appendFinishedInfo("files", files);
        } else {
          perfEvent.appendFinishedInfo("files_sample", files.subList(0, TRACE_CHANGES_THRESHOLD));
        }

        FileSystem fileSystem = cellPath.getFileSystem();
        List<WatchmanMultiplePathEvent.Change> changes = new ArrayList<>(files.size());
        for (Map<String, Object> file : files) {
          String fileName = (String) file.get("name");
          if (fileName == null) {
            LOG.warn("Filename missing from watchman file response %s", file);
            postWatchEvent(
                buckEventBus,
                ImmutableWatchmanOverflowEvent.of(
                    cellPath, "Filename missing from watchman response."));
            filesHaveChanged.set(true);
            return;
          }
          Boolean fileNew = (Boolean) file.get("new");
          WatchmanEvent.Kind kind = WatchmanEvent.Kind.MODIFY;
          if (fileNew != null && fileNew) {
            kind = WatchmanEvent.Kind.CREATE;
          }
          Boolean fileExists = (Boolean) file.get("exists");
          if (fileExists != null && !fileExists) {
            kind = WatchmanEvent.Kind.DELETE;
          }

          // Following legacy behavior, everything we get from Watchman is interpreted as file
          // changes unless explicitly specified with `type` field
          WatchmanEvent.Type type = Type.FILE;
          String stype = (String) file.get("type");
          if (stype != null) {
            switch (stype) {
              case "d":
                type = Type.DIRECTORY;
                break;
              case "l":
                type = Type.SYMLINK;
                break;
            }
          }

          RelPath filePath = RelPath.of(fileSystem.getPath(fileName));

          changes.add(ImmutableChange.of(type, filePath.getPath(), kind));

          if (type != WatchmanEvent.Type.DIRECTORY) {
            // WatchmanPathEvent is sent for everything but directories - this is legacy
            // behavior and we want to keep it.
            // TODO(buck_team): switch everything to use WatchmanMultiplePathEvent and retire
            // WatchmanPathEvent
            postWatchEvent(buckEventBus, ImmutableWatchmanPathEvent.of(cellPath, kind, filePath));
          }
        }

        if (!changes.isEmpty()) {
          postWatchEvent(buckEventBus, ImmutableWatchmanMultiplePathEvent.of(cellPath, changes));
        }

        if (!files.isEmpty() || freshInstanceAction == FreshInstanceAction.NONE) {
          filesHaveChanged.set(true);
        }
      }
    } catch (InterruptedException e) {
      String message = "The communication with watchman daemon has been interrupted.";
      LOG.warn(e, message);
      // Events may have been lost, signal overflow.
      postWatchEvent(buckEventBus, ImmutableWatchmanOverflowEvent.of(cellPath, message));
      Threads.interruptCurrentThread();
      throw e;
    } catch (IOException e) {
      String message =
          "There was an error while communicating with the watchman daemon: " + e.getMessage();
      LOG.error(e, message);
      // Events may have been lost, signal overflow.
      postWatchEvent(buckEventBus, ImmutableWatchmanOverflowEvent.of(cellPath, message));
      throw e;
    }
  }