public void registerStreamConsumer()

in flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/StreamConsumerRegistrar.java [66:109]


    /**
     * Prepares the EFO stream consumer according to the configured consumer lifecycle.
     *
     * <p>Returns immediately when the configured reader type is not EFO. Otherwise the configured
     * EFO consumer name has to pass the null and empty precondition checks, after which:
     *
     * <ul>
     *   <li>{@code JOB_MANAGED}: the consumer is registered on the stream and the ARN from the
     *       response is stored in {@code consumerArn}. A {@link ResourceInUseException} is logged
     *       as a warning and not rethrown.
     *   <li>{@code SELF_MANAGED}: the consumer is described and the response is logged.
     * </ul>
     *
     * @throws IllegalArgumentException if the configured lifecycle is neither of the two above
     */
    public void registerStreamConsumer() {
        final boolean efoReader = sourceConfig.get(READER_TYPE) == EFO;
        if (!efoReader) {
            return;
        }

        final String consumerName = sourceConfig.get(EFO_CONSUMER_NAME);
        Preconditions.checkNotNull(
                consumerName, "For EFO reader type, EFO consumer name must be specified.");
        Preconditions.checkArgument(
                !consumerName.isEmpty(),
                "For EFO reader type, EFO consumer name cannot be empty.");

        switch (sourceConfig.get(EFO_CONSUMER_LIFECYCLE)) {
            case JOB_MANAGED:
                registerJobManagedConsumer(consumerName);
                return;
            case SELF_MANAGED:
                describeSelfManagedConsumer(consumerName);
                return;
            default:
                throw new IllegalArgumentException(
                        "Unsupported EFO consumer lifecycle: "
                                + sourceConfig.get(EFO_CONSUMER_LIFECYCLE));
        }
    }

    /** Registers {@code consumerName} on the stream and stores the ARN of the new consumer. */
    private void registerJobManagedConsumer(String consumerName) {
        try {
            LOG.info("Registering stream consumer - {}::{}", streamArn, consumerName);
            RegisterStreamConsumerResponse registration =
                    kinesisStreamProxy.registerStreamConsumer(streamArn, consumerName);
            consumerArn = registration.consumer().consumerARN();
            LOG.info(
                    "Registered stream consumer - {}::{}",
                    streamArn,
                    registration.consumer().consumerARN());
        } catch (ResourceInUseException alreadyRegistered) {
            // NOTE(review): consumerArn is not assigned on this path - confirm that whatever
            // reads the field later copes with it being unset.
            LOG.warn(
                    "Found existing consumer {} on stream {}. Proceeding to read from consumer.",
                    consumerName,
                    streamArn,
                    alreadyRegistered);
        }
    }

    /** Describes {@code consumerName} on the stream and logs the response. */
    private void describeSelfManagedConsumer(String consumerName) {
        // Looking the consumer up here is meant to make the job fail fast when the requested
        // EFO consumer does not exist.
        DescribeStreamConsumerResponse description =
                kinesisStreamProxy.describeStreamConsumer(streamArn, consumerName);
        LOG.info("Discovered stream consumer - {}", description);
    }