private async Task SendBatchAsync()

in Amazon.KinesisTap.AWS/CloudWatchLogsSink.cs [215:361]


        /// <summary>
        /// Sends one batch of log events to CloudWatch Logs with a single PutLogEvents request,
        /// retrying immediately (up to two times) when the sequence token is rejected and
        /// requeuing the batch into the buffer when the failure is recoverable.
        /// </summary>
        /// <remarks>
        /// Exceptions are not propagated to the caller: anything that escapes the send loop is
        /// logged by the outer catch. Metrics are published after every attempt, whether the
        /// batch was sent, requeued or dropped.
        /// </remarks>
        /// <param name="records">The batch to send. A null or empty batch is ignored.</param>
        private async Task SendBatchAsync(List<Envelope<InputLogEvent>> records)
        {
            // Nothing to send. Without this guard an empty batch fails on records[0] below and is
            // reported as an error, and a null batch throws before the try block is entered.
            if (records == null || records.Count == 0)
            {
                return;
            }

            var batchBytes = records.Sum(r => GetRecordSize(r));

            try
            {
                _logger?.LogDebug("CloudWatchLogsSink client {0} sending {1} records {2} bytes.", Id, records.Count, batchBytes);

                // The log stream name is resolved from the timestamp of the first record in the batch.
                var logStreamName = ResolveTimestampInLogStreamName(records[0].Timestamp);

                var request = new PutLogEventsRequest
                {
                    LogGroupName = _logGroupName,
                    LogStreamName = logStreamName,
                    SequenceToken = _sequenceToken,
                    LogEvents = records
                        .Select(e => e.Data)
                        .ToList()
                };

                // Number of "instant retries" already spent on InvalidSequenceTokenException for this batch.
                int invalidSequenceTokenCount = 0;
                while (true)
                {
                    // Client latency is the average age of the records in the batch, in milliseconds,
                    // measured at the time of this attempt.
                    var utcNow = DateTime.UtcNow;
                    _clientLatency = (long)records.Average(r => (utcNow - r.Timestamp).TotalMilliseconds);

                    long elapsedMilliseconds = Utility.GetElapsedMilliseconds();
                    try
                    {
                        // If the sequence token is null, try to get it.
                        // If the log stream doesn't exist, create it (by specifying "true" in the second parameter).
                        // This should be the only place where a log stream is created.
                        // This method will ensure that both the log group and stream exists,
                        // so there is no need to handle a ResourceNotFound exception later.
                        if (string.IsNullOrEmpty(_sequenceToken))
                        {
                            await GetSequenceTokenAsync(logStreamName, true);
                        }

                        var response = await SendRequestAsync(request);
                        _latency = Utility.GetElapsedMilliseconds() - elapsedMilliseconds;
                        _throttle.SetSuccess();
                        _sequenceToken = response.NextSequenceToken;
                        _recordsAttempted += records.Count;
                        _bytesAttempted += batchBytes;

                        var rejectedLogEventsInfo = response.RejectedLogEventsInfo;
                        if (rejectedLogEventsInfo != null)
                        {
                            // Don't do the expensive string building unless we know the logger isn't null.
                            if (_logger != null)
                            {
                                var sb = new StringBuilder()
                                    .AppendFormat("CloudWatchLogsSink client {0} encountered some rejected logs.", Id)
                                    .AppendFormat(" ExpiredLogEventEndIndex {0}", rejectedLogEventsInfo.ExpiredLogEventEndIndex)
                                    .AppendFormat(" TooNewLogEventStartIndex {0}", rejectedLogEventsInfo.TooNewLogEventStartIndex)
                                    .AppendFormat(" TooOldLogEventEndIndex {0}", rejectedLogEventsInfo.TooOldLogEventEndIndex);
                                _logger.LogError(sb.ToString());
                            }

                            // recordCount is the number of events treated as accepted by the service.
                            // Start by removing the rejected prefix of the batch (expired / too old).
                            // NOTE(review): both prefix indexes are subtracted; if the two ranges can overlap
                            // this over-counts rejections - confirm against the service behavior.
                            var recordCount = records.Count - rejectedLogEventsInfo.ExpiredLogEventEndIndex - rejectedLogEventsInfo.TooOldLogEventEndIndex;

                            // Then remove the rejected tail: every event from TooNewLogEventStartIndex to the end
                            // of the batch. The guard must test the too-new index itself; testing the too-old
                            // index subtracted the whole batch again whenever only too-old events were rejected,
                            // and ignored too-new events when no too-old events were present.
                            // A value of 0 is treated as "no too-new events" here.
                            if (rejectedLogEventsInfo.TooNewLogEventStartIndex > 0)
                            {
                                recordCount -= records.Count - rejectedLogEventsInfo.TooNewLogEventStartIndex;
                            }

                            _recordsFailedNonrecoverable += (records.Count - recordCount);
                        }

                        _logger?.LogDebug("CloudWatchLogsSink client {0} successfully sent {1} records {2} bytes.", Id, records.Count, batchBytes);
                        // NOTE(review): the full batch size is counted as successful even when some events
                        // were rejected above - confirm this is the intended metric semantics.
                        _recordsSuccess += records.Count;
                        await SaveBookmarks(records);

                        break;
                    }
                    catch (AmazonCloudWatchLogsException ex)
                    {
                        _latency = Utility.GetElapsedMilliseconds() - elapsedMilliseconds;
                        _throttle.SetError();

                        // InvalidSequenceTokenExceptions are thrown when a PutLogEvents call doesn't have a valid sequence token.
                        // This is usually recoverable, so we'll try twice before requeuing events.
                        if (ex is InvalidSequenceTokenException invalidSequenceTokenException && invalidSequenceTokenCount < 2)
                        {
                            // Increment the invalid sequence token counter, to limit the "instant retries" that we attempt.
                            invalidSequenceTokenCount++;

                            // The exception from CloudWatch contains the sequence token, so we'll try to parse it out.
                            _sequenceToken = invalidSequenceTokenException.GetExpectedSequenceToken();

                            // Sometimes we get a sequence token with a string value of "null".
                            // This is invalid so we'll fetch it again and retry immediately.
                            // If credentials have expired or this request is being throttled, the exception
                            // escapes this handler and is logged by the outer try/catch; the batch is not
                            // requeued in that case.
                            if (AWSConstants.NullString.Equals(_sequenceToken))
                            {
                                _sequenceToken = null;
                                await GetSequenceTokenAsync(logStreamName, false);
                            }

                            // Reset the sequence token in the request and immediately retry (without requeuing),
                            // so that the sequence token does not become invalid again.
                            request.SequenceToken = _sequenceToken;
                            continue;
                        }

                        // Retry if one of the following was true:
                        // - The exception was thrown because an invalid sequence token was used (more than twice in a row)
                        // - The service was unavailable (transient error or service outage)
                        // - The security token in the credentials has expired (previously this was handled as an unrecoverable error)
                        if (IsRecoverableException(ex))
                        {
                            // Try to requeue the records into the buffer.
                            // This will mean that the events in the buffer are now out of order :(
                            // There's nothing we can do about that short of rewriting all the buffering logic.
                            // Having out of order events isn't that bad, because the service that we're uploading
                            // to will be storing them based on their event time anyway. However, this can affect
                            // the persistent bookmarking behavior, since bookmarks are updated based on the
                            // position/eventId in the last batch sent, not what's currently in the buffer.
                            if (_buffer.Requeue(records, _throttle.ConsecutiveErrorCount < _maxAttempts))
                            {
                                _logger?.LogWarning("CloudWatchLogsSink client {0} attempt={1} exception={2}. Will retry.", Id, _throttle.ConsecutiveErrorCount, ex.Message);
                                _recoverableServiceErrors++;
                                _recordsFailedRecoverable += records.Count;
                                break;
                            }
                        }

                        // Either the error is not recoverable or the buffer refused the requeue:
                        // count the batch as lost and let the outer catch log the exception.
                        _recordsFailedNonrecoverable += records.Count;
                        _nonrecoverableServiceErrors++;
                        throw;
                    }
                    catch (Exception)
                    {
                        // Any non-CloudWatch exception is treated as non-recoverable for this batch.
                        _latency = Utility.GetElapsedMilliseconds() - elapsedMilliseconds;
                        _throttle.SetError();
                        _recordsFailedNonrecoverable += records.Count;
                        _nonrecoverableServiceErrors++;
                        throw;
                    }
                }
            }
            catch (Exception ex)
            {
                // Swallow after logging so that a failed batch never propagates out of this method.
                _logger?.LogError("CloudWatchLogsSink client {0} exception (attempt={1}): {2}", Id, _throttle.ConsecutiveErrorCount, ex.ToMinimized());
            }

            PublishMetrics(MetricsConstants.CLOUDWATCHLOG_PREFIX);
        }