in Amazon.KinesisTap.AWS/CloudWatchLogsSink.cs [215:361]
/// <summary>
/// Sends a single batch of log events to CloudWatch Logs, handling sequence-token
/// acquisition/recovery, partial-rejection accounting, recoverable-error requeueing,
/// and metric bookkeeping. Exceptions are logged and swallowed at the outer level so
/// a failed batch never tears down the sink; metrics are always published on exit.
/// </summary>
/// <param name="records">The batch to upload. Assumed non-empty (records[0] is read).</param>
private async Task SendBatchAsync(List<Envelope<InputLogEvent>> records)
{
    var batchBytes = records.Sum(r => GetRecordSize(r));
    try
    {
        _logger?.LogDebug("CloudWatchLogsSink client {0} sending {1} records {2} bytes.", Id, records.Count, batchBytes);
        // The stream name may contain a timestamp pattern; resolve it from the first record.
        var logStreamName = ResolveTimestampInLogStreamName(records[0].Timestamp);
        var request = new PutLogEventsRequest
        {
            LogGroupName = _logGroupName,
            LogStreamName = logStreamName,
            SequenceToken = _sequenceToken,
            LogEvents = records
                .Select(e => e.Data)
                .ToList()
        };
        int invalidSequenceTokenCount = 0;
        while (true)
        {
            var utcNow = DateTime.UtcNow;
            // Average age of the records at send time, i.e. end-to-end client-side latency.
            _clientLatency = (long)records.Average(r => (utcNow - r.Timestamp).TotalMilliseconds);
            long elapsedMilliseconds = Utility.GetElapsedMilliseconds();
            try
            {
                // If the sequence token is null, try to get it.
                // If the log stream doesn't exist, create it (by specifying "true" in the second parameter).
                // This should be the only place where a log stream is created.
                // This method will ensure that both the log group and stream exists,
                // so there is no need to handle a ResourceNotFound exception later.
                if (string.IsNullOrEmpty(_sequenceToken))
                {
                    await GetSequenceTokenAsync(logStreamName, true);
                }
                var response = await SendRequestAsync(request);
                _latency = Utility.GetElapsedMilliseconds() - elapsedMilliseconds;
                _throttle.SetSuccess();
                _sequenceToken = response.NextSequenceToken;
                _recordsAttempted += records.Count;
                _bytesAttempted += batchBytes;
                var rejectedLogEventsInfo = response.RejectedLogEventsInfo;
                if (rejectedLogEventsInfo != null)
                {
                    // Don't do the expensive string building unless we know the logger isn't null.
                    if (_logger != null)
                    {
                        var sb = new StringBuilder()
                            .AppendFormat("CloudWatchLogsSink client {0} encountered some rejected logs.", Id)
                            .AppendFormat(" ExpiredLogEventEndIndex {0}", rejectedLogEventsInfo.ExpiredLogEventEndIndex)
                            .AppendFormat(" TooNewLogEventStartIndex {0}", rejectedLogEventsInfo.TooNewLogEventStartIndex)
                            .AppendFormat(" TooOldLogEventEndIndex {0}", rejectedLogEventsInfo.TooOldLogEventEndIndex);
                        _logger.LogError(sb.ToString());
                    }
                    // Estimate the number of accepted events: start with the whole batch and
                    // subtract the leading events rejected as expired/too old and the trailing
                    // events rejected as too new.
                    var recordCount = records.Count - rejectedLogEventsInfo.ExpiredLogEventEndIndex - rejectedLogEventsInfo.TooOldLogEventEndIndex;
                    // BUG FIX: the too-new adjustment must be gated on TooNewLogEventStartIndex,
                    // not TooOldLogEventEndIndex. TooNewLogEventStartIndex defaults to 0 when no
                    // events were too new, so the old gate (TooOldLogEventEndIndex > 0) would
                    // subtract the entire batch whenever only too-old events were rejected,
                    // driving recordCount negative and inflating _recordsFailedNonrecoverable.
                    if (rejectedLogEventsInfo.TooNewLogEventStartIndex > 0)
                        recordCount -= records.Count - rejectedLogEventsInfo.TooNewLogEventStartIndex;
                    _recordsFailedNonrecoverable += (records.Count - recordCount);
                }
                _logger?.LogDebug("CloudWatchLogsSink client {0} successfully sent {1} records {2} bytes.", Id, records.Count, batchBytes);
                _recordsSuccess += records.Count;
                await SaveBookmarks(records);
                break;
            }
            catch (AmazonCloudWatchLogsException ex)
            {
                _latency = Utility.GetElapsedMilliseconds() - elapsedMilliseconds;
                _throttle.SetError();
                // InvalidSequenceTokenExceptions are thrown when a PutLogEvents call doesn't have a valid sequence token.
                // This is usually recoverable, so we'll try twice before requeuing events.
                if (ex is InvalidSequenceTokenException invalidSequenceTokenException && invalidSequenceTokenCount < 2)
                {
                    // Increment the invalid sequence token counter, to limit the "instant retries" that we attempt.
                    invalidSequenceTokenCount++;
                    // The exception from CloudWatch contains the sequence token, so we'll try to parse it out.
                    _sequenceToken = invalidSequenceTokenException.GetExpectedSequenceToken();
                    // Sometimes we get a sequence token with a string value of "null".
                    // This is invalid so we'll fetch it again and retry immediately.
                    // If credentials have expired or this request is being throttled,
                    // the wrapper try/catch will capture it and data will be requeued
                    // or counted as failed through the normal error-handling paths below.
                    if (AWSConstants.NullString.Equals(_sequenceToken))
                    {
                        _sequenceToken = null;
                        await GetSequenceTokenAsync(logStreamName, false);
                    }
                    // Reset the sequence token in the request and immediately retry (without requeuing),
                    // so that the sequence token does not become invalid again.
                    request.SequenceToken = _sequenceToken;
                    continue;
                }
                // Retry if one of the following was true:
                // - The exception was thrown because an invalid sequence token was used (more than twice in a row)
                // - The service was unavailable (transient error or service outage)
                // - The security token in the credentials has expired (previously this was handled as an unrecoverable error)
                if (IsRecoverableException(ex))
                {
                    // Try to requeue the records into the buffer.
                    // This will mean that the events in the buffer are now out of order :(
                    // There's nothing we can do about that short of rewriting all the buffering logic.
                    // Having out of order events isn't that bad, because the service that we're uploading
                    // to will be storing them based on their event time anyway. However, this can affect
                    // the persistent bookmarking behavior, since bookmarks are updated based on the
                    // position/eventId in the last batch sent, not what's currently in the buffer.
                    if (_buffer.Requeue(records, _throttle.ConsecutiveErrorCount < _maxAttempts))
                    {
                        _logger?.LogWarning("CloudWatchLogsSink client {0} attempt={1} exception={2}. Will retry.", Id, _throttle.ConsecutiveErrorCount, ex.Message);
                        _recoverableServiceErrors++;
                        _recordsFailedRecoverable += records.Count;
                        break;
                    }
                }
                // Either the exception was unrecoverable or the requeue was refused (too many
                // attempts): count the whole batch as permanently failed and surface the error
                // to the outer handler for logging.
                _recordsFailedNonrecoverable += records.Count;
                _nonrecoverableServiceErrors++;
                throw;
            }
            catch (Exception)
            {
                // Any non-CloudWatch exception (e.g. credential or network failure) is
                // treated as unrecoverable for this batch.
                _latency = Utility.GetElapsedMilliseconds() - elapsedMilliseconds;
                _throttle.SetError();
                _recordsFailedNonrecoverable += records.Count;
                _nonrecoverableServiceErrors++;
                throw;
            }
        }
    }
    catch (Exception ex)
    {
        // Deliberate swallow: a failed batch must not crash the sink. Counters above
        // already recorded the failure; just log it here.
        _logger?.LogError("CloudWatchLogsSink client {0} exception (attempt={1}): {2}", Id, _throttle.ConsecutiveErrorCount, ex.ToMinimized());
    }
    PublishMetrics(MetricsConstants.CLOUDWATCHLOG_PREFIX);
}