in src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandler.java [93:130]
/**
 * Handles an SSTable component upload for the given instance.
 *
 * <p>Flow: pause the incoming request body, acquire a concurrency permit, then run an async
 * pipeline that validates the keyspace/table, resolves the staging directory and checks disk
 * space, builds the final upload path, streams the component to disk (verifying the expected
 * checksum), and finally responds with the uploaded file's size and the service time.
 * Failures at any stage are routed to {@code processFailure}.
 *
 * @param context       the routing context; completed with JSON on success or failed with an
 *                      HTTP exception on error
 * @param httpRequest   the underlying HTTP request whose body carries the component bytes;
 *                      paused here and presumably resumed inside
 *                      {@code uploader.uploadComponent} once the target file is ready —
 *                      TODO confirm against that implementation
 * @param host          the Cassandra instance host the upload is destined for
 * @param remoteAddress the client's address, used for logging only
 * @param request       the parsed upload request (keyspace, table, component, upload id,
 *                      optional expected checksum)
 */
public void handleInternal(RoutingContext context,
HttpServerRequest httpRequest,
String host,
SocketAddress remoteAddress,
SSTableUploadRequest request)
{
// We pause request here, otherwise data streaming will happen before we have our temporary
// file ready for streaming, and we will see request has already been read error. Hence, we
// pause request here and resume it when temporary file has been created and is ready to
// accept the upload.
httpRequest.pause();
// Capture start time in nanos for the serviceTimeMillis reported in the response/log.
long startTimeInNanos = System.nanoTime();
// Bound the number of concurrent uploads; reject with 429 when the limit is reached.
if (!limiter.tryAcquire())
{
String message = String.format("Concurrent upload limit (%d) exceeded", limiter.limit());
context.fail(wrapHttpException(HttpResponseStatus.TOO_MANY_REQUESTS, message));
return;
}
// to make sure that permit is always released
// (registered only after a successful tryAcquire, so a rejected request never over-releases)
context.addEndHandler(v -> limiter.releasePermit());
// Async pipeline: each stage runs only if the previous one succeeded; any failure
// short-circuits straight to the onFailure handler below.
validateKeyspaceAndTable(host, request)
.compose(validRequest -> uploadPathBuilder.resolveStagingDirectory(host))
.compose(this::ensureSufficientSpaceAvailable)
.compose(v -> uploadPathBuilder.build(host, request))
.compose(uploadDirectory -> uploader.uploadComponent(httpRequest, uploadDirectory, request.component(),
request.expectedChecksum()))
// Stat the written file to obtain its on-disk size for the response.
.compose(fs::props)
.onSuccess(fileProps -> {
long serviceTimeMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeInNanos);
logger.info("Successfully uploaded SSTable component for request={}, remoteAddress={}, " +
"instance={}, sizeInBytes={}, serviceTimeMillis={}",
request, remoteAddress, host, fileProps.size(), serviceTimeMillis);
context.json(new SSTableUploadResponse(request.uploadId(), fileProps.size(), serviceTimeMillis));
})
.onFailure(cause -> processFailure(cause, context, host, remoteAddress, request));
}