private async Task ExecuteUploadAsync()

in src/ServiceProfiler.EventPipe.Otel/Azure.Monitor.OpenTelemetry.Profiler.Core/PostStopProcessor.cs [75:183]


    /// <summary>
    /// Exchanges the collected samples with the uploader over a named pipe and waits for the trace upload to finish.
    /// </summary>
    /// <param name="e">
    /// Post-stop options. <c>UploaderFullPath</c> is assigned here, and <c>Samples</c> is replaced
    /// with whatever the uploader writes back (an empty collection when it writes back nothing).
    /// </param>
    /// <param name="uploadMode">
    /// <see cref="UploadMode.Never"/> skips the upload entirely; <see cref="UploadMode.Always"/> proceeds even when there are no samples.
    /// </param>
    /// <param name="cancellationToken">Token flowed to the named pipe connection, the access token request, the final send and the upload itself.</param>
    /// <returns>
    /// <c>false</c> when the upload is skipped (mode is <see cref="UploadMode.Never"/>, the uploader can't be located,
    /// or there are no samples and the mode isn't <see cref="UploadMode.Always"/>); otherwise <c>true</c> once both the
    /// named pipe exchange and the upload task have completed. Note that <c>true</c> is returned even when the upload
    /// task yields a <c>null</c> upload context.
    /// </returns>
    /// <exception cref="InvalidOperationException">The uploader wrote back an empty app id.</exception>
    private async Task<bool> ExecuteUploadAsync(PostStopOptions e, UploadMode uploadMode, CancellationToken cancellationToken)
    {
        if (uploadMode == UploadMode.Never)
        {
            _logger.LogInformation("Skip uploading. Upload mode: {mode}.", uploadMode);
            return false;
        }

        if (!_uploaderPathProvider.TryGetUploaderFullPath(out string uploaderFullPath))
        {
            _logger.LogError("Uploader is not found.");
            return false;
        }

        int sampleCount = e.Samples.Count();
        _logger.LogDebug("There are {sampleNumber} samples before validation.", sampleCount);
        if (sampleCount == 0 && uploadMode != UploadMode.Always)
        {
            // Nothing to upload, and the caller didn't force an upload.
            return false;
        }

        e.UploaderFullPath = uploaderFullPath;

        int processId = CurrentProcessUtilities.GetId();

        // A fresh pipe name per upload; the same name is handed to the upload task below.
        string pipeName = Guid.NewGuid().ToString("D");

        Task namedPipeClientTask = Task.Run(async () =>
        {
            INamedPipeClientService namedPipeClient = _namedPipeClientFactory.CreateNamedPipeService();
            try
            {
                _logger.LogTrace("Waiting for connection of named pipe: {name}", pipeName);
                await namedPipeClient.ConnectAsync(pipeName, cancellationToken).ConfigureAwait(false);
                _logger.LogTrace("Namedpipe {name} connected.", namedPipeClient.PipeName);

                _logger.LogTrace("Sending serialized samples.");
                await namedPipeClient.SendAsync(e.Samples).ConfigureAwait(false);
                _logger.LogTrace("Finished sending samples.");

                // Contract with Uploader: Only valid samples are written back.
                _logger.LogTrace("Waiting for the uploader to write back valid samples according to the contract.");
                // The uploader might need a while for sample validation before it returns the result. That is especially true on a heavily loaded system.
                // Give it at least 10 minutes as a reasonable timeout. The user can extend it further by configuring a longer default message timeout.
                double longerTimeoutMilliseconds = Math.Max(TimeSpan.FromMinutes(10).TotalMilliseconds, _serviceProfilerOptions.NamedPipe.DefaultMessageTimeout.TotalMilliseconds);
                e.Samples = (await namedPipeClient.ReadAsync<IEnumerable<SampleActivity>>(timeout: TimeSpan.FromMilliseconds(longerTimeoutMilliseconds)).ConfigureAwait(false)) ?? [];
                _logger.LogTrace("Finished loading valid samples.");

                // Sending the AccessToken for AAD authentication in case it is enabled.
                _logger.LogTrace("Sending access token");
                // Flow the caller's token so that acquiring the access token can be cancelled like the other pipe operations.
                AccessToken accessToken = await _authTokenProvider.GetTokenAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
                await namedPipeClient.SendAsync(accessToken).ConfigureAwait(false);
                _logger.LogTrace("Finished sending access token for the uploader to use.");

                // Contract with Uploader: Return app id.
                _logger.LogTrace("Waiting for the uploader to write back valid appId as dataCube.");
                Guid appId = await namedPipeClient.ReadAsync<Guid>().ConfigureAwait(false);
                _logger.LogTrace("Finished retrieving a valid appId (dataCube): {appId}", appId);
                if (appId == Guid.Empty)
                {
                    throw new InvalidOperationException($"Datacube {appId} is invalid.");
                }

                // Contract with Uploader: send additional data.
                IPCAdditionalData additionalData = CreateAdditionalData(e.Samples.ToImmutableArray(), stampId: "%StampId%", e.SessionId, appId, e.ProfilerSource);

                // Serializing purely for diagnostics is only worth the cost when trace logging is on.
                // NOTE(review): the warning below is consequently only emitted when Trace is enabled - confirm that is intended.
                if (_logger.IsEnabled(LogLevel.Trace))
                {
                    _logger.LogTrace("Sending additional data for the uploader to use.");
                    if (_serializer.TrySerialize(additionalData, out string? serializedObject))
                    {
                        _logger.LogTrace("===== {serialized} =====", Environment.NewLine + serializedObject + Environment.NewLine);
                    }
                    else
                    {
                        if (e.Samples.Any())
                        {
                            _logger.LogWarning("Although there are valid samples, there's no additional data. Why?");
                        }
                        else
                        {
                            _logger.LogTrace("No additional data");
                        }
                    }
                }
                await namedPipeClient.SendAsync(additionalData, TimeSpan.FromMilliseconds(longerTimeoutMilliseconds), cancellationToken).ConfigureAwait(false);
                _logger.LogTrace("Additional data sent.");
            }
            finally
            {
                // The interface itself isn't known to be disposable here; dispose only when the implementation is.
                (namedPipeClient as IDisposable)?.Dispose();
            }
        }, cancellationToken);

        Task<UploadContextModel?> uploadTask = UploadTraceAsync(e, processId, pipeName, cancellationToken);

        // Wait for both tasks to finish; a failure in either one surfaces here.
        await Task.WhenAll(namedPipeClientTask, uploadTask).ConfigureAwait(false);

        // The task has already completed successfully, so this await returns immediately without blocking.
        UploadContextModel? uploadContext = await uploadTask.ConfigureAwait(false);
        if (uploadContext != null)
        {
            // Trace is uploaded.
            int validSampleCount = e.Samples.Count();
            _logger.LogDebug("Sent {validSampleCount} valid custom events to AI. Valid sample count equals total sample count: {result}", validSampleCount, validSampleCount == sampleCount);
        }

        return true;
    }