in src/ServiceProfiler.EventPipe.Otel/Azure.Monitor.OpenTelemetry.Profiler.Core/PostStopProcessor.cs [75:183]
/// <summary>
/// Exchanges the collected samples with the uploader over a named pipe and runs the trace upload.
/// </summary>
/// <remarks>
/// Two tasks run concurrently and are awaited together: the named pipe conversation below and
/// <c>UploadTraceAsync</c>. The conversation follows this order:
/// send the samples, read back the valid samples, send the access token, read back the app id,
/// send the additional data.
/// </remarks>
/// <param name="e">
/// The post-stop options. <c>UploaderFullPath</c> is set by this method and <c>Samples</c> is replaced
/// by the samples the uploader writes back.
/// </param>
/// <param name="uploadMode">
/// <c>Never</c> skips the upload; <c>Always</c> proceeds even when there are no samples.
/// </param>
/// <param name="cancellationToken">Token forwarded to the pipe connection, the token request and the upload.</param>
/// <returns>
/// <c>false</c> when the upload is skipped (mode is <c>Never</c>, the uploader is not found, or there are
/// no samples and the mode is not <c>Always</c>); otherwise <c>true</c> once both tasks have completed.
/// </returns>
/// <exception cref="InvalidOperationException">The uploader wrote back an empty app id.</exception>
private async Task<bool> ExecuteUploadAsync(PostStopOptions e, UploadMode uploadMode, CancellationToken cancellationToken)
{
    if (uploadMode == UploadMode.Never)
    {
        _logger.LogInformation("Skip uploading. Upload mode: {mode}.", uploadMode);
        return false;
    }

    if (!_uploaderPathProvider.TryGetUploaderFullPath(out string uploaderFullPath))
    {
        _logger.LogError("Uploader is not found.");
        return false;
    }

    int sampleCount = e.Samples.Count();
    _logger.LogDebug("There are {sampleNumber} samples before validation.", sampleCount);
    if (sampleCount == 0 && uploadMode != UploadMode.Always)
    {
        // Make the reason for the early exit visible, like the other skip paths do.
        _logger.LogDebug("Skip uploading because there is no sample. Upload mode: {mode}.", uploadMode);
        return false;
    }

    e.UploaderFullPath = uploaderFullPath;
    int processId = CurrentProcessUtilities.GetId();
    string pipeName = Guid.NewGuid().ToString("D");

    Task namedPipeClientTask = Task.Run(async () =>
    {
        INamedPipeClientService namedPipeClient = _namedPipeClientFactory.CreateNamedPipeService();
        try
        {
            _logger.LogTrace("Waiting for connection of named pipe: {name}", pipeName);
            await namedPipeClient.ConnectAsync(pipeName, cancellationToken).ConfigureAwait(false);
            _logger.LogTrace("Namedpipe {name} connected.", namedPipeClient.PipeName);

            _logger.LogTrace("Sending serialized samples.");
            await namedPipeClient.SendAsync(e.Samples).ConfigureAwait(false);
            _logger.LogTrace("Finished sending samples.");

            // Contract with Uploader: Only valid samples are written back.
            _logger.LogTrace("Waiting for the uploader to write back valid samples according to the contract.");

            // The uploader might need a while for sample validation before it returns the result. That is especially true under heavy loaded system.
            // Give it at least 10 minutes as a reasonable timeout. The user could choose to overwrite it with even longer time span by setting up operation timeout.
            double longerTimeoutMilliseconds = Math.Max(TimeSpan.FromMinutes(10).TotalMilliseconds, _serviceProfilerOptions.NamedPipe.DefaultMessageTimeout.TotalMilliseconds);
            e.Samples = (await namedPipeClient.ReadAsync<IEnumerable<SampleActivity>>(timeout: TimeSpan.FromMilliseconds(longerTimeoutMilliseconds)).ConfigureAwait(false)) ?? [];
            _logger.LogTrace("Finished loading valid samples.");

            // Sending the AccessToken for AAD authentication in case it is enabled.
            // The caller's token is forwarded so that a cancellation also stops the token request.
            _logger.LogTrace("Sending access token");
            AccessToken accessToken = await _authTokenProvider.GetTokenAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
            await namedPipeClient.SendAsync(accessToken).ConfigureAwait(false);
            _logger.LogTrace("Finished sending access token for the uploader to use.");

            // Contract with Uploader: Return app id.
            _logger.LogTrace("Waiting for the uploader to write back valid appId as dataCube.");
            Guid appId = await namedPipeClient.ReadAsync<Guid>().ConfigureAwait(false);
            _logger.LogTrace("Finished retrieving a valid appId (dataCube): {appId}", appId);
            if (appId == Guid.Empty)
            {
                throw new InvalidOperationException($"Datacube {appId} is invalid.");
            }

            // Contract with Uploader: send the additional data.
            // Snapshot the validated samples once; the snapshot is reused for the diagnostics below.
            ImmutableArray<SampleActivity> validSamples = e.Samples.ToImmutableArray();
            IPCAdditionalData additionalData = CreateAdditionalData(validSamples, stampId: "%StampId%", e.SessionId, appId, e.ProfilerSource);

            // Serializing the payload is only done for diagnostics, so skip it unless trace logging is on.
            if (_logger.IsEnabled(LogLevel.Trace))
            {
                _logger.LogTrace("Sending additional data for the uploader to use.");
                if (_serializer.TrySerialize(additionalData, out string? serializedObject))
                {
                    _logger.LogTrace("===== {serialized} =====", Environment.NewLine + serializedObject + Environment.NewLine);
                }
                else if (!validSamples.IsEmpty)
                {
                    _logger.LogWarning("Although there are valid samples, there's no additional data. Why?");
                }
                else
                {
                    _logger.LogTrace("No additional data");
                }
            }

            await namedPipeClient.SendAsync(additionalData, TimeSpan.FromMilliseconds(longerTimeoutMilliseconds), cancellationToken).ConfigureAwait(false);
            _logger.LogTrace("Additional data sent.");
        }
        finally
        {
            // The service interface is not required to be disposable; dispose only when it is.
            (namedPipeClient as IDisposable)?.Dispose();
        }
    }, cancellationToken);

    Task<UploadContextModel?> uploadTask = UploadTraceAsync(e, processId, pipeName, cancellationToken);

    // Waiting for both task to finish.
    await Task.WhenAll(namedPipeClientTask, uploadTask).ConfigureAwait(false);

    // The task has already completed; awaiting it fetches the result without blocking on .Result.
    UploadContextModel? uploadContext = await uploadTask.ConfigureAwait(false);
    if (uploadContext != null)
    {
        // Trace is uploaded.
        int validSampleCount = e.Samples.Count();
        _logger.LogDebug("Sent {validSampleCount} valid custom events to AI. Valid sample count equals total sample count: {result}", validSampleCount, validSampleCount == sampleCount);
    }

    return true;
}