in src/Elastic.Apm/Report/PayloadSenderV2.cs [358:479]
/// <summary>
/// Writes a metadata line followed by the serialized form of every queue item that passes its filters
/// into an in-memory stream, posts that stream to the intake events URL and logs the outcome.
/// </summary>
/// <param name="queueItems">
/// Items to send. <see cref="Transaction"/>, <see cref="Span"/>, <see cref="Error"/> and <see cref="MetricSet"/>
/// instances are serialized; items of any other type are skipped.
/// </param>
/// <exception cref="OperationCanceledException">
/// Rethrown (after logging a warning) when sending is cancelled, so that the caller can react to it.
/// Every other exception is logged as a warning and swallowed.
/// </exception>
private void ProcessQueueItems(object[] queueItems)
{
	// can reuse underlying buffers from a pool in future.
	using var stream = new MemoryStream(1024);
	try
	{
		_cachedMetadataJsonLine ??= _payloadItemSerializer.Serialize(_metadata);
		// leaveOpen: true - disposing the writer flushes it but keeps the stream usable for the request below.
		using (var writer = new StreamWriter(stream, Utf8Encoding, 1024, true))
		{
			writer.Write("{\"metadata\":");
			writer.Write(_cachedMetadataJsonLine);
			writer.Write("}\n");
			foreach (var item in queueItems)
			{
				// A filter returning null means the event must not be sent.
				switch (item)
				{
					case Transaction transaction:
						if (TryExecuteFilter(TransactionFilters, transaction) != null)
						{
							Serialize(item, "transaction", writer);
						}
						break;
					case Span span:
						if (TryExecuteFilter(SpanFilters, span) != null)
						{
							Serialize(item, "span", writer);
						}
						break;
					case Error error:
						if (TryExecuteFilter(ErrorFilters, error) != null)
						{
							Serialize(item, "error", writer);
						}
						break;
					case MetricSet _:
						Serialize(item, "metricset", writer);
						break;
				}
			}
		}

		// Rewind so the request body starts at the metadata line.
		stream.Position = 0;
		using (var content = new StreamContent(stream))
		{
			content.Headers.ContentType = MediaTypeHeaderValue;

			// Blocking wrapper around the asynchronous POST; shared by both compilation variants.
			HttpResponseMessage PostBlocking() =>
				HttpClient.PostAsync(_intakeV2EventsAbsoluteUrl, content, CancellationTokenSource.Token)
					.ConfigureAwait(false)
					.GetAwaiter()
					.GetResult();

			HttpResponseMessage response;
#if NET8_0_OR_GREATER
			try
			{
				var webRequest = new HttpRequestMessage(HttpMethod.Post, _intakeV2EventsAbsoluteUrl) { Content = content };
				response = HttpClient.Send(webRequest, CancellationTokenSource.Token);
			}
			// HttpMessageHandler may not support synchronous sending. Only that case falls back to the
			// asynchronous API: retrying after a network failure or a cancellation could transmit the same
			// events twice, so those exceptions are left to the outer handlers.
			catch (NotSupportedException)
			{
				response = PostBlocking();
			}
#else
			response = PostBlocking();
#endif
			// Dispose the response (and its content) once the outcome has been logged.
			using (response)
			{
				// ReSharper disable ConditionIsAlwaysTrueOrFalse
				if (response is null || !response.IsSuccessStatusCode)
				{
					var message = "Unknown 400 Bad Request";
					if (response?.Content != null)
					{
#if NET8_0_OR_GREATER
						var intakeResponse = _payloadItemSerializer.Deserialize<IntakeResponse>(response.Content.ReadAsStream());
#else
						var intakeResponse = _payloadItemSerializer.Deserialize<IntakeResponse>(response.Content.ReadAsStreamAsync().GetAwaiter().GetResult());
#endif
						// The body may deserialize to null or carry no error list; keep the default message then
						// instead of failing with a NullReferenceException and losing the status code log below.
						if (intakeResponse?.Errors?.Count > 0)
						{
							message = string.Join(", ", intakeResponse.Errors.Select(e => e.Message).Distinct());
						}
					}
					_logger?.Error()
						?.Log("Failed sending event."
							+ " Events intake API absolute URL: {EventsIntakeAbsoluteUrl}."
							+ " APM Server response: status code: {ApmServerResponseStatusCode}"
							+ ", reasons: {ApmServerResponseContent}"
							, _intakeV2EventsAbsoluteUrl.Sanitize()
							, response?.StatusCode,
							message
						);
				}
				// ReSharper enable ConditionIsAlwaysTrueOrFalse
				else
				{
					_logger?.Debug()
						?.Log("Sent items to server:"
							+ $"{Environment.NewLine}{TextUtils.Indentation}{{SerializedItems}}",
							string.Join($",{Environment.NewLine}{TextUtils.Indentation}", queueItems));
				}
			}
		}
	}
	catch (OperationCanceledException)
	{
		// handle cancellation specifically
		_logger?.Warning()
			?.Log(
				"Cancellation requested. Following events were not transferred successfully to the server ({ApmServerUrl}):"
				+ $"{Environment.NewLine}{TextUtils.Indentation}{{SerializedItems}}"
				, HttpClient.BaseAddress.Sanitize()
				, string.Join($",{Environment.NewLine}{TextUtils.Indentation}", queueItems));
		// throw to allow Workloop to handle
		throw;
	}
	catch (Exception e)
	{
		// Best effort: any other failure is logged and the batch is dropped.
		_logger?.Warning()
			?.LogException(
				e,
				"Failed sending events. Following events were not transferred successfully to the server ({ApmServerUrl}):"
				+ $"{Environment.NewLine}{TextUtils.Indentation}{{SerializedItems}}"
				, HttpClient.BaseAddress.Sanitize()
				, string.Join($",{Environment.NewLine}{TextUtils.Indentation}", queueItems)
			);
	}
}