void FSEventsWatcher::fse_callback()

in watchman/watcher/fsevents.cpp [141:297]


void FSEventsWatcher::fse_callback(
    ConstFSEventStreamRef,
    void* clientCallBackInfo,
    size_t numEvents,
    void* eventPaths,
    const FSEventStreamEventFlags eventFlags[],
    const FSEventStreamEventId eventIds[]) {
  size_t i;
  auto paths = reinterpret_cast<char**>(eventPaths);
  auto stream = reinterpret_cast<FSEventsStream*>(clientCallBackInfo);
  auto root = stream->root;
  std::vector<watchman_fsevent> items;
  auto watcher = stream->watcher;

  stream->watcher->totalEventsSeen_.fetch_add(
      numEvents, std::memory_order_relaxed);
  if (stream->watcher->ringBuffer_) {
    for (i = 0; i < numEvents; i++) {
      uint32_t flags = eventFlags[i];
      const char* path = paths[i];
      stream->watcher->ringBuffer_->write(FSEventsLogEntry{flags, path});
    }
  }

  if (!stream->lost_sync) {
    // This is to facilitate testing via debug-fsevents-inject-drop.
    if (stream->inject_drop) {
      stream->lost_sync = true;
      log_drop_event(root, false);
      goto do_resync;
    }

    // Pre-scan to test whether we lost sync.  The intent is to be able to skip
    // processing the events from the point at which we lost sync, so we have
    // to check this before we start allocating events for the consumer.
    for (i = 0; i < numEvents; i++) {
      if ((eventFlags[i] &
           (kFSEventStreamEventFlagUserDropped |
            kFSEventStreamEventFlagKernelDropped)) != 0) {
        // We don't ever need to clear lost_sync as the code below will either
        // set up a new stream instance with it cleared, or will recrawl and
        // set up a whole new state for the recrawled instance.
        stream->lost_sync = true;

        log_drop_event(
            root, eventFlags[i] & kFSEventStreamEventFlagKernelDropped);

        if (watcher->attemptResyncOnDrop_) {
        // fseventsd has a reliable journal so we can attempt to resync.
        do_resync:
          if (stream->event_id_wrapped) {
            logf(
                ERR,
                "fsevents lost sync and the event_ids wrapped, so "
                "we have no choice but to do a full recrawl\n");
            // Allow the Dropped event to propagate and trigger a recrawl
            goto propagate;
          }

          if (watcher->stream_.get() == stream) {
            // We are the active stream for this watch which means that it
            // is safe for us to proceed with changing watcher->stream.
            // Attempt to set up a new stream to resync from the last-good
            // event.  If successful, that will replace the current stream.
            // If we fail, then we allow the UserDropped event to propagate
            // to the consumer thread which has existing logic to schedule
            // a recrawl.
            w_string failure_reason;
            auto replacement = fse_stream_make(
                root, watcher, stream->last_good, failure_reason);

            if (!replacement) {
              logf(
                  ERR,
                  "Failed to rebuild fsevent stream ({}) while trying to "
                  "resync, falling back to a regular recrawl\n",
                  failure_reason);
              // Allow the UserDropped event to propagate and trigger a recrawl
              goto propagate;
            }

            if (!FSEventStreamStart(replacement->stream)) {
              logf(
                  ERR,
                  "FSEventStreamStart failed while trying to "
                  "resync, falling back to a regular recrawl\n");
              // Allow the UserDropped event to propagate and trigger a recrawl
              goto propagate;
            }

            logf(
                ERR,
                "Lost sync, so resync from last_good event {}\n",
                stream->last_good);

            // mark the replacement as the winner
            std::swap(watcher->stream_, replacement);

            // And we're done.
            return;
          }
        }
        break;
      }
    }
  } else if (watcher->attemptResyncOnDrop_) {
    // This stream has already lost sync and our policy is to resync
    // for ourselves.  This is most likely a spurious callback triggered
    // after we'd taken action above.  We just ignore further events
    // on this particular stream and let the other stuff kick in.
    return;
  }

propagate:

  items.reserve(numEvents);
  for (i = 0; i < numEvents; i++) {
    const char* path = paths[i];

    if (eventFlags[i] & kFSEventStreamEventFlagHistoryDone) {
      // The docs say to ignore this event; it's just a marker informing
      // us that a resync completed.  Take this opportunity to log how
      // many events were replayed to catch up.
      logf(
          ERR,
          "Historical resync completed at event id {} (caught "
          "up on {} events)\n",
          eventIds[i],
          eventIds[i] - stream->since);
      continue;
    }

    if (eventFlags[i] & kFSEventStreamEventFlagEventIdsWrapped) {
      stream->event_id_wrapped = true;
    }

    uint32_t len = strlen(path);
    while (path[len - 1] == '/') {
      len--;
    }

    if (root->ignore.isIgnored(path, len)) {
      continue;
    }

    items.emplace_back(w_string(path, len), eventFlags[i]);
    if (!stream->lost_sync) {
      stream->last_good = eventIds[i];
    }
  }

  if (!items.empty()) {
    auto wlock = watcher->items_.lock();
    wlock->items.push_back(std::move(items));
    watcher->fseCond_.notify_one();
  }
}