in Amazon.KinesisTap.AWS/AsyncCloudWatchLogsSink.cs [185:318]
/// <summary>
/// Sends the leading portion of <paramref name="records"/> to CloudWatch Logs with a single
/// PutLogEvents call and updates the sink's metrics, sequence token and bookmarks.
/// </summary>
/// <remarks>
/// Only the leading records whose timestamps lie within <c>_batchMaximumTimeSpan</c> of the first
/// record are sent; any remaining records stay in <paramref name="records"/>.
/// On success, and on <c>DataAlreadyAcceptedException</c>, bookmarks are saved for the sent records
/// and they are removed from the list. On <c>InvalidParameterException</c> the sent records are
/// removed without saving bookmarks. On <c>InvalidSequenceTokenException</c> and
/// <c>ResourceNotFoundException</c> the sequence token is refreshed and the list is left untouched,
/// so the same records are still present after the method returns.
/// </remarks>
/// <param name="records">Pending records; the sent prefix is removed from this list in place.</param>
/// <param name="stopToken">Token passed to the sequence-token lookup and to the PutLogEvents call.</param>
/// <exception cref="AmazonCloudWatchLogsException">
/// Rethrown for any CloudWatch Logs exception other than the four types handled here.
/// </exception>
private async Task SendBatchAsync(List<Envelope<InputLogEvent>> records, CancellationToken stopToken)
{
    if (records.Count == 0)
    {
        return;
    }

    // resolve log group name
    var logGroup = ResolveVariables(_logGroup);

    // resolve log stream name, including the timestamp (taken from the first record).
    var logStreamName = ResolveVariables(_logStream);
    logStreamName = ResolveTimestampInLogStreamName(logStreamName, records[0].Timestamp);

    // NOTE(review): the attempt metrics and the debug message below cover the whole list,
    // even when only the first 'sendCount' records are sent in this call - confirm this is intended.
    var batchBytes = records.Sum(r => GetRecordSize(r));
    Interlocked.Add(ref _recordsAttempted, records.Count);
    Interlocked.Add(ref _bytesAttempted, batchBytes);

    // Read the clock once so every record's age is measured against the same instant.
    var utcNow = DateTime.UtcNow;
    Interlocked.Exchange(ref _clientLatency, (long)records.Average(r => (utcNow - r.Timestamp).TotalMilliseconds));

    _logger.LogDebug("Sending {0} records {1} bytes to log group '{2}', log stream '{3}'.",
        records.Count, batchBytes, logGroup, logStreamName);

    // Determine how many leading records fit within the maximum time span of a single batch.
    var sendCount = 0;
    if (records[records.Count - 1].Timestamp - records[0].Timestamp <= _batchMaximumTimeSpan)
    {
        // first and last record are close enough: send everything
        sendCount = records.Count;
    }
    else
    {
        // stop at the first record that is too far from the first one
        while (sendCount < records.Count && records[sendCount].Timestamp - records[0].Timestamp <= _batchMaximumTimeSpan)
        {
            sendCount++;
        }
    }

    var recordsToSend = records.Take(sendCount).Select(e => e.Data).ToList();
    var beforeSendTimestamp = Utility.GetElapsedMilliseconds();

    // If the sequence token is null, try to get it.
    // If the log stream doesn't exist, create it (by specifying "true" in the third parameter).
    // This should be the only place where a log stream is created.
    // This method will ensure that both the log group and stream exist,
    // so there is no need to handle a ResourceNotFound exception later.
    if (string.IsNullOrEmpty(_sequenceToken) || AWSConstants.NullString.Equals(_sequenceToken))
    {
        await GetSequenceTokenAsync(logGroup, logStreamName, true, stopToken);
    }

    var request = new PutLogEventsRequest
    {
        LogGroupName = logGroup,
        LogStreamName = logStreamName,
        SequenceToken = _sequenceToken,
        LogEvents = recordsToSend
    };

    try
    {
        // try sending the records and mark them as sent
        var response = await _cloudWatchLogsClient.PutLogEventsAsync(request, stopToken);
        Interlocked.Exchange(ref _latency, Utility.GetElapsedMilliseconds() - beforeSendTimestamp);

        // update sequence token
        _sequenceToken = response.NextSequenceToken;

        var recordsSent = recordsToSend.Count;
        var rejectedLogEventsInfo = response.RejectedLogEventsInfo;
        if (rejectedLogEventsInfo is not null)
        {
            _logger.LogWarning(
                "CloudWatchLogsSink encountered some rejected logs. ExpiredLogEventEndIndex {0} TooNewLogEventStartIndex {1} TooOldLogEventEndIndex {2}",
                rejectedLogEventsInfo.ExpiredLogEventEndIndex,
                rejectedLogEventsInfo.TooNewLogEventStartIndex,
                rejectedLogEventsInfo.TooOldLogEventEndIndex);

            // Records from the "too new" start index to the end of the batch were rejected.
            // NOTE(review): if the SDK reports an unset index as 0, this branch counts the whole
            // batch as rejected - confirm against the SDK version in use.
            if (rejectedLogEventsInfo.TooNewLogEventStartIndex >= 0)
            {
                recordsSent -= recordsToSend.Count - rejectedLogEventsInfo.TooNewLogEventStartIndex;
            }

            // Records at the head of the batch were rejected as too old or expired; the larger of
            // the two end indexes is used as the number of leading rejected records.
            var tooOldIndex = Math.Max(rejectedLogEventsInfo.TooOldLogEventEndIndex, rejectedLogEventsInfo.ExpiredLogEventEndIndex);
            if (tooOldIndex > 0)
            {
                recordsSent -= tooOldIndex;
            }

            // the two rejected ranges may overlap, never report a negative count
            if (recordsSent < 0)
            {
                recordsSent = 0;
            }
        }

        Interlocked.Add(ref _recordsSuccess, recordsSent);
        Interlocked.Add(ref _recordsFailedNonrecoverable, recordsToSend.Count - recordsSent);
        _logger.LogDebug("Successfully sent {0} records.", recordsSent);

        // save the bookmarks only for the records that were processed
        await SaveBookmarksAsync(records.Take(sendCount).ToList());
        records.RemoveRange(0, sendCount);
    }
    catch (AmazonCloudWatchLogsException acle)
    {
        Interlocked.Exchange(ref _latency, Utility.GetElapsedMilliseconds() - beforeSendTimestamp);

        // handle the types of exceptions we know how to handle
        // then return so that the records can be re-sent
        switch (acle)
        {
            case InvalidSequenceTokenException iste:
                // the service tells us which token it expected; use it for the next attempt
                _sequenceToken = iste.ExpectedSequenceToken;
                break;
            case ResourceNotFoundException:
                // refresh the sequence token, creating the missing resource if needed
                await GetSequenceTokenAsync(logGroup, logStreamName, true, stopToken);
                break;
            case DataAlreadyAcceptedException:
                // this batch won't be accepted, skip it
                await SaveBookmarksAsync(records.Take(sendCount).ToList());
                records.RemoveRange(0, sendCount);
                break;
            case InvalidParameterException ipme:
                // this can happen due to a log event being too large
                // we already checked for this when creating the record,
                // so the best thing we can do here is to skip this batch and move on
                // NOTE(review): bookmarks are not saved on this path, unlike the
                // DataAlreadyAcceptedException path - confirm this is intended.
                _logger.LogError(ipme, "Error sending records to CloudWatchLogs");
                records.RemoveRange(0, sendCount);
                break;
            default:
                // for other exceptions we rethrow so the main loop can catch it
                throw;
        }
    }
}