in Amazon.KinesisTap.Core/Sinks/BatchEventSink.cs [52:109]
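/// <summary>
/// Initializes the batch sink: reads buffering settings from configuration, sets up the
/// lower-priority (secondary) queue and the buffering strategy, and creates the batch with
/// its flush triggers.
/// </summary>
/// <param name="context">Plug-in context providing configuration, logger, and services.</param>
/// <param name="defaultInterval">Buffer interval in seconds, used when none is configured.</param>
/// <param name="defaultRecordCount">Records per batch, used when no buffer size is configured.</param>
/// <param name="maxBatchSize">Maximum total size of a batch, as measured by GetRecordSize.</param>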
public BatchEventSink(
IPlugInContext context,
int defaultInterval,
int defaultRecordCount,
long maxBatchSize
) : base(context)
{
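// Read the buffer interval (seconds) and per-batch record count from configuration,
// falling back to the supplied defaults when the values are missing, unparsable, or non-positive.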
if (!int.TryParse(_config[ConfigConstants.BUFFER_INTERVAL], out _interval) || _interval <= 0)
    _interval = defaultInterval;
if (!int.TryParse(_config[ConfigConstants.BUFFER_SIZE], out _count) || _count <= 0)
    _count = defaultRecordCount;
_maxBatchSize = maxBatchSize;
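// Select the lower-priority queue that backs the buffer: a file-backed persistent queue when
// the configured queue type is QUEUE_TYPE_FILE, otherwise an in-memory queue. QUEUE_MAX_BATCHES
// caps how many batches the queue can hold (defaults differ per queue type below).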
string queueType = _config[ConfigConstants.QUEUE_TYPE];
int.TryParse(_config[ConfigConstants.QUEUE_MAX_BATCHES], out int maxBatches);
ISimpleQueue<List<Envelope<TRecord>>> lowerPriorityQueue;
if (!string.IsNullOrWhiteSpace(queueType) && queueType.Equals(ConfigConstants.QUEUE_TYPE_FILE, StringComparison.OrdinalIgnoreCase))
{
if (maxBatches == 0) maxBatches = 10000;
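// When no queue path is configured, store the queue under the session's queues directory, keyed by the sink Id.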
string queuePath = _config[ConfigConstants.QUEUE_PATH];
if (string.IsNullOrWhiteSpace(queuePath))
queuePath = Path.Combine(Utility.GetSessionQueuesDirectoryRelativePath(_context.SessionName), Id);
lowerPriorityQueue = new FilePersistentQueue<List<Envelope<TRecord>>>(
maxBatches, queuePath, GetSerializer(), context.Services.GetService<IAppDataFileProvider>(), _logger);
}
else // in-memory queue
{
if (maxBatches == 0) maxBatches = 100;
lowerPriorityQueue = new InMemoryQueue<List<Envelope<TRecord>>>(maxBatches);
}
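// MaxInMemoryCacheSize is the number of batches kept in memory; default to 10 when missing or invalid.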
if (!int.TryParse(_config["MaxInMemoryCacheSize"], out int inMemoryCacheSize) || inMemoryCacheSize < 1)
inMemoryCacheSize = 10;
if (inMemoryCacheSize > 100)
throw new ConfigurationException("MaxInMemoryCacheSize cannot exceed 100 batches (500MB).");
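// PersistWhenCacheFull selects the buffering strategy; both variants deliver batches through
// OnNextBatch and are backed by the lower-priority queue created above.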
bool.TryParse(_config["PersistWhenCacheFull"], out bool persistWhenCacheFull);
if (persistWhenCacheFull)
{
_logger?.LogDebug("Creating BatchEventSink with HighCapacityBuffer and max in-memory cache size: {0}", inMemoryCacheSize);
_buffer = new HighCapacityBuffer<List<Envelope<TRecord>>>(inMemoryCacheSize, _context.Logger, OnNextBatch, lowerPriorityQueue);
}
else
{
_logger?.LogDebug("Creating BatchEventSink with HiLowBuffer and max in-memory cache size: {0}", inMemoryCacheSize);
_buffer = new HiLowBuffer<List<Envelope<TRecord>>>(inMemoryCacheSize, _context.Logger, OnNextBatch, lowerPriorityQueue);
}
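// Flush via SendBatch when the buffer interval elapses, or when either limit is reached:
// record count (each record counts as 1) or total size as measured by GetRecordSize.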
_batch = new Batch<Envelope<TRecord>>(TimeSpan.FromSeconds(_interval),
new long[] { _count, _maxBatchSize },
new Func<Envelope<TRecord>, long>[]
{
r => 1,
GetRecordSize
},
SendBatch
);
}