public async Task SendBatchAsync()

in Amazon.KinesisTap.AWS/S3Sink.cs [114:205]


        /// <summary>
        /// Packs a batch of records into a single-entry zip archive and uploads it to S3 as one object.
        /// </summary>
        /// <remarks>
        /// Exceptions are not propagated to the caller: every failure is logged by the outer handler
        /// and then swallowed, and metrics are published in all cases. When S3 reports a retryable
        /// error and the buffer accepts the batch back, the batch is counted as a recoverable failure
        /// and no error is logged; otherwise it is counted as non-recoverable.
        /// </remarks>
        /// <param name="records">
        /// The records to upload. Each record's <c>Data</c> is written to the archive entry as one
        /// UTF-8 line terminated by <c>\n</c>.
        /// </param>
        /// <returns>A task that completes once the upload attempt is finished and metrics are published.</returns>
        public async Task SendBatchAsync(List<Envelope<string>> records)
        {
            try
            {
                _logger?.LogDebug("S3Sink id {0} sending {1} logs", Id, records.Count);

                // Create the PutObject request to send to S3
                var request = new PutObjectRequest
                {
                    BucketName = _bucketName,
                    Key = await GenerateS3ObjectKey(_filePath, _fileName)
                };

                // Turn the list of records into a stream that can be consumed by S3 and will register as a zipped file.
                // The using guarantees the buffer is released even when building the archive or the upload fails;
                // disposing a MemoryStream that was already closed is harmless.
                using (var memStream = new MemoryStream())
                {
                    // leaveOpen: true keeps memStream usable after the archive is disposed; disposing the
                    // archive is what finalizes the zip, so the stream must only be read after this block.
                    using (var archive = new ZipArchive(memStream, ZipArchiveMode.Create, true))
                    {
                        var logFile = archive.CreateEntry("AlarmLogs.txt", CompressionLevel.Optimal);
                        using (var entryStream = logFile.Open())
                        {
                            // Write each record straight into the zip entry; no need to stage an
                            // uncompressed copy of the whole batch first.
                            foreach (Envelope<string> record in records)
                            {
                                var byteArray = Encoding.UTF8.GetBytes(record.Data + "\n");
                                entryStream.Write(byteArray, 0, byteArray.Length);
                            }
                        }
                    }

                    var batchBytes = memStream.Length;

                    // Writing the archive leaves the stream positioned at its end. Rewind so the upload
                    // starts at the first byte of the archive instead of sending an empty/truncated body.
                    memStream.Position = 0;
                    request.InputStream = memStream;

                    // Send the file to S3
                    long startMilliseconds = Utility.GetElapsedMilliseconds();
                    try
                    {
                        await S3Client.PutObjectAsync(request);
                        _latency = Utility.GetElapsedMilliseconds() - startMilliseconds;
                        _throttle.SetSuccess();

                        // NOTE(review): the "attempted" counters are only updated after a successful put,
                        // so failed attempts are not reflected in them - confirm this is intended.
                        _recordsAttempted += records.Count;
                        _bytesAttempted += batchBytes;

                        _logger?.LogDebug("S3Sink id {0} successfully sent {1} logs, compressed to {2} bytes",
                            Id, records.Count, batchBytes);
                        _recordsSuccess += records.Count;
                        await SaveBookmarks(records);
                    }
                    catch (AmazonS3Exception ex)
                    {
                        _latency = Utility.GetElapsedMilliseconds() - startMilliseconds;
                        _throttle.SetError();

                        // Retry the PutObjectRequest if possible: the batch is handed back to the buffer
                        // only when S3 marks the error as retryable. The second argument presumably tells
                        // the buffer whether attempts remain - verify against Requeue's contract.
                        if (ex.Retryable != null
                            && _buffer.Requeue(records, _throttle.ConsecutiveErrorCount < _maxAttempts))
                        {
                            _logger?.LogWarning("S3Sink id {0} attempt={1} exception={2}. Will retry.", Id, _throttle.ConsecutiveErrorCount, ex.Message);
                            _recoverableServiceErrors++;
                            _recordsFailedRecoverable += records.Count;
                        }
                        else
                        {
                            _recordsFailedNonrecoverable += records.Count;
                            _nonrecoverableServiceErrors++;
                            throw;
                        }
                    }
                    catch (Exception)
                    {
                        _latency = Utility.GetElapsedMilliseconds() - startMilliseconds;
                        _throttle.SetError();
                        _recordsFailedNonrecoverable += records.Count;
                        _nonrecoverableServiceErrors++;
                        throw;
                    }
                }
            }
            catch (Exception ex)
            {
                // Best-effort sink: log the failure and fall through so metrics are still published.
                _logger?.LogError("S3Sink id {0} exception (attempt={1}): {2}", Id, _throttle.ConsecutiveErrorCount, ex.ToMinimized());
            }

            PublishMetrics(MetricsConstants.S3_PREFIX);
        }