in server/src/main/java/org/apache/cassandra/sidecar/handlers/sstableuploads/SSTableUploadHandler.java [115:167]
public void handleInternal(RoutingContext context,
                           HttpServerRequest httpRequest,
                           @NotNull String host,
                           SocketAddress remoteAddress,
                           SSTableUploadRequestParam request)
{
    // Handles the upload of one SSTable component:
    //   1. pauses the incoming request,
    //   2. acquires a permit from the concurrent-upload limiter (fails the request with
    //      TOO_MANY_REQUESTS when no permit is available),
    //   3. validates the request, resolves the staging directory, checks available space,
    //      builds the upload path and streams the component into it,
    //   4. on success records metrics and replies with a JSON SSTableUploadResponse; on any
    //      failure delegates to processFailure.

    // The request must be paused up front. Without this, the body would start streaming before
    // the temporary file that receives it exists, which surfaces as a "request has already been
    // read" error. The request is resumed once the temporary file is created and ready to accept
    // the upload.
    httpRequest.pause();

    InstanceMetrics instanceMetrics = metadataFetcher.instance(host).metrics();
    UploadSSTableMetrics.UploadSSTableComponentMetrics componentMetrics
    = instanceMetrics.uploadSSTable().forComponent(parseSSTableComponent(request.component()));

    // Captured before the permit check; used to report the service time on success.
    long startTimeInNanos = System.nanoTime();

    boolean permitAcquired = limiter.tryAcquire();
    if (!permitAcquired)
    {
        // No permit available: count the throttled request and reject it.
        String message = String.format("Concurrent upload limit (%d) exceeded", limiter.limit());
        instanceMetrics.uploadSSTable().throttled.metric.update(1);
        context.fail(wrapHttpException(HttpResponseStatus.TOO_MANY_REQUESTS, message));
        return;
    }

    // Registered right after a successful acquire so the permit is released when the
    // routing context ends.
    context.addEndHandler(ignored -> limiter.releasePermit());

    validateKeyspaceAndTable(host, request)
    .compose(validatedRequest -> uploadPathBuilder.resolveStagingDirectory(host))
    .compose(stagingDirectory -> ensureSufficientSpaceAvailable(stagingDirectory, instanceMetrics.resource()))
    .compose(ignored -> uploadPathBuilder.build(host, request))
    .compose(targetDirectory -> {
        // The digest verifier is derived from the headers of the incoming request.
        DigestVerifier digestVerifier = digestVerifierFactory.verifier(httpRequest.headers());
        return uploader.uploadComponent(httpRequest,
                                        targetDirectory,
                                        request.component(),
                                        digestVerifier,
                                        configuration.filePermissions());
    })
    .compose(fs::props)
    .onSuccess(fileProps -> {
        long elapsedNanos = System.nanoTime() - startTimeInNanos;
        long serviceTimeMillis = TimeUnit.NANOSECONDS.toMillis(elapsedNanos);
        long sizeInBytes = fileProps.size();

        logger.info("Successfully uploaded SSTable component for request={}, remoteAddress={}, " +
                    "instance={}, sizeInBytes={}, serviceTimeMillis={}",
                    request, remoteAddress, host, sizeInBytes, serviceTimeMillis);

        // Account for the uploaded bytes on both the per-component and the overall rate.
        componentMetrics.bytesUploadedRate.metric.mark(sizeInBytes);
        instanceMetrics.uploadSSTable().totalBytesUploadedRate.metric.mark(sizeInBytes);

        context.json(new SSTableUploadResponse(request.uploadId(), sizeInBytes, serviceTimeMillis));
    })
    .onFailure(failure -> processFailure(failure, context, host, remoteAddress, request));
}