in watchman/watcher/fsevents.cpp [141:297]
/**
 * Callback registered for an FSEvents stream.
 *
 * Processing happens in three stages:
 *  1. Account for the batch: bump totalEventsSeen_ and, if a ring buffer is
 *     configured, record every (flags, path) pair in it.
 *  2. If this stream has not already lost sync, scan the batch for
 *     UserDropped/KernelDropped flags (or honor the inject_drop test hook).
 *     When attemptResyncOnDrop_ is set, try to replace the active stream
 *     with a new one created from stream->last_good; on success the batch
 *     is discarded and we return.
 *  3. Otherwise translate the batch into watchman_fsevent items (skipping
 *     HistoryDone markers and ignored paths) and append them to
 *     watcher->items_, then notify fseCond_.
 *
 * @param clientCallBackInfo the FSEventsStream this callback belongs to.
 * @param numEvents          number of entries in the three arrays below.
 * @param eventPaths         reinterpreted as char**; one NUL-terminated path
 *                           per event.
 * @param eventFlags         per-event FSEvents flags.
 * @param eventIds           per-event FSEvents ids.
 */
void FSEventsWatcher::fse_callback(
    ConstFSEventStreamRef,
    void* clientCallBackInfo,
    size_t numEvents,
    void* eventPaths,
    const FSEventStreamEventFlags eventFlags[],
    const FSEventStreamEventId eventIds[]) {
  // `i` must live at function scope without an initializer: `goto do_resync`
  // below jumps into the body of the pre-scan loop, and a jump may not bypass
  // an initialized declaration.
  size_t i;
  auto paths = reinterpret_cast<char**>(eventPaths);
  auto stream = reinterpret_cast<FSEventsStream*>(clientCallBackInfo);
  auto root = stream->root;
  auto watcher = stream->watcher;
  std::vector<watchman_fsevent> items;

  watcher->totalEventsSeen_.fetch_add(numEvents, std::memory_order_relaxed);

  // Record the raw batch in the ring buffer, when one is configured.
  if (watcher->ringBuffer_) {
    for (i = 0; i < numEvents; i++) {
      uint32_t flags = eventFlags[i];
      const char* path = paths[i];
      watcher->ringBuffer_->write(FSEventsLogEntry{flags, path});
    }
  }

  if (!stream->lost_sync) {
    // This is to facilitate testing via debug-fsevents-inject-drop.
    if (stream->inject_drop) {
      stream->lost_sync = true;
      log_drop_event(root, false);
      // Note: this enters the loop body below without running the loop
      // header; `i` is not read on the do_resync path.
      goto do_resync;
    }

    // Pre-scan to test whether we lost sync.  The intent is to be able to skip
    // processing the events from the point at which we lost sync, so we have
    // to check this before we start allocating events for the consumer.
    for (i = 0; i < numEvents; i++) {
      if ((eventFlags[i] &
           (kFSEventStreamEventFlagUserDropped |
            kFSEventStreamEventFlagKernelDropped)) != 0) {
        // We don't ever need to clear lost_sync as the code below will either
        // set up a new stream instance with it cleared, or will recrawl and
        // set up a whole new state for the recrawled instance.
        stream->lost_sync = true;
        log_drop_event(
            root, eventFlags[i] & kFSEventStreamEventFlagKernelDropped);

        if (watcher->attemptResyncOnDrop_) {
        // fseventsd has a reliable journal so we can attempt to resync.
        do_resync:
          if (stream->event_id_wrapped) {
            logf(
                ERR,
                "fsevents lost sync and the event_ids wrapped, so "
                "we have no choice but to do a full recrawl\n");
            // Allow the Dropped event to propagate and trigger a recrawl
            goto propagate;
          }

          if (watcher->stream_.get() == stream) {
            // We are the active stream for this watch which means that it
            // is safe for us to proceed with changing watcher->stream.
            // Attempt to set up a new stream to resync from the last-good
            // event.  If successful, that will replace the current stream.
            // If we fail, then we allow the UserDropped event to propagate
            // to the consumer thread which has existing logic to schedule
            // a recrawl.
            w_string failure_reason;
            auto replacement = fse_stream_make(
                root, watcher, stream->last_good, failure_reason);

            if (!replacement) {
              logf(
                  ERR,
                  "Failed to rebuild fsevent stream ({}) while trying to "
                  "resync, falling back to a regular recrawl\n",
                  failure_reason);
              // Allow the UserDropped event to propagate and trigger a recrawl
              goto propagate;
            }

            if (!FSEventStreamStart(replacement->stream)) {
              logf(
                  ERR,
                  "FSEventStreamStart failed while trying to "
                  "resync, falling back to a regular recrawl\n");
              // Allow the UserDropped event to propagate and trigger a recrawl
              goto propagate;
            }

            logf(
                ERR,
                "Lost sync, so resync from last_good event {}\n",
                stream->last_good);

            // mark the replacement as the winner
            // NOTE(review): after the swap `replacement` holds the previous
            // value of watcher->stream_ (this stream) and is released when it
            // goes out of scope at the return below -- confirm that tearing
            // the stream down from inside its own callback is intended.
            std::swap(watcher->stream_, replacement);

            // And we're done.
            return;
          }
        }
        // Either resync is disabled or we are not the active stream: stop
        // scanning and let the batch (including the Dropped event) propagate.
        break;
      }
    }
  } else if (watcher->attemptResyncOnDrop_) {
    // This stream has already lost sync and our policy is to resync
    // for ourselves.  This is most likely a spurious callback triggered
    // after we'd taken action above.  We just ignore further events
    // on this particular stream and let the other stuff kick in.
    return;
  }

propagate:

  items.reserve(numEvents);
  for (i = 0; i < numEvents; i++) {
    const char* path = paths[i];

    if (eventFlags[i] & kFSEventStreamEventFlagHistoryDone) {
      // The docs say to ignore this event; it's just a marker informing
      // us that a resync completed.  Take this opportunity to log how
      // many events were replayed to catch up.
      logf(
          ERR,
          "Historical resync completed at event id {} (caught "
          "up on {} events)\n",
          eventIds[i],
          eventIds[i] - stream->since);
      continue;
    }

    if (eventFlags[i] & kFSEventStreamEventFlagEventIdsWrapped) {
      stream->event_id_wrapped = true;
    }

    // Strip trailing slashes.  The `len > 1` bound keeps a path made only of
    // '/' as "/" and leaves an empty path empty; without it `len` would reach
    // 0 and `path[len - 1]` would index with a wrapped-around unsigned value,
    // reading far outside the string.
    uint32_t len = strlen(path);
    while (len > 1 && path[len - 1] == '/') {
      len--;
    }

    if (root->ignore.isIgnored(path, len)) {
      continue;
    }

    items.emplace_back(w_string(path, len), eventFlags[i]);
    // Only advance the resync point while the stream is still in sync.
    if (!stream->lost_sync) {
      stream->last_good = eventIds[i];
    }
  }

  // Hand the batch to the consumer under the items_ lock and wake it up.
  if (!items.empty()) {
    auto wlock = watcher->items_.lock();
    wlock->items.push_back(std::move(items));
    watcher->fseCond_.notify_one();
  }
}