in Amazon.KinesisTap.AWS/S3Sink.cs [114:205]
public async Task SendBatchAsync(List<Envelope<string>> records)
{
try
{
_logger?.LogDebug("S3Sink id {0} sending {1} logs", Id, records.Count);
// Create the PutObject request to send to S3
var request = new PutObjectRequest
{
BucketName = _bucketName,
Key = await GenerateS3ObjectKey(_filePath, _fileName)
};
// Turn the list of records into a stream that can be consumed by S3 and will register as a zipped file
var memStream = new MemoryStream();
using (var archive = new ZipArchive(memStream, ZipArchiveMode.Create, true))
{
var logFile = archive.CreateEntry("AlarmLogs.txt", CompressionLevel.Optimal);
using (var entryStream = logFile.Open())
using (var compressedStream = new MemoryStream())
{
foreach (Envelope<string> record in records)
{
var byteArray = Encoding.UTF8.GetBytes(record.Data + "\n");
compressedStream.Write(byteArray, 0, byteArray.Length);
}
compressedStream.Seek(0, SeekOrigin.Begin);
compressedStream.CopyTo(entryStream);
}
}
var batchBytes = memStream.Length;
request.InputStream = memStream;
// Send the file to S3
while (true)
{
var utcNow = DateTime.UtcNow;
long elapsedMilliseconds = Utility.GetElapsedMilliseconds();
try
{
var response = await S3Client.PutObjectAsync(request);
_latency = Utility.GetElapsedMilliseconds() - elapsedMilliseconds;
_throttle.SetSuccess();
_recordsAttempted += records.Count;
_bytesAttempted += batchBytes;
_logger?.LogDebug("S3Sink id {0} successfully sent {1} logs, compressed to {2} bytes",
Id, records.Count, batchBytes);
_recordsSuccess += records.Count;
await SaveBookmarks(records);
break;
}
catch (AmazonS3Exception ex)
{
_latency = Utility.GetElapsedMilliseconds() - elapsedMilliseconds;
_throttle.SetError();
// Retry PutObjectRequest if possibe
if (ex.Retryable != null)
{
if (_buffer.Requeue(records, _throttle.ConsecutiveErrorCount < _maxAttempts))
{
_logger?.LogWarning("S3Sink id {0} attempt={1} exception={2}. Will retry.", Id, _throttle.ConsecutiveErrorCount, ex.Message);
_recoverableServiceErrors++;
_recordsFailedRecoverable += records.Count;
break;
}
}
_recordsFailedNonrecoverable += records.Count;
_nonrecoverableServiceErrors++;
throw;
}
catch (Exception)
{
_latency = Utility.GetElapsedMilliseconds() - elapsedMilliseconds;
_throttle.SetError();
_recordsFailedNonrecoverable += records.Count;
_nonrecoverableServiceErrors++;
throw;
}
}
}
catch (Exception ex)
{
_logger?.LogError("S3Sink id {0} exception (attempt={1}): {2}", Id, _throttle.ConsecutiveErrorCount, ex.ToMinimized());
}
PublishMetrics(MetricsConstants.S3_PREFIX);
}