public void handleInternal()

in server/src/main/java/org/apache/cassandra/sidecar/handlers/sstableuploads/SSTableUploadHandler.java [115:167]


    public void handleInternal(RoutingContext context,
                               HttpServerRequest httpRequest,
                               @NotNull String host,
                               SocketAddress remoteAddress,
                               SSTableUploadRequestParam request)
    {
        // Pause the request here; otherwise data streaming starts before the temporary
        // file is ready to receive it, and we would see a "request has already been read"
        // error. The request is resumed once the temporary file has been created and is
        // ready to accept the upload.
        httpRequest.pause();

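        // Per-instance and per-component upload metrics for the target Cassandra instance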
        InstanceMetrics instanceMetrics = metadataFetcher.instance(host).metrics();
        UploadSSTableMetrics.UploadSSTableComponentMetrics componentMetrics =
        instanceMetrics.uploadSSTable().forComponent(parseSSTableComponent(request.component()));

        long startTimeInNanos = System.nanoTime();
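        // Enforce the configured concurrent-upload limit; excess requests fail fast with 429 Too Many Requests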
        if (!limiter.tryAcquire())
        {
            String message = String.format("Concurrent upload limit (%d) exceeded", limiter.limit());
            instanceMetrics.uploadSSTable().throttled.metric.update(1);
            context.fail(wrapHttpException(HttpResponseStatus.TOO_MANY_REQUESTS, message));
            return;
        }
        // Ensure the acquired permit is always released when the request ends
        context.addEndHandler(v -> limiter.releasePermit());

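        // Upload pipeline: validate keyspace/table, check staging-area disk space, resolve the
        // target directory, stream and digest-verify the component, then report the staged file size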
        validateKeyspaceAndTable(host, request)
        .compose(validRequest -> uploadPathBuilder.resolveStagingDirectory(host))
        .compose(uploadDir -> ensureSufficientSpaceAvailable(uploadDir,
                                                             instanceMetrics.resource()))
        .compose(v -> uploadPathBuilder.build(host, request))
        .compose(uploadDirectory -> {
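            // Choose a digest verifier based on the digest headers supplied with the request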
            DigestVerifier digestVerifier = digestVerifierFactory.verifier(httpRequest.headers());
            return uploader.uploadComponent(httpRequest,
                                            uploadDirectory,
                                            request.component(),
                                            digestVerifier,
                                            configuration.filePermissions());
        })
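        // Fetch file properties of the staged component so its size can be reported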
        .compose(fs::props)
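        // Log the completed upload, record throughput metrics, and respond with the upload summary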
        .onSuccess(fileProps -> {
            long serviceTimeMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeInNanos);
            logger.info("Successfully uploaded SSTable component for request={}, remoteAddress={}, " +
                        "instance={}, sizeInBytes={}, serviceTimeMillis={}",
                        request, remoteAddress, host, fileProps.size(), serviceTimeMillis);
            componentMetrics.bytesUploadedRate.metric.mark(fileProps.size());
            instanceMetrics.uploadSSTable().totalBytesUploadedRate.metric.mark(fileProps.size());

            context.json(new SSTableUploadResponse(request.uploadId(), fileProps.size(), serviceTimeMillis));
        })
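        // All pipeline failures are funneled into a single failure handler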
        .onFailure(cause -> processFailure(cause, context, host, remoteAddress, request));
    }
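
For context, the pause/resume handling described in the comment at the top of the method follows the standard Vert.x back-pressure pattern: pause the request before any asynchronous setup, then stream the body once the destination is ready. A minimal sketch of that pattern is shown below; it is illustrative only, and the class name, file path handling, and responses are assumptions, not part of SSTableUploadHandler.

    import io.vertx.core.Vertx;
    import io.vertx.core.file.AsyncFile;
    import io.vertx.core.file.OpenOptions;
    import io.vertx.core.http.HttpServerRequest;

    // Illustrative sketch only; not part of SSTableUploadHandler
    public class PauseResumeSketch
    {
        public static void stageUpload(Vertx vertx, HttpServerRequest request, String stagingPath)
        {
            // Pause before any asynchronous work so no body bytes are consumed prematurely
            request.pause();

            vertx.fileSystem()
                 .open(stagingPath, new OpenOptions().setCreate(true))
                 .onSuccess((AsyncFile file) -> {
                     // The destination is ready; pipeTo resumes the paused request and streams
                     // the body to the file, applying back-pressure along the way
                     request.pipeTo(file)
                            .onSuccess(v -> request.response().end("uploaded"))
                            .onFailure(err -> request.response().setStatusCode(500).end());
                 })
                 .onFailure(err -> request.response().setStatusCode(500).end());
        }
    }

Pausing up front is what allows the handler above to perform validation, disk-space checks, and path resolution before any upload bytes arrive.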