in src/main/java/org/apache/flink/connector/rocketmq/sink/table/RocketMQDynamicTableSinkFactory.java [92:140]
/**
 * Creates a {@link RocketMQDynamicTableSink} configured from the options of the catalog table
 * carried by {@code context}.
 *
 * <p>The factory helper's validation is run first; the individual settings are then read from
 * the table's raw option map, and the physical schema is derived from the catalog table schema.
 *
 * @param context factory context giving access to the catalog table (options and schema)
 * @return a sink instance populated with the resolved connection, formatting and write options
 */
public DynamicTableSink createDynamicTableSink(Context context) {
    FactoryUtil.TableFactoryHelper helper = createTableFactoryHelper(this, context);
    helper.validate();

    Map<String, String> rawProperties = context.getCatalogTable().getOptions();
    Configuration properties = Configuration.fromMap(rawProperties);

    // Connection / addressing options.
    String topicName = properties.getString(TOPIC);
    String producerGroup = properties.getString(PRODUCER_GROUP);
    String nameServerAddress = properties.getString(ENDPOINTS);
    String tag = properties.getString(TAG);
    String accessKey = properties.getString(OPTIONAL_ACCESS_KEY);
    String secretKey = properties.getString(OPTIONAL_SECRET_KEY);

    // Serialization and write-behaviour options.
    String dynamicColumn = properties.getString(OPTIONAL_WRITE_DYNAMIC_TAG_COLUMN);
    String encoding = properties.getString(OPTIONAL_ENCODING);
    String fieldDelimiter = properties.getString(OPTIONAL_FIELD_DELIMITER);
    int retryTimes = properties.getInteger(OPTIONAL_WRITE_RETRY_TIMES);
    long sleepTimeMs = properties.getLong(OPTIONAL_WRITE_SLEEP_TIME_MS);
    boolean isDynamicTag = properties.getBoolean(OPTIONAL_WRITE_IS_DYNAMIC_TAG);
    boolean isDynamicTagIncluded =
            properties.getBoolean(OPTIONAL_WRITE_DYNAMIC_TAG_COLUMN_WRITE_INCLUDED);
    boolean writeKeysToBody = properties.getBoolean(OPTIONAL_WRITE_KEYS_TO_BODY);
    String[] keyColumns = parseKeyColumns(properties.getString(OPTIONAL_WRITE_KEY_COLUMNS));

    // The sink also receives the complete raw option map, wrapped as DescriptorProperties.
    DescriptorProperties descriptorProperties = new DescriptorProperties();
    descriptorProperties.putProperties(rawProperties);

    TableSchema physicalSchema =
            TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());

    return new RocketMQDynamicTableSink(
            descriptorProperties,
            physicalSchema,
            topicName,
            producerGroup,
            nameServerAddress,
            accessKey,
            secretKey,
            tag,
            dynamicColumn,
            fieldDelimiter,
            encoding,
            retryTimes,
            sleepTimeMs,
            isDynamicTag,
            isDynamicTagIncluded,
            writeKeysToBody,
            keyColumns);
}

/**
 * Splits a comma-separated key-column option into individual column names.
 *
 * <p>Each entry is trimmed and blank entries are discarded, so values such as
 * {@code "id, name"} or {@code "id,,name"} yield {@code ["id", "name"]} rather than names
 * carrying stray whitespace or empty strings.
 *
 * @param keyColumnsConfig raw option value; may be {@code null} or empty
 * @return the non-blank, trimmed column names in their original order; never {@code null}
 */
private static String[] parseKeyColumns(String keyColumnsConfig) {
    if (keyColumnsConfig == null || keyColumnsConfig.isEmpty()) {
        return new String[0];
    }
    String[] rawColumns = keyColumnsConfig.split(",");
    String[] columns = new String[rawColumns.length];
    int count = 0;
    for (String rawColumn : rawColumns) {
        String column = rawColumn.trim();
        if (!column.isEmpty()) {
            columns[count++] = column;
        }
    }
    if (count == columns.length) {
        return columns;
    }
    // Some entries were blank: shrink the array to the entries actually kept.
    String[] compacted = new String[count];
    System.arraycopy(columns, 0, compacted, 0, count);
    return compacted;
}