private async Task SendBatchAsync()

in Amazon.KinesisTap.AWS/AsyncCloudWatchLogsSink.cs [185:318]


        /// <summary>
        /// Sends the leading portion of <paramref name="records"/> to CloudWatch Logs with a single
        /// PutLogEvents request and removes the records that were handled from the list.
        /// </summary>
        /// <remarks>
        /// Only the leading records whose timestamps lie within <c>_batchMaximumTimeSpan</c> of the first
        /// record are sent; any remaining records are left in <paramref name="records"/>.
        /// When a handled, retryable error occurs (invalid sequence token, missing resource) the list is
        /// left untouched so the same records can be sent again. Batches rejected with
        /// <c>DataAlreadyAcceptedException</c> or <c>InvalidParameterException</c> are removed from the list.
        /// </remarks>
        /// <param name="records">
        /// Pending records. The list is mutated: handled records are removed from its front.
        /// NOTE(review): the batching logic presumably expects the records to be ordered by timestamp - confirm with the caller.
        /// </param>
        /// <param name="stopToken">Cancellation token passed to the CloudWatch Logs calls.</param>
        /// <exception cref="AmazonCloudWatchLogsException">
        /// Rethrown for any CloudWatch Logs error that is not explicitly handled here.
        /// </exception>
        private async Task SendBatchAsync(List<Envelope<InputLogEvent>> records, CancellationToken stopToken)
        {
            if (records.Count == 0)
            {
                return;
            }

            // resolve log group name
            var logGroup = ResolveVariables(_logGroup);

            // resolve log stream name, including the timestamp.
            var logStreamName = ResolveVariables(_logStream);
            logStreamName = ResolveTimestampInLogStreamName(logStreamName, records[0].Timestamp);

            var batchBytes = records.Sum(r => GetRecordSize(r));

            // Take a single clock reading so that every record's age is measured against the same instant.
            var utcNow = DateTime.UtcNow;

            Interlocked.Add(ref _recordsAttempted, records.Count);
            Interlocked.Add(ref _bytesAttempted, batchBytes);
            Interlocked.Exchange(ref _clientLatency, (long)records.Average(r => (utcNow - r.Timestamp).TotalMilliseconds));

            _logger.LogDebug("Sending {0} records {1} bytes to log group '{2}', log stream '{3}'.",
                records.Count, batchBytes, logGroup, logStreamName);

            // Determine how many leading records fit within the maximum time span of a single batch.
            var sendCount = 0;
            if (records[records.Count - 1].Timestamp - records[0].Timestamp <= _batchMaximumTimeSpan)
            {
                sendCount = records.Count;
            }
            else
            {
                while (sendCount < records.Count && records[sendCount].Timestamp - records[0].Timestamp <= _batchMaximumTimeSpan)
                {
                    sendCount++;
                }
            }

            var recordsToSend = records.Take(sendCount).Select(e => e.Data).ToList();
            var beforeSendTimestamp = Utility.GetElapsedMilliseconds();

            // If the sequence token is null, try to get it.
            // If the log stream doesn't exist, create it (by specifying "true" in the second parameter).
            // This should be the only place where a log stream is created.
            // This method will ensure that both the log group and stream exists,
            // so there is no need to handle a ResourceNotFound exception later.
            if (string.IsNullOrEmpty(_sequenceToken) || AWSConstants.NullString.Equals(_sequenceToken))
            {
                await GetSequenceTokenAsync(logGroup, logStreamName, true, stopToken);
            }

            var request = new PutLogEventsRequest
            {
                LogGroupName = logGroup,
                LogStreamName = logStreamName,
                SequenceToken = _sequenceToken,
                LogEvents = recordsToSend
            };

            try
            {
                // try sending the records and mark them as sent
                var response = await _cloudWatchLogsClient.PutLogEventsAsync(request, stopToken);
                Interlocked.Exchange(ref _latency, Utility.GetElapsedMilliseconds() - beforeSendTimestamp);

                // update sequence token
                _sequenceToken = response.NextSequenceToken;

                var recordsSent = recordsToSend.Count;
                var rejectedLogEventsInfo = response.RejectedLogEventsInfo;
                if (rejectedLogEventsInfo is not null)
                {
                    var sb = new StringBuilder()
                        .Append("CloudWatchLogsSink encountered some rejected logs.")
                        .AppendFormat(" ExpiredLogEventEndIndex {0}", rejectedLogEventsInfo.ExpiredLogEventEndIndex)
                        .AppendFormat(" TooNewLogEventStartIndex {0}", rejectedLogEventsInfo.TooNewLogEventStartIndex)
                        .AppendFormat(" TooOldLogEventEndIndex {0}", rejectedLogEventsInfo.TooOldLogEventEndIndex);
                    _logger.LogWarning(sb.ToString());

                    // Events from the "too new" start index to the end of the batch were rejected.
                    // NOTE(review): if the SDK reports an unset TooNewLogEventStartIndex as 0, this branch
                    // counts every event as rejected - confirm against the SDK version in use.
                    if (rejectedLogEventsInfo.TooNewLogEventStartIndex >= 0)
                    {
                        recordsSent -= recordsToSend.Count - rejectedLogEventsInfo.TooNewLogEventStartIndex;
                    }

                    // Events at the front of the batch were rejected as too old or expired; take the larger
                    // of the two end indexes so that overlapping rejections are not counted twice.
                    var tooOldIndex = Math.Max(rejectedLogEventsInfo.TooOldLogEventEndIndex, rejectedLogEventsInfo.ExpiredLogEventEndIndex);
                    if (tooOldIndex > 0)
                    {
                        recordsSent -= tooOldIndex;
                    }
                    if (recordsSent < 0)
                    {
                        recordsSent = 0;
                    }
                }

                Interlocked.Add(ref _recordsSuccess, recordsSent);
                Interlocked.Add(ref _recordsFailedNonrecoverable, recordsToSend.Count - recordsSent);
                _logger.LogDebug("Successfully sent {0} records.", recordsSent);

                // save the bookmarks only for the records that were processed
                await SaveBookmarksAsync(records.Take(sendCount).ToList());

                records.RemoveRange(0, sendCount);
            }
            catch (AmazonCloudWatchLogsException acle)
            {
                Interlocked.Exchange(ref _latency, Utility.GetElapsedMilliseconds() - beforeSendTimestamp);

                // handle the types of exceptions we know how to handle
                // then return so that the records can be re-sent
                switch (acle)
                {
                    case InvalidSequenceTokenException iste:
                        // the service tells us which token it expected; use it for the next attempt
                        _sequenceToken = iste.ExpectedSequenceToken;
                        break;
                    case ResourceNotFoundException:
                        await GetSequenceTokenAsync(logGroup, logStreamName, true, stopToken);
                        break;
                    case DataAlreadyAcceptedException:
                        // this batch won't be accepted, skip it
                        await SaveBookmarksAsync(records.Take(sendCount).ToList());
                        records.RemoveRange(0, sendCount);
                        break;
                    case InvalidParameterException ipme:
                        // this can happen due to a log event being too large
                        // we already checked for this when creating the record,
                        // so the best thing we can do here is to skip this batch and move on
                        _logger.LogError(ipme, "Error sending records to CloudWatchLogs");

                        // the batch is dropped for good, so account for it as a non-recoverable failure
                        Interlocked.Add(ref _recordsFailedNonrecoverable, sendCount);
                        records.RemoveRange(0, sendCount);
                        break;
                    default:
                        // for other exceptions we rethrow so the main loop can catch it
                        throw;
                }
            }
        }