in Amazon.KinesisTap.AWS/KinesisFirehoseSink.cs [281:366]
/// <summary>
/// Sends one batch of records to the Firehose delivery stream with a single PutRecordBatch
/// request, then updates the throttle, the metric counters and the buffer according to the outcome.
/// </summary>
/// <param name="envelopes">The batch to send; each envelope's <c>Data</c> is the record placed in the request.</param>
/// <param name="batchBytes">Size of the batch in bytes; used for logging and the bytes-attempted counter.</param>
/// <remarks>
/// Exceptions raised inside the send path are caught here: the batch is either requeued or counted
/// as non-recoverable. Bookmarks are saved only when the whole batch was accepted.
/// Metrics are published at the end of every call that reaches the end of the method.
/// </remarks>
protected override async Task OnNextAsync(List<Envelope<Record>> envelopes, long batchBytes)
{
    _logger?.LogDebug($"KinesisFirehoseSink {Id} sending {envelopes.Count} records {batchBytes} bytes.");
    DateTime utcNow = DateTime.UtcNow;

    // Average age of the records in this batch at send time. Enumerable.Average throws
    // InvalidOperationException on an empty sequence and this statement sits outside the try
    // block, so guard it to keep an empty batch from escaping as an unhandled exception.
    _clientLatency = envelopes.Count > 0
        ? (long)envelopes.Average(r => (utcNow - r.Timestamp).TotalMilliseconds)
        : 0;

    long elapsedMilliseconds = Utility.GetElapsedMilliseconds();
    try
    {
        _recordsAttempted += envelopes.Count;
        _bytesAttempted += batchBytes;
        List<Record> records = envelopes.Select(r => r.Data).ToList();
        if (CanCombineRecords)
        {
            records = CombineRecords(records);
        }

        PutRecordBatchResponse response = await SendRequestAsync(new PutRecordBatchRequest()
        {
            DeliveryStreamName = _deliveryStreamName,
            Records = records
        });
        _latency = Utility.GetElapsedMilliseconds() - elapsedMilliseconds;

        if (response.FailedPutCount > 0 && response.RequestResponses != null)
        {
            // Partial failure: the request itself succeeded but some records were rejected.
            _throttle.SetError();
            _recoverableServiceErrors++;
            _recordsSuccess += envelopes.Count - response.FailedPutCount;
            _logger?.LogError($"KinesisFirehoseSink client {Id} BatchRecordCount={envelopes.Count} FailedPutCount={response.FailedPutCount} Attempt={_throttle.ConsecutiveErrorCount}");

            // NOTE(review): the loop below assumes response entry i corresponds to envelopes[i].
            // When CombineRecords has merged records, the request holds fewer records than
            // there are envelopes, so this mapping may not hold - TODO confirm against CombineRecords.
            List<Envelope<Record>> requeueRecords = new List<Envelope<Record>>();
            for (int i = 0; i < response.RequestResponses.Count; i++)
            {
                var reqResponse = response.RequestResponses[i];
                if (!string.IsNullOrEmpty(reqResponse.ErrorCode))
                {
                    requeueRecords.Add(envelopes[i]);
                    // When there is an error, reqResponse.RecordId would be null, so the position
                    // within the batch is used to identify the record. Per-record details are only
                    // logged once the attempt limit has been reached.
                    if (_throttle.ConsecutiveErrorCount >= _maxAttempts)
                    {
                        _logger?.LogDebug($"Record {i} error {reqResponse.ErrorCode}: {reqResponse.ErrorMessage}");
                    }
                }
            }

            if (_buffer.Requeue(requeueRecords, _throttle.ConsecutiveErrorCount < _maxAttempts))
            {
                _recordsFailedRecoverable += response.FailedPutCount;
            }
            else
            {
                // Account for the discard here instead of throwing into this method's own catch
                // block. That path would call _throttle.SetError() a second time for the same
                // request, add envelopes.Count to _recordsFailedNonrecoverable on top of
                // FailedPutCount, and could requeue the whole batch, including records that
                // the service already accepted.
                _recordsFailedNonrecoverable += response.FailedPutCount;
                _nonrecoverableServiceErrors++;
                _logger?.LogError($"KinesisFirehoseSink {Id} messages discarded after {_throttle.ConsecutiveErrorCount} attempts.");
            }
        }
        else
        {
            _throttle.SetSuccess();
            _recordsSuccess += envelopes.Count;
            _logger?.LogDebug($"KinesisFirehoseSink {Id} successfully sent {envelopes.Count} records {batchBytes} bytes.");
            await SaveBookmarks(envelopes);
        }
    }
    catch (Exception ex)
    {
        // The whole request failed (or a later step threw): nothing in the batch is treated as delivered.
        _latency = Utility.GetElapsedMilliseconds() - elapsedMilliseconds;
        _throttle.SetError();
        if (IsRecoverableException(ex)
            && _buffer.Requeue(envelopes, _throttle.ConsecutiveErrorCount < _maxAttempts))
        {
            _recoverableServiceErrors++;
            _recordsFailedRecoverable += envelopes.Count;
            // The requeue warning is rate limited through LogThrottler with a five-minute interval.
            if (LogThrottler.ShouldWrite(LogThrottler.CreateLogTypeId(GetType().FullName, "OnNextAsync", "Requeued", Id), TimeSpan.FromMinutes(5)))
            {
                _logger?.LogWarning($"KinesisFirehoseSink {Id} requeued request after exception (attempt {_throttle.ConsecutiveErrorCount}): {ex.ToMinimized()}");
            }
        }
        else
        {
            _nonrecoverableServiceErrors++;
            _recordsFailedNonrecoverable += envelopes.Count;
            _logger?.LogError($"KinesisFirehoseSink {Id} client exception after {_throttle.ConsecutiveErrorCount} attempts: {ex.ToMinimized()}");
        }
    }

    PublishMetrics(MetricsConstants.KINESIS_FIREHOSE_PREFIX);
}