in src/main/java/org/apache/flink/connector/rocketmq/source/table/RocketMQDynamicTableSourceFactory.java [87:182]
public DynamicTableSource createDynamicTableSource(Context context) {
FactoryUtil.TableFactoryHelper helper = createTableFactoryHelper(this, context);
helper.validate();
Map<String, String> rawProperties = context.getCatalogTable().getOptions();
Configuration configuration = Configuration.fromMap(rawProperties);
String topic = configuration.getString(RocketMQSourceOptions.TOPIC);
String consumerGroup = configuration.getString(RocketMQSourceOptions.CONSUMER_GROUP);
String nameServerAddress = configuration.getString(RocketMQSourceOptions.ENDPOINTS);
String tag = configuration.getString(RocketMQSourceOptions.OPTIONAL_TAG);
String sql = configuration.getString(RocketMQSourceOptions.OPTIONAL_SQL);
if (configuration.contains(RocketMQSourceOptions.OPTIONAL_STARTUP_SCAN_MODE)
&& (configuration.contains(RocketMQSourceOptions.OPTIONAL_STARTUP_OFFSET_TIMESTAMP)
|| configuration.contains(
RocketMQSourceOptions.OPTIONAL_STARTUP_OFFSET_DATE))) {
throw new IllegalArgumentException(
String.format(
"cannot support these configs when %s has been set: [%s] !",
RocketMQSourceOptions.OPTIONAL_STARTUP_SCAN_MODE.key(),
RocketMQSourceOptions.OPTIONAL_STARTUP_OFFSET_SPECIFIC.key()));
}
long startMessageOffset =
configuration.getLong(RocketMQSourceOptions.OPTIONAL_STARTUP_OFFSET_SPECIFIC);
long startTimeMs =
configuration.getLong(RocketMQSourceOptions.OPTIONAL_STARTUP_OFFSET_TIMESTAMP);
String startDateTime =
configuration.getString(RocketMQSourceOptions.OPTIONAL_STARTUP_OFFSET_DATE);
String timeZone = configuration.getString(RocketMQSourceOptions.OPTIONAL_TIME_ZONE);
String accessKey = configuration.getString(RocketMQSourceOptions.OPTIONAL_ACCESS_KEY);
String secretKey = configuration.getString(RocketMQSourceOptions.OPTIONAL_SECRET_KEY);
long startTime = startTimeMs;
if (startTime == -1) {
if (!StringUtils.isNullOrWhitespaceOnly(startDateTime)) {
try {
startTime = parseDateString(startDateTime, timeZone);
} catch (ParseException e) {
throw new RuntimeException(
String.format(
"Incorrect datetime format: %s, pls use ISO-8601 "
+ "complete date plus hours, minutes and seconds format:%s.",
startDateTime, DATE_FORMAT),
e);
}
}
}
long stopInMs = Long.MAX_VALUE;
String endDateTime =
configuration.getString(RocketMQSourceOptions.OPTIONAL_STOP_OFFSET_TIMESTAMP);
if (!StringUtils.isNullOrWhitespaceOnly(endDateTime)) {
try {
stopInMs = parseDateString(endDateTime, timeZone);
} catch (ParseException e) {
throw new RuntimeException(
String.format(
"Incorrect datetime format: %s, pls use ISO-8601 "
+ "complete date plus hours, minutes and seconds format:%s.",
endDateTime, DATE_FORMAT),
e);
}
Preconditions.checkArgument(
stopInMs >= startTime, "Start time should be less than stop time.");
}
long partitionDiscoveryIntervalMs =
configuration.getLong(RocketMQSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS);
boolean useNewApi = configuration.getBoolean(RocketMQSourceOptions.OPTIONAL_USE_NEW_API);
DescriptorProperties descriptorProperties = new DescriptorProperties();
descriptorProperties.putProperties(rawProperties);
TableSchema physicalSchema =
TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
descriptorProperties.putTableSchema("schema", physicalSchema);
String consumerOffsetMode =
configuration.getString(
RocketMQSourceOptions.OPTIONAL_STARTUP_SCAN_MODE,
RocketMQConfig.CONSUMER_OFFSET_LATEST);
long consumerOffsetTimestamp =
configuration.getLong(
RocketMQSourceOptions.OPTIONAL_STARTUP_OFFSET_TIMESTAMP,
System.currentTimeMillis());
return new RocketMQScanTableSource(
configuration.getLong(RocketMQSourceOptions.PULL_TIMEOUT_LONG_POLLING_SUSPEND),
descriptorProperties,
physicalSchema,
topic,
consumerGroup,
nameServerAddress,
accessKey,
secretKey,
tag,
sql,
stopInMs,
startMessageOffset,
startMessageOffset < 0 ? startTime : -1L,
partitionDiscoveryIntervalMs,
consumerOffsetMode,
consumerOffsetTimestamp,
useNewApi);
}