in kinesis-taxi-stream-producer/src/main/java/com/amazonaws/flink/refarch/StreamPopulator.java [61:79]
/**
 * Creates a populator that wires together the taxi event reader, the Kinesis producer,
 * the watermark tracker and the backpressure semaphore used by this class.
 *
 * @param region                     AWS region handed to the Kinesis producer configuration and to the watermark tracker
 * @param bucketName                 name of the S3 bucket passed to the {@code TaxiEventReader}
 * @param objectPrefix               S3 object prefix passed to the {@code TaxiEventReader}
 * @param streamName                 name of the Kinesis stream; stored and also passed to the watermark tracker
 * @param aggregate                  whether record aggregation is enabled on the Kinesis producer
 * @param speedupFactor              speed-up factor stored for later use by this instance
 * @param statisticsFrequencyMillies statistics frequency stored for later use by this instance
 * @param adaptTimeOption            name of an {@code AdaptTimeOption} constant; matched case-insensitively
 * @param noWatermark                watermark flag stored for later use by this instance
 * @throws NullPointerException     if {@code adaptTimeOption} is {@code null}
 * @throws IllegalArgumentException if {@code adaptTimeOption} does not name an {@code AdaptTimeOption} constant
 *                                  (assuming {@code AdaptTimeOption} is a regular enum)
 */
public StreamPopulator(String region, String bucketName, String objectPrefix, String streamName, boolean aggregate, float speedupFactor, long statisticsFrequencyMillies, String adaptTimeOption, boolean noWatermark) {
    // Parse the option first so an invalid argument fails before any client object is created.
    // Locale.ROOT keeps the upper-casing locale-independent: with the default locale a Turkish
    // JVM would map 'i' to 'İ' (U+0130) and valueOf would reject otherwise valid names.
    this.adaptTimeOptionOption = AdaptTimeOption.valueOf(adaptTimeOption.toUpperCase(java.util.Locale.ROOT));

    KinesisProducerConfiguration producerConfiguration = new KinesisProducerConfiguration()
        .setRegion(region)
        // NOTE(review): both values below are milliseconds according to the KPL configuration docs.
        .setCredentialsRefreshDelay(500)
        .setRecordTtl(300_000)
        .setAggregationEnabled(aggregate);

    // Global bucket access lets the client reach the bucket even when it lives in a
    // different region than the one the client resolves by default.
    final AmazonS3 s3 = AmazonS3ClientBuilder.standard().withForceGlobalBucketAccessEnabled(true).build();

    this.streamName = streamName;
    this.speedupFactor = speedupFactor;
    this.noWatermark = noWatermark;
    this.statisticsFrequencyMillies = statisticsFrequencyMillies;

    this.kinesisProducer = new KinesisProducer(producerConfiguration);
    this.watermarkTracker = new WatermarkTracker(region, streamName);
    this.backpressureSemaphore = new BackpressureSemaphore<>(MAX_OUTSTANDING_RECORD_COUNT);
    this.taxiEventReader = new TaxiEventReader(s3, bucketName, objectPrefix);
}