public FlinkKinesisConsumer()

in flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java [213:247]


    /**
     * Creates a consumer that reads from the given Kinesis streams.
     *
     * <p>All arguments are validated eagerly, in this order: the stream list, the configuration
     * properties, then the deserialization schema. After validation succeeds, stream consumers
     * are registered via {@code StreamConsumerRegistrarUtil.eagerlyRegisterStreamConsumers} and
     * the list of streams is logged at INFO level.
     *
     * <p>NOTE(review): the {@code streams} list is stored by reference, not copied — callers
     * should presumably not mutate it afterwards; confirm against usage.
     *
     * @param streams names of the streams to consume; must be non-null, contain at least one
     *     entry, and must not contain the empty string
     * @param deserializer schema used to deserialize records; must be non-null and pass {@code
     *     InstantiationUtil.isSerializable}
     * @param configProps consumer configuration; must be non-null and pass {@code
     *     KinesisConfigUtil.validateConsumerConfiguration}
     * @throws NullPointerException if any argument is null (assuming {@code checkNotNull} follows
     *     the usual preconditions contract)
     * @throws IllegalArgumentException if {@code streams} is empty or contains an empty name, or
     *     if {@code deserializer} is not serializable (assuming {@code checkArgument} follows the
     *     usual preconditions contract)
     */
    public FlinkKinesisConsumer(
            List<String> streams,
            KinesisDeserializationSchema<T> deserializer,
            Properties configProps) {
        checkNotNull(streams, "streams can not be null");
        checkArgument(!streams.isEmpty(), "must be consuming at least 1 stream");
        checkArgument(!streams.contains(""), "stream names cannot be empty Strings");
        this.streams = streams;

        this.configProps = checkNotNull(configProps, "configProps can not be null");

        // check the configuration properties for any conflicting settings
        KinesisConfigUtil.validateConsumerConfiguration(this.configProps, streams);

        checkNotNull(deserializer, "deserializer can not be null");
        checkArgument(
                InstantiationUtil.isSerializable(deserializer),
                "The provided deserialization schema is not serializable: "
                        + deserializer.getClass().getName()
                        + ". "
                        + "Please check that it does not contain references to non-serializable instances.");
        this.deserializer = deserializer;

        // Registration happens only after every argument has been validated.
        StreamConsumerRegistrarUtil.eagerlyRegisterStreamConsumers(configProps, streams);

        // Guard keeps the join from being computed when INFO logging is disabled.
        if (LOG.isInfoEnabled()) {
            // String.join places the separator only between names, so the logged list has no
            // trailing ", ".
            LOG.info(
                    "Flink Kinesis Consumer is going to read the following streams: {}",
                    String.join(", ", streams));
        }
    }