in src/main/java/com/amazon/kinesis/kafka/AmazonKinesisSinkTask.java [161:227]
private boolean validateOutStandingRecords() {
if (pauseConsumption) {
if (singleKinesisProducerPerPartition) {
producerMap.values().forEach(producer -> {
int sleepCount = 0;
boolean pause = false;
// Validate if producer has outstanding records within
// threshold values
// and if not pause further consumption
while (producer.getOutstandingRecordsCount() > outstandingRecordsThreshold) {
try {
// Pausing further
sinkTaskContext.pause((TopicPartition[]) sinkTaskContext.assignment().toArray());
pause = true;
Thread.sleep(sleepPeriod);
if (sleepCount++ > sleepCycles) {
// Dummy message - Replace with your code to
// notify/log that Kinesis Producers have
// buffered values
// but are not being sent
System.out.println(
"Kafka Consumption has been stopped because Kinesis Producers has buffered messages above threshold");
sleepCount = 0;
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
if (pause)
sinkTaskContext.resume((TopicPartition[]) sinkTaskContext.assignment().toArray());
});
return true;
} else {
int sleepCount = 0;
boolean pause = false;
// Validate if producer has outstanding records within threshold
// values
// and if not pause further consumption
while (kinesisProducer.getOutstandingRecordsCount() > outstandingRecordsThreshold) {
try {
// Pausing further
sinkTaskContext.pause((TopicPartition[]) sinkTaskContext.assignment().toArray());
pause = true;
Thread.sleep(sleepPeriod);
if (sleepCount++ > sleepCycles) {
// Dummy message - Replace with your code to
// notify/log that Kinesis Producers have buffered
// values
// but are not being sent
System.out.println(
"Kafka Consumption has been stopped because Kinesis Producers has buffered messages above threshold");
sleepCount = 0;
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
if (pause)
sinkTaskContext.resume((TopicPartition[]) sinkTaskContext.assignment().toArray());
return true;
}
} else {
return true;
}
}