in src/com/facebook/buck/io/watchman/WatchmanWatcher.java [264:444]
// Runs the Watchman query at the position currently held by the cursor and turns the response
// into watch events posted through postWatchEvent.
//
// Outcomes, in the order they are checked:
//  - no response within timeoutMillis: overflow event, filesHaveChanged is set, return;
//  - response has an "error" entry: overflow event, then WatchmanWatcherException is thrown;
//  - cursor value starts with "c:": the cursor is replaced by the response's "clock" entry, or
//    by WatchmanFactory.NULL_CLOCK when that entry is absent;
//  - response has a "warning" entry: it is posted as a WARNING WatchmanDiagnosticEvent;
//  - "is_fresh_instance" is true: overflow event only for POST_OVERFLOW_EVENT, filesHaveChanged
//    is set, return;
//  - no "files" entry: filesHaveChanged is set only when the action is NONE, return;
//  - more than OVERFLOW_THRESHOLD files: overflow event, filesHaveChanged is set, return;
//  - otherwise: one WatchmanPathEvent per non-directory entry, followed by a single
//    WatchmanMultiplePathEvent carrying every change.
//
// An InterruptedException or IOException raised while doing the above is reported as an overflow
// event and rethrown; for the interrupted case Threads.interruptCurrentThread() is called first.
private void postEvents(
    BuckEventBus buckEventBus,
    FreshInstanceAction freshInstanceAction,
    AbsPath cellPath,
    WatchmanClient client,
    WatchmanQuery query,
    WatchmanCursor cursor,
    AtomicBoolean filesHaveChanged,
    SimplePerfEvent.Scope perfEvent)
    throws IOException, InterruptedException {
  try {
    Optional<? extends Map<String, ? extends Object>> maybeReply;
    try (SimplePerfEvent.Scope ignored = SimplePerfEvent.scope(buckEventBus, "query")) {
      maybeReply =
          client.queryWithTimeout(
              TimeUnit.MILLISECONDS.toNanos(timeoutMillis),
              query.toList(cursor.get()).toArray());
    }

    try (SimplePerfEvent.Scope ignored =
        SimplePerfEvent.scope(buckEventBus, "process_response")) {

      // Timeout: nothing came back, so downstream caches cannot be trusted.
      if (!maybeReply.isPresent()) {
        LOG.warn(
            "Could not get response from Watchman for query %s within %d ms",
            query, timeoutMillis);
        String timeoutReason =
            "Timed out after "
                + TimeUnit.MILLISECONDS.toSeconds(timeoutMillis)
                + " sec waiting for watchman query.";
        postWatchEvent(buckEventBus, ImmutableWatchmanOverflowEvent.of(cellPath, timeoutReason));
        filesHaveChanged.set(true);
        return;
      }

      Map<String, ? extends Object> reply = maybeReply.get();

      String errorText = (String) reply.get("error");
      if (errorText != null) {
        // This message is not de-duplicated via WatchmanDiagnostic.
        WatchmanWatcherException failure = new WatchmanWatcherException(errorText);
        LOG.debug(
            failure, "Error in Watchman output. Posting an overflow event to flush the caches");
        String errorReason = "Watchman error occurred: " + failure.getMessage();
        postWatchEvent(buckEventBus, ImmutableWatchmanOverflowEvent.of(cellPath, errorReason));
        throw failure;
      }

      // Only cursors whose value starts with "c:" are advanced from the response.
      if (cursor.get().startsWith("c:")) {
        String reportedClock = (String) reply.get("clock");
        String nextCursor = reportedClock != null ? reportedClock : WatchmanFactory.NULL_CLOCK;
        LOG.debug("Updating Watchman Cursor from %s to %s", cursor.get(), nextCursor);
        cursor.set(nextCursor);
      }

      String warningText = (String) reply.get("warning");
      if (warningText != null) {
        buckEventBus.post(
            new WatchmanDiagnosticEvent(
                WatchmanDiagnostic.of(WatchmanDiagnostic.Level.WARNING, warningText)));
      }

      Boolean freshInstanceFlag = (Boolean) reply.get("is_fresh_instance");
      if (Boolean.TRUE.equals(freshInstanceFlag)) {
        LOG.debug(
            "Watchman indicated a fresh instance (fresh instance action %s)",
            freshInstanceAction);
        switch (freshInstanceAction) {
          case POST_OVERFLOW_EVENT:
            postWatchEvent(
                buckEventBus,
                ImmutableWatchmanOverflowEvent.of(
                    cellPath, "Watchman has been initialized recently."));
            break;
          case NONE:
            break;
        }
        filesHaveChanged.set(true);
        return;
      }

      List<Map<String, Object>> changedFiles = (List<Map<String, Object>>) reply.get("files");
      if (changedFiles == null) {
        if (freshInstanceAction == FreshInstanceAction.NONE) {
          filesHaveChanged.set(true);
        }
        return;
      }

      int changeCount = changedFiles.size();
      LOG.debug("Watchman indicated %d changes", changeCount);
      if (changeCount > OVERFLOW_THRESHOLD) {
        LOG.warn(
            "Posting overflow event: too many files changed: %d > %d",
            changeCount, OVERFLOW_THRESHOLD);
        postWatchEvent(
            buckEventBus, ImmutableWatchmanOverflowEvent.of(cellPath, "Too many files changed."));
        filesHaveChanged.set(true);
        return;
      }

      // Attach either the whole list or a bounded prefix of it to the perf event.
      if (changeCount >= TRACE_CHANGES_THRESHOLD) {
        perfEvent.appendFinishedInfo(
            "files_sample", changedFiles.subList(0, TRACE_CHANGES_THRESHOLD));
      } else {
        perfEvent.appendFinishedInfo("files", changedFiles);
      }

      FileSystem cellFileSystem = cellPath.getFileSystem();
      List<WatchmanMultiplePathEvent.Change> changes = new ArrayList<>(changeCount);
      for (Map<String, Object> entry : changedFiles) {
        String entryName = (String) entry.get("name");
        if (entryName == null) {
          LOG.warn("Filename missing from watchman file response %s", entry);
          postWatchEvent(
              buckEventBus,
              ImmutableWatchmanOverflowEvent.of(
                  cellPath, "Filename missing from watchman response."));
          filesHaveChanged.set(true);
          return;
        }

        // "exists" == false wins over "new" == true; anything else is a modification.
        Boolean newFlag = (Boolean) entry.get("new");
        Boolean existsFlag = (Boolean) entry.get("exists");
        WatchmanEvent.Kind kind;
        if (Boolean.FALSE.equals(existsFlag)) {
          kind = WatchmanEvent.Kind.DELETE;
        } else if (Boolean.TRUE.equals(newFlag)) {
          kind = WatchmanEvent.Kind.CREATE;
        } else {
          kind = WatchmanEvent.Kind.MODIFY;
        }

        // Following legacy behavior, everything we get from Watchman is interpreted as file
        // changes unless explicitly specified with `type` field
        String typeCode = (String) entry.get("type");
        WatchmanEvent.Type type;
        if ("d".equals(typeCode)) {
          type = Type.DIRECTORY;
        } else if ("l".equals(typeCode)) {
          type = Type.SYMLINK;
        } else {
          type = Type.FILE;
        }

        RelPath changedPath = RelPath.of(cellFileSystem.getPath(entryName));
        changes.add(ImmutableChange.of(type, changedPath.getPath(), kind));

        // WatchmanPathEvent is sent for everything but directories - this is legacy
        // behavior and we want to keep it.
        // TODO(buck_team): switch everything to use WatchmanMultiplePathEvent and retire
        // WatchmanPathEvent
        if (type != WatchmanEvent.Type.DIRECTORY) {
          postWatchEvent(
              buckEventBus, ImmutableWatchmanPathEvent.of(cellPath, kind, changedPath));
        }
      }

      if (!changes.isEmpty()) {
        postWatchEvent(buckEventBus, ImmutableWatchmanMultiplePathEvent.of(cellPath, changes));
      }
      if (!changedFiles.isEmpty() || freshInstanceAction == FreshInstanceAction.NONE) {
        filesHaveChanged.set(true);
      }
    }
  } catch (InterruptedException e) {
    String interruptedReason = "The communication with watchman daemon has been interrupted.";
    LOG.warn(e, interruptedReason);
    // Events may have been lost, signal overflow.
    postWatchEvent(buckEventBus, ImmutableWatchmanOverflowEvent.of(cellPath, interruptedReason));
    Threads.interruptCurrentThread();
    throw e;
  } catch (IOException e) {
    String ioFailureReason =
        "There was an error while communicating with the watchman daemon: " + e.getMessage();
    LOG.error(e, ioFailureReason);
    // Events may have been lost, signal overflow.
    postWatchEvent(buckEventBus, ImmutableWatchmanOverflowEvent.of(cellPath, ioFailureReason));
    throw e;
  }
}