in flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/table/KinesisDynamicSource.java [251:279]
/**
 * Looks up a shard assigner by identifier among the {@link KinesisDynamicShardAssignerFactory}
 * implementations discovered through {@link ServiceLoader}.
 *
 * <p>Factories are visited in the order the service loader yields them; the first factory whose
 * identifier equals {@code shardAssignerIdentifier} supplies the result. A
 * {@link ServiceConfigurationError} raised while iterating is logged and iteration then
 * continues with the remaining factories.
 *
 * @param shardAssignerIdentifier identifier to compare against each factory's identifier
 * @return the shard assigner produced by the first matching factory, or {@code null} (after
 *     logging an error) when no factory matches
 */
private KinesisShardAssigner getShardAssigner(String shardAssignerIdentifier) {
    final Iterator<KinesisDynamicShardAssignerFactory> candidates =
            ServiceLoader.load(KinesisDynamicShardAssignerFactory.class).iterator();

    boolean exhausted = false;
    while (!exhausted) {
        // hasNext() is deliberately inside the try: the service loader resolves providers
        // lazily, so both hasNext() and next() can raise ServiceConfigurationError.
        try {
            if (candidates.hasNext()) {
                final KinesisDynamicShardAssignerFactory candidate = candidates.next();
                final boolean matches =
                        candidate.shardAssignerIdentifer().equals(shardAssignerIdentifier);
                if (matches) {
                    return candidate.getShardAssigner();
                }
            } else {
                exhausted = true;
            }
        } catch (ServiceConfigurationError loadFailure) {
            // A broken provider must not abort the lookup; log it and try the next one.
            LOG.error(
                    "Error while attempting to iterate over shard assigner factories to "
                            + "locate shard assigner with identifier: '{}'",
                    shardAssignerIdentifier,
                    loadFailure);
        }
    }

    // Every factory was inspected without finding a match.
    LOG.error(
            "Unable to locate shard assigner factory for identifier: '{}'",
            shardAssignerIdentifier);
    return null;
}