in flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java [213:247]
public FlinkKinesisConsumer(
List<String> streams,
KinesisDeserializationSchema<T> deserializer,
Properties configProps) {
checkNotNull(streams, "streams can not be null");
checkArgument(streams.size() != 0, "must be consuming at least 1 stream");
checkArgument(!streams.contains(""), "stream names cannot be empty Strings");
this.streams = streams;
this.configProps = checkNotNull(configProps, "configProps can not be null");
// check the configuration properties for any conflicting settings
KinesisConfigUtil.validateConsumerConfiguration(this.configProps, streams);
checkNotNull(deserializer, "deserializer can not be null");
checkArgument(
InstantiationUtil.isSerializable(deserializer),
"The provided deserialization schema is not serializable: "
+ deserializer.getClass().getName()
+ ". "
+ "Please check that it does not contain references to non-serializable instances.");
this.deserializer = deserializer;
StreamConsumerRegistrarUtil.eagerlyRegisterStreamConsumers(configProps, streams);
if (LOG.isInfoEnabled()) {
StringBuilder sb = new StringBuilder();
for (String stream : streams) {
sb.append(stream).append(", ");
}
LOG.info(
"Flink Kinesis Consumer is going to read the following streams: {}",
sb.toString());
}
}