protected override async Task OnNextAsync()

in Amazon.KinesisTap.AWS/KinesisFirehoseSink.cs [281:366]


        /// <summary>
        /// Sends one batch of record envelopes to the configured delivery stream with a single
        /// PutRecordBatch request, updates the sink's counters, requeues records that failed,
        /// saves bookmarks on full success, and finally publishes metrics.
        /// </summary>
        /// <param name="envelopes">The batch of record envelopes to send. A null or empty batch is ignored.</param>
        /// <param name="batchBytes">Batch size in bytes as supplied by the caller; used for logging and the bytes-attempted counter.</param>
        /// <returns>A task that completes when the batch has been processed.</returns>
        /// <remarks>
        /// Exceptions thrown inside the send path are caught here and are not rethrown: the batch is
        /// either requeued (recoverable) or counted as a non-recoverable failure.
        /// </remarks>
        protected override async Task OnNextAsync(List<Envelope<Record>> envelopes, long batchBytes)
        {
            // Nothing to send. This also keeps Average() below from throwing on an empty sequence.
            if (envelopes == null || envelopes.Count == 0)
            {
                return;
            }

            _logger?.LogDebug($"KinesisFirehoseSink {Id} sending {envelopes.Count} records {batchBytes} bytes.");

            // Average age of the records in this batch, measured against a single "now".
            DateTime utcNow = DateTime.UtcNow;
            _clientLatency = (long)envelopes.Average(r => (utcNow - r.Timestamp).TotalMilliseconds);

            long elapsedMilliseconds = Utility.GetElapsedMilliseconds();
            try
            {
                _recordsAttempted += envelopes.Count;
                _bytesAttempted += batchBytes;
                List<Record> records = envelopes.Select(r => r.Data).ToList();
                if (CanCombineRecords)
                {
                    records = CombineRecords(records);
                }

                PutRecordBatchResponse response = await SendRequestAsync(new PutRecordBatchRequest()
                {
                    DeliveryStreamName = _deliveryStreamName,
                    Records = records
                });
                _latency = Utility.GetElapsedMilliseconds() - elapsedMilliseconds;
                if (response.FailedPutCount > 0 && response.RequestResponses != null)
                {
                    // Partial failure: the request succeeded but some entries were rejected.
                    _throttle.SetError();
                    _recoverableServiceErrors++;
                    _recordsSuccess += envelopes.Count - response.FailedPutCount;
                    _logger?.LogError($"KinesisFirehoseSink client {Id} BatchRecordCount={envelopes.Count} FailedPutCount={response.FailedPutCount} Attempt={_throttle.ConsecutiveErrorCount}");

                    bool canRetry = _throttle.ConsecutiveErrorCount < _maxAttempts;

                    // NOTE(review): when CanCombineRecords is true the request carries the output of
                    // CombineRecords, so response index i may not correspond to envelopes[i] - confirm.
                    List<Envelope<Record>> requeueRecords = new List<Envelope<Record>>();
                    for (int i = 0; i < response.RequestResponses.Count; i++)
                    {
                        var reqResponse = response.RequestResponses[i];
                        if (!string.IsNullOrEmpty(reqResponse.ErrorCode))
                        {
                            requeueRecords.Add(envelopes[i]);
                            //When there is error, reqResponse.RecordId would be null. So we have to use the sequence number within the batch here.
                            if (!canRetry)
                            {
                                _logger?.LogDebug($"Record {i} error {reqResponse.ErrorCode}: {reqResponse.ErrorMessage}");
                            }
                        }
                    }

                    if (_buffer.Requeue(requeueRecords, canRetry))
                    {
                        _recordsFailedRecoverable += response.FailedPutCount;
                    }
                    else
                    {
                        // Handle the discard inline instead of throwing into our own catch block:
                        // throwing would call SetError() a second time for the same request and add
                        // envelopes.Count to the non-recoverable counter on top of FailedPutCount.
                        _nonrecoverableServiceErrors++;
                        _recordsFailedNonrecoverable += response.FailedPutCount;
                        _logger?.LogError($"KinesisFirehoseSink {Id} messages discarded after {_throttle.ConsecutiveErrorCount} attempts.");
                    }
                }
                else
                {
                    _throttle.SetSuccess();
                    _recordsSuccess += envelopes.Count;
                    _logger?.LogDebug($"KinesisFirehoseSink {Id} successfully sent {envelopes.Count} records {batchBytes} bytes.");

                    // NOTE(review): an exception from SaveBookmarks lands in the catch below, which may
                    // requeue records that were already delivered - confirm this is intended.
                    await SaveBookmarks(envelopes);
                }
            }
            catch (Exception ex)
            {
                _latency = Utility.GetElapsedMilliseconds() - elapsedMilliseconds;
                _throttle.SetError();
                if (IsRecoverableException(ex)
                    && _buffer.Requeue(envelopes, _throttle.ConsecutiveErrorCount < _maxAttempts))
                {
                    _recoverableServiceErrors++;
                    _recordsFailedRecoverable += envelopes.Count;
                    // Throttle this warning so a sustained outage does not flood the log.
                    if (LogThrottler.ShouldWrite(LogThrottler.CreateLogTypeId(GetType().FullName, "OnNextAsync", "Requeued", Id), TimeSpan.FromMinutes(5)))
                    {
                        _logger?.LogWarning($"KinesisFirehoseSink {Id} requeued request after exception (attempt {_throttle.ConsecutiveErrorCount}): {ex.ToMinimized()}");
                    }
                }
                else
                {
                    _nonrecoverableServiceErrors++;
                    _recordsFailedNonrecoverable += envelopes.Count;
                    _logger?.LogError($"KinesisFirehoseSink {Id} client exception after {_throttle.ConsecutiveErrorCount} attempts: {ex.ToMinimized()}");
                }
            }
            PublishMetrics(MetricsConstants.KINESIS_FIREHOSE_PREFIX);
        }