in src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java [248:644]
public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
  // Configures remote caching / remote execution for the command described by `env`.
  //
  // Outline:
  //   1. Read the options and decide which of disk / HTTP / gRPC cache, remote execution and
  //      remote downloader are requested; reject unsupported combinations.
  //   2. Create the executor service and, for gRPC, the exec / cache / downloader channels.
  //   3. Obtain credentials and verify the server capabilities of each endpoint.
  //   4. Build the cache client (optionally combined with a disk cache), the action context
  //      provider, the build event artifact uploader and the remote downloader.
  //
  // Throws AbruptExitException for invalid option combinations, or when the capabilities query
  // fails and --remote_local_fallback is not set. Other initialization failures are passed to
  // handleInitFailure and the method returns early.
  //
  // Every early exit taken after the gRPC channels have been created releases the references this
  // method still holds, so a failed initialization does not leave channels behind.
  Preconditions.checkState(actionContextProvider == null, "actionContextProvider must be null");
  Preconditions.checkState(actionInputFetcher == null, "actionInputFetcher must be null");
  Preconditions.checkState(remoteOptions == null, "remoteOptions must be null");

  // NOTE: this local intentionally shadows the field that was checked just above.
  RemoteOptions remoteOptions = env.getOptions().getOptions(RemoteOptions.class);
  if (remoteOptions == null) {
    // Quit if no supported command is being used. See getCommandOptions for details.
    return;
  }

  this.remoteOptions = remoteOptions;

  AuthAndTLSOptions authAndTlsOptions = env.getOptions().getOptions(AuthAndTLSOptions.class);
  DigestHashFunction hashFn = env.getRuntime().getFileSystem().getDigestFunction();
  DigestUtil digestUtil = new DigestUtil(env.getSyscallCache(), hashFn);

  ExecutionOptions executionOptions = env.getOptions().getOptions(ExecutionOptions.class);
  boolean verboseFailures = executionOptions != null && executionOptions.verboseFailures;

  boolean enableDiskCache = RemoteCacheClientFactory.isDiskCache(remoteOptions);
  boolean enableHttpCache = RemoteCacheClientFactory.isHttpCache(remoteOptions);
  boolean enableRemoteExecution = shouldEnableRemoteExecution(remoteOptions);
  // If --remote_cache is empty but --remote_executor is not, endpoint for cache should be the one
  // for execution.
  if (enableRemoteExecution && Strings.isNullOrEmpty(remoteOptions.remoteCache)) {
    remoteOptions.remoteCache = remoteOptions.remoteExecutor;
  }
  // Must be computed after the defaulting above, which may have just filled in remoteCache.
  boolean enableGrpcCache = GrpcCacheClient.isRemoteCacheOptions(remoteOptions);
  boolean enableRemoteDownloader = shouldEnableRemoteDownloader(remoteOptions);

  if (enableRemoteDownloader && !enableGrpcCache) {
    throw createOptionsExitException(
        "The remote downloader can only be used in combination with gRPC caching",
        FailureDetails.RemoteOptions.Code.DOWNLOADER_WITHOUT_GRPC_CACHE);
  }

  if (!enableDiskCache && !enableHttpCache && !enableGrpcCache && !enableRemoteExecution) {
    // Quit if no remote caching or execution was enabled.
    actionContextProvider =
        RemoteActionContextProvider.createForPlaceholder(env, retryScheduler, digestUtil);
    return;
  }

  if (enableHttpCache && enableRemoteExecution) {
    throw createOptionsExitException(
        "Cannot combine gRPC based remote execution with HTTP-based caching",
        FailureDetails.RemoteOptions.Code.EXECUTION_WITH_INVALID_CACHE);
  }

  env.getEventBus().register(this);
  String invocationId = env.getCommandId().toString();
  String buildRequestId = env.getBuildRequestId();
  env.getReporter().handle(Event.info(String.format("Invocation ID: %s", invocationId)));

  // Route RxJava errors that reach the global handler to the command's reporter.
  RxJavaPlugins.setErrorHandler(
      error -> env.getReporter().handle(Event.error(Throwables.getStackTraceAsString(error))));

  Path logDir =
      env.getOutputBase().getRelative(env.getRuntime().getProductName() + "-remote-logs");
  cleanAndCreateRemoteLogsDir(logDir);

  BuildRequestOptions buildRequestOptions =
      env.getOptions().getOptions(BuildRequestOptions.class);
  int jobs = buildRequestOptions != null ? buildRequestOptions.jobs : 0;

  ThreadFactory threadFactory =
      new ThreadFactoryBuilder().setNameFormat("remote-executor-%d").build();
  if (jobs != 0) {
    // With an unbounded work queue a ThreadPoolExecutor never grows beyond corePoolSize; extra
    // tasks are queued and maximumPoolSize has no effect. A core size of 0 therefore yields a
    // pool that runs everything on a single thread. Use `jobs` as the core size and let core
    // threads time out so the pool still shrinks to zero after 60 idle seconds.
    ThreadPoolExecutor boundedExecutor =
        new ThreadPoolExecutor(
            /*corePoolSize=*/ jobs,
            /*maximumPoolSize=*/ jobs,
            60L,
            SECONDS,
            new LinkedBlockingQueue<>(),
            threadFactory);
    boundedExecutor.allowCoreThreadTimeOut(true);
    executorService = boundedExecutor;
  } else {
    executorService = Executors.newCachedThreadPool(threadFactory);
  }

  if ((enableHttpCache || enableDiskCache) && !enableGrpcCache) {
    // No gRPC endpoint involved: the HTTP/disk-only setup is handled separately.
    initHttpAndDiskCache(env, authAndTlsOptions, remoteOptions, digestUtil);
    return;
  }

  ClientInterceptor loggingInterceptor = null;
  if (remoteOptions.experimentalRemoteGrpcLog != null) {
    try {
      rpcLogFile =
          new AsynchronousFileOutputStream(
              env.getWorkingDirectory().getRelative(remoteOptions.experimentalRemoteGrpcLog));
    } catch (IOException e) {
      // No channel has been created yet, so there is nothing to release here.
      handleInitFailure(env, e, Code.RPC_LOG_FAILURE);
      return;
    }
    loggingInterceptor = new LoggingInterceptor(rpcLogFile, env.getRuntime().getClock());
  }

  ReferenceCountedChannel execChannel = null;
  ReferenceCountedChannel cacheChannel = null;
  ReferenceCountedChannel downloaderChannel = null;
  // The number of concurrent requests for one connection to a gRPC server is limited by
  // MAX_CONCURRENT_STREAMS, which is normally 100+. We assume 50 concurrent requests per
  // connection works fairly well. The number of connections opened by one channel is based on
  // the resolved IPs of that server, and we assume servers normally have 2 IPs, which gives the
  // limit of 100 used below.
  int maxConcurrencyPerConnection = 100;
  // Non-positive --remote_max_connections values are passed on as 0.
  int maxConnections = Math.max(remoteOptions.remoteMaxConnections, 0);

  if (enableRemoteExecution) {
    ImmutableList.Builder<ClientInterceptor> interceptors = ImmutableList.builder();
    interceptors.add(TracingMetadataUtils.newExecHeadersInterceptor(remoteOptions));
    if (loggingInterceptor != null) {
      interceptors.add(loggingInterceptor);
    }
    execChannel =
        new ReferenceCountedChannel(
            new GoogleChannelConnectionFactory(
                channelFactory,
                remoteOptions.remoteExecutor,
                remoteOptions.remoteProxy,
                authAndTlsOptions,
                interceptors.build(),
                maxConcurrencyPerConnection),
            maxConnections);

    // Share the exec channel when --remote_executor and --remote_cache point to the same
    // endpoint; otherwise a separate cache channel is created below.
    if (remoteOptions.remoteCache.equals(remoteOptions.remoteExecutor)) {
      cacheChannel = execChannel.retain();
    }
  }

  if (cacheChannel == null) {
    ImmutableList.Builder<ClientInterceptor> interceptors = ImmutableList.builder();
    interceptors.add(TracingMetadataUtils.newCacheHeadersInterceptor(remoteOptions));
    if (loggingInterceptor != null) {
      interceptors.add(loggingInterceptor);
    }
    cacheChannel =
        new ReferenceCountedChannel(
            new GoogleChannelConnectionFactory(
                channelFactory,
                remoteOptions.remoteCache,
                remoteOptions.remoteProxy,
                authAndTlsOptions,
                interceptors.build(),
                maxConcurrencyPerConnection),
            maxConnections);
  }

  if (enableRemoteDownloader) {
    // Create a separate channel if --remote_downloader and --remote_cache point to different
    // endpoints.
    if (remoteOptions.remoteDownloader.equals(remoteOptions.remoteCache)) {
      downloaderChannel = cacheChannel.retain();
    } else {
      ImmutableList.Builder<ClientInterceptor> interceptors = ImmutableList.builder();
      if (loggingInterceptor != null) {
        interceptors.add(loggingInterceptor);
      }
      downloaderChannel =
          new ReferenceCountedChannel(
              new GoogleChannelConnectionFactory(
                  channelFactory,
                  remoteOptions.remoteDownloader,
                  remoteOptions.remoteProxy,
                  authAndTlsOptions,
                  interceptors.build(),
                  maxConcurrencyPerConnection),
              maxConnections);
    }
  }

  // From here on, each of execChannel / cacheChannel / downloaderChannel that is non-null
  // carries exactly one reference owned by this method (shared channels were retained once per
  // variable), so releasing every non-null variable once balances the count.

  CallCredentialsProvider callCredentialsProvider;
  try {
    callCredentialsProvider =
        GoogleAuthUtils.newCallCredentialsProvider(
            newCredentials(
                env.getClientEnv(),
                env.getRuntime().getFileSystem(),
                env.getReporter(),
                authAndTlsOptions,
                remoteOptions));
  } catch (IOException e) {
    releaseChannelsOnInitFailure(execChannel, cacheChannel, downloaderChannel);
    handleInitFailure(env, e, Code.CREDENTIALS_INIT_FAILURE);
    return;
  }
  CallCredentials credentials = callCredentialsProvider.getCallCredentials();

  RemoteRetrier retrier =
      new RemoteRetrier(
          remoteOptions,
          RemoteRetrier.RETRIABLE_GRPC_ERRORS,
          retryScheduler,
          Retrier.ALLOW_ALL_CALLS);

  // We only check required capabilities for a given endpoint.
  //
  // If --remote_executor and --remote_cache point to the same endpoint, we require that
  // endpoint has both execution and cache capabilities.
  //
  // If they point to different endpoints, we check the endpoint with execution or cache
  // capabilities respectively.
  try {
    if (execChannel != null) {
      if (cacheChannel != execChannel) {
        verifyServerCapabilities(
            remoteOptions,
            execChannel,
            credentials,
            retrier,
            env,
            digestUtil,
            ServerCapabilitiesRequirement.EXECUTION);
        verifyServerCapabilities(
            remoteOptions,
            cacheChannel,
            credentials,
            retrier,
            env,
            digestUtil,
            ServerCapabilitiesRequirement.CACHE);
      } else {
        verifyServerCapabilities(
            remoteOptions,
            execChannel,
            credentials,
            retrier,
            env,
            digestUtil,
            ServerCapabilitiesRequirement.EXECUTION_AND_CACHE);
      }
    } else {
      verifyServerCapabilities(
          remoteOptions,
          cacheChannel,
          credentials,
          retrier,
          env,
          digestUtil,
          ServerCapabilitiesRequirement.CACHE);
    }
  } catch (IOException e) {
    // Both outcomes below abandon the gRPC setup (placeholder provider or abrupt exit), so the
    // channels are no longer needed.
    releaseChannelsOnInitFailure(execChannel, cacheChannel, downloaderChannel);

    String errorMessage =
        "Failed to query remote execution capabilities: " + Utils.grpcAwareErrorMessage(e);
    if (remoteOptions.remoteLocalFallback) {
      // With local fallback the failure is only a warning and a placeholder provider is used.
      if (verboseFailures) {
        errorMessage += System.lineSeparator() + Throwables.getStackTraceAsString(e);
      }
      env.getReporter().handle(Event.warn(errorMessage));
      actionContextProvider =
          RemoteActionContextProvider.createForPlaceholder(env, retryScheduler, digestUtil);
      return;
    } else {
      if (verboseFailures) {
        env.getReporter().handle(Event.error(Throwables.getStackTraceAsString(e)));
      }
      throw createExitException(
          errorMessage, ExitCode.REMOTE_ERROR, Code.CAPABILITIES_QUERY_FAILURE);
    }
  }

  String remoteBytestreamUriPrefix = remoteOptions.remoteBytestreamUriPrefix;
  if (Strings.isNullOrEmpty(remoteBytestreamUriPrefix)) {
    // No explicit prefix configured: derive it from the cache channel's authority, followed by
    // the instance name when one is set.
    try {
      remoteBytestreamUriPrefix = cacheChannel.withChannelBlocking(Channel::authority);
    } catch (IOException e) {
      releaseChannelsOnInitFailure(execChannel, cacheChannel, downloaderChannel);
      handleInitFailure(env, e, Code.CACHE_INIT_FAILURE);
      return;
    } catch (InterruptedException e) {
      releaseChannelsOnInitFailure(execChannel, cacheChannel, downloaderChannel);
      handleInitFailure(env, new IOException(e), Code.CACHE_INIT_FAILURE);
      // Restore the interrupt flag so callers further up can still observe the interruption.
      Thread.currentThread().interrupt();
      return;
    }
    if (!Strings.isNullOrEmpty(remoteOptions.remoteInstanceName)) {
      remoteBytestreamUriPrefix += "/" + remoteOptions.remoteInstanceName;
    }
  }

  // The client receives its own reference to the cache channel; the reference held by this
  // method is dropped right after.
  RemoteCacheClient cacheClient =
      new GrpcCacheClient(
          cacheChannel.retain(), callCredentialsProvider, remoteOptions, retrier, digestUtil);
  cacheChannel.release();

  // Combining with the disk cache is the same for remote execution and remote caching, and in
  // both cases it is the first step, so it is done once here.
  if (enableDiskCache) {
    try {
      cacheClient =
          RemoteCacheClientFactory.createDiskAndRemoteClient(
              env.getWorkingDirectory(),
              remoteOptions.diskCache,
              remoteOptions.remoteVerifyDownloads,
              digestUtil,
              cacheClient,
              remoteOptions);
    } catch (IOException e) {
      // The assignment above did not happen, so cacheClient is still the gRPC client, which
      // holds the remaining cache channel reference.
      // NOTE(review): assumes RemoteCacheClient#close() releases that channel - confirm.
      cacheClient.close();
      releaseChannelsOnInitFailure(execChannel, downloaderChannel);
      handleInitFailure(env, e, Code.CACHE_INIT_FAILURE);
      return;
    }
  }

  if (enableRemoteExecution) {
    RemoteExecutionClient remoteExecutor;
    if (remoteOptions.remoteExecutionKeepalive) {
      RemoteRetrier execRetrier =
          new RemoteRetrier(
              remoteOptions,
              RemoteRetrier.RETRIABLE_GRPC_ERRORS, // Handle NOT_FOUND internally
              retryScheduler,
              Retrier.ALLOW_ALL_CALLS);
      remoteExecutor =
          new ExperimentalGrpcRemoteExecutor(
              remoteOptions, execChannel.retain(), callCredentialsProvider, execRetrier);
    } else {
      RemoteRetrier execRetrier =
          new RemoteRetrier(
              remoteOptions,
              RemoteRetrier.RETRIABLE_GRPC_EXEC_ERRORS,
              retryScheduler,
              Retrier.ALLOW_ALL_CALLS);
      remoteExecutor =
          new GrpcRemoteExecutor(execChannel.retain(), callCredentialsProvider, execRetrier);
    }
    // The executor was handed its own reference; drop the one held by this method.
    execChannel.release();

    RemoteExecutionCache remoteCache =
        new RemoteExecutionCache(cacheClient, remoteOptions, digestUtil);
    actionContextProvider =
        RemoteActionContextProvider.createForRemoteExecution(
            executorService,
            env,
            remoteCache,
            remoteExecutor,
            retryScheduler,
            digestUtil,
            logDir);
    repositoryRemoteExecutorFactoryDelegate.init(
        new RemoteRepositoryRemoteExecutorFactory(
            remoteCache,
            remoteExecutor,
            digestUtil,
            buildRequestId,
            invocationId,
            remoteOptions.remoteInstanceName,
            remoteOptions.remoteAcceptCached));
  } else {
    RemoteCache remoteCache = new RemoteCache(cacheClient, remoteOptions, digestUtil);
    actionContextProvider =
        RemoteActionContextProvider.createForRemoteCaching(
            executorService, env, remoteCache, retryScheduler, digestUtil);
  }

  buildEventArtifactUploaderFactoryDelegate.init(
      new ByteStreamBuildEventArtifactUploaderFactory(
          executorService,
          env.getReporter(),
          verboseFailures,
          actionContextProvider.getRemoteCache(),
          remoteBytestreamUriPrefix,
          buildRequestId,
          invocationId));

  if (enableRemoteDownloader) {
    remoteDownloaderSupplier.set(
        new GrpcRemoteDownloader(
            buildRequestId,
            invocationId,
            downloaderChannel.retain(),
            Optional.ofNullable(credentials),
            retrier,
            cacheClient,
            remoteOptions));
    // The downloader was handed its own reference; drop the one held by this method.
    downloaderChannel.release();
  }
}

/**
 * Releases one reference on each of the given channels, skipping {@code null} entries (channels
 * that were never created). Used by {@link #beforeCommand} when initialization is abandoned.
 */
private static void releaseChannelsOnInitFailure(ReferenceCountedChannel... channels) {
  for (ReferenceCountedChannel channel : channels) {
    if (channel != null) {
      channel.release();
    }
  }
}