public DynamicTableSource createDynamicTableSource()

in src/main/java/org/apache/flink/connector/rocketmq/source/table/RocketMQDynamicTableSourceFactory.java [87:182]


    public DynamicTableSource createDynamicTableSource(Context context) {
        FactoryUtil.TableFactoryHelper helper = createTableFactoryHelper(this, context);
        helper.validate();
        Map<String, String> rawProperties = context.getCatalogTable().getOptions();
        Configuration configuration = Configuration.fromMap(rawProperties);
        String topic = configuration.getString(RocketMQSourceOptions.TOPIC);
        String consumerGroup = configuration.getString(RocketMQSourceOptions.CONSUMER_GROUP);
        String nameServerAddress = configuration.getString(RocketMQSourceOptions.ENDPOINTS);
        String tag = configuration.getString(RocketMQSourceOptions.OPTIONAL_TAG);
        String sql = configuration.getString(RocketMQSourceOptions.OPTIONAL_SQL);
        if (configuration.contains(RocketMQSourceOptions.OPTIONAL_STARTUP_SCAN_MODE)
                && (configuration.contains(RocketMQSourceOptions.OPTIONAL_STARTUP_OFFSET_TIMESTAMP)
                        || configuration.contains(
                                RocketMQSourceOptions.OPTIONAL_STARTUP_OFFSET_DATE))) {
            throw new IllegalArgumentException(
                    String.format(
                            "cannot support these configs when %s has been set: [%s] !",
                            RocketMQSourceOptions.OPTIONAL_STARTUP_SCAN_MODE.key(),
                            RocketMQSourceOptions.OPTIONAL_STARTUP_OFFSET_SPECIFIC.key()));
        }
        long startMessageOffset =
                configuration.getLong(RocketMQSourceOptions.OPTIONAL_STARTUP_OFFSET_SPECIFIC);
        long startTimeMs =
                configuration.getLong(RocketMQSourceOptions.OPTIONAL_STARTUP_OFFSET_TIMESTAMP);
        String startDateTime =
                configuration.getString(RocketMQSourceOptions.OPTIONAL_STARTUP_OFFSET_DATE);
        String timeZone = configuration.getString(RocketMQSourceOptions.OPTIONAL_TIME_ZONE);
        String accessKey = configuration.getString(RocketMQSourceOptions.OPTIONAL_ACCESS_KEY);
        String secretKey = configuration.getString(RocketMQSourceOptions.OPTIONAL_SECRET_KEY);
        long startTime = startTimeMs;
        if (startTime == -1) {
            if (!StringUtils.isNullOrWhitespaceOnly(startDateTime)) {
                try {
                    startTime = parseDateString(startDateTime, timeZone);
                } catch (ParseException e) {
                    throw new RuntimeException(
                            String.format(
                                    "Incorrect datetime format: %s, pls use ISO-8601 "
                                            + "complete date plus hours, minutes and seconds format:%s.",
                                    startDateTime, DATE_FORMAT),
                            e);
                }
            }
        }
        long stopInMs = Long.MAX_VALUE;
        String endDateTime =
                configuration.getString(RocketMQSourceOptions.OPTIONAL_STOP_OFFSET_TIMESTAMP);
        if (!StringUtils.isNullOrWhitespaceOnly(endDateTime)) {
            try {
                stopInMs = parseDateString(endDateTime, timeZone);
            } catch (ParseException e) {
                throw new RuntimeException(
                        String.format(
                                "Incorrect datetime format: %s, pls use ISO-8601 "
                                        + "complete date plus hours, minutes and seconds format:%s.",
                                endDateTime, DATE_FORMAT),
                        e);
            }
            Preconditions.checkArgument(
                    stopInMs >= startTime, "Start time should be less than stop time.");
        }
        long partitionDiscoveryIntervalMs =
                configuration.getLong(RocketMQSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS);
        boolean useNewApi = configuration.getBoolean(RocketMQSourceOptions.OPTIONAL_USE_NEW_API);
        DescriptorProperties descriptorProperties = new DescriptorProperties();
        descriptorProperties.putProperties(rawProperties);
        TableSchema physicalSchema =
                TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
        descriptorProperties.putTableSchema("schema", physicalSchema);
        String consumerOffsetMode =
                configuration.getString(
                        RocketMQSourceOptions.OPTIONAL_STARTUP_SCAN_MODE,
                        RocketMQConfig.CONSUMER_OFFSET_LATEST);
        long consumerOffsetTimestamp =
                configuration.getLong(
                        RocketMQSourceOptions.OPTIONAL_STARTUP_OFFSET_TIMESTAMP,
                        System.currentTimeMillis());
        return new RocketMQScanTableSource(
                configuration.getLong(RocketMQSourceOptions.PULL_TIMEOUT_LONG_POLLING_SUSPEND),
                descriptorProperties,
                physicalSchema,
                topic,
                consumerGroup,
                nameServerAddress,
                accessKey,
                secretKey,
                tag,
                sql,
                stopInMs,
                startMessageOffset,
                startMessageOffset < 0 ? startTime : -1L,
                partitionDiscoveryIntervalMs,
                consumerOffsetMode,
                consumerOffsetTimestamp,
                useNewApi);
    }