public BatchEventSink()

in Amazon.KinesisTap.Core/Sinks/BatchEventSink.cs [52:109]


        public BatchEventSink(
            IPlugInContext context,
            int defaultInterval,
            int defaultRecordCount,
            long maxBatchSize
        ) : base(context)
        {
            int.TryParse(_config[ConfigConstants.BUFFER_INTERVAL], out _interval);
            if (_interval == 0) _interval = defaultInterval;
            int.TryParse(_config[ConfigConstants.BUFFER_SIZE], out _count);
            if (_count == 0) _count = defaultRecordCount;
            _maxBatchSize = maxBatchSize;

            string queueType = _config[ConfigConstants.QUEUE_TYPE];
            int.TryParse(_config[ConfigConstants.QUEUE_MAX_BATCHES], out int maxBatches);
            ISimpleQueue<List<Envelope<TRecord>>> lowerPriorityQueue;
            if (!string.IsNullOrWhiteSpace(queueType) && queueType.Equals(ConfigConstants.QUEUE_TYPE_FILE, StringComparison.OrdinalIgnoreCase))
            {
                if (maxBatches == 0) maxBatches = 10000;
                string queuePath = _config[ConfigConstants.QUEUE_PATH];
                if (string.IsNullOrWhiteSpace(queuePath))
                    queuePath = Path.Combine(Utility.GetSessionQueuesDirectoryRelativePath(_context.SessionName), Id);
                lowerPriorityQueue = new FilePersistentQueue<List<Envelope<TRecord>>>(
                    maxBatches, queuePath, GetSerializer(), context.Services.GetService<IAppDataFileProvider>(), _logger);
            }
            else //in memory
            {
                if (maxBatches == 0) maxBatches = 100;
                lowerPriorityQueue = new InMemoryQueue<List<Envelope<TRecord>>>(maxBatches);
            }

            if (!int.TryParse(_config["MaxInMemoryCacheSize"], out int inMemoryCacheSize) || inMemoryCacheSize < 1)
                inMemoryCacheSize = 10;
            if (inMemoryCacheSize > 100)
                throw new ConfigurationException("In-memory cache size cannot exceed 100 batches (500MB).");

            bool.TryParse(_config["PersistWhenCacheFull"], out bool persistWhenCacheFull);
            if (persistWhenCacheFull)
            {
                _logger?.LogDebug("Creating BatchEventSink with HighCapacityBuffer and max in-memory cache size: {0}", inMemoryCacheSize);
                _buffer = new HighCapacityBuffer<List<Envelope<TRecord>>>(inMemoryCacheSize, _context.Logger, OnNextBatch, lowerPriorityQueue);
            }
            else
            {
                _logger?.LogDebug("Creating BatchEventSink with HiLowBuffer and max in-memory cache size: {0}", inMemoryCacheSize);
                _buffer = new HiLowBuffer<List<Envelope<TRecord>>>(inMemoryCacheSize, _context.Logger, OnNextBatch, lowerPriorityQueue);
            }

            _batch = new Batch<Envelope<TRecord>>(TimeSpan.FromSeconds(_interval),
                    new long[] { _count, _maxBatchSize },
                    new Func<Envelope<TRecord>, long>[]
                    {
                        r => 1,
                        GetRecordSize
                    },
                    SendBatch
                );
        }