public DynamicTableSink createDynamicTableSink()

in src/main/java/org/apache/flink/connector/rocketmq/sink/table/RocketMQDynamicTableSinkFactory.java [92:140]


    public DynamicTableSink createDynamicTableSink(Context context) {
        // Creates the RocketMQ table sink for the catalog table carried by `context`.
        //
        // The table options are first validated through the table factory helper, then each
        // sink setting is read from the table's raw option map and handed to the
        // RocketMQDynamicTableSink constructor together with the table's physical schema.
        FactoryUtil.TableFactoryHelper helper = createTableFactoryHelper(this, context);
        helper.validate();

        // Wrap the raw option map so values can be read in typed form via the ConfigOption
        // constants declared for this connector.
        Map<String, String> rawProperties = context.getCatalogTable().getOptions();
        Configuration properties = Configuration.fromMap(rawProperties);

        // Connection and addressing settings.
        String topicName = properties.getString(TOPIC);
        String producerGroup = properties.getString(PRODUCER_GROUP);
        String nameServerAddress = properties.getString(ENDPOINTS);
        String tag = properties.getString(TAG);
        String accessKey = properties.getString(OPTIONAL_ACCESS_KEY);
        String secretKey = properties.getString(OPTIONAL_SECRET_KEY);

        // Serialization and write-behaviour settings.
        String dynamicColumn = properties.getString(OPTIONAL_WRITE_DYNAMIC_TAG_COLUMN);
        String encoding = properties.getString(OPTIONAL_ENCODING);
        String fieldDelimiter = properties.getString(OPTIONAL_FIELD_DELIMITER);
        int retryTimes = properties.getInteger(OPTIONAL_WRITE_RETRY_TIMES);
        long sleepTimeMs = properties.getLong(OPTIONAL_WRITE_SLEEP_TIME_MS);
        boolean isDynamicTag = properties.getBoolean(OPTIONAL_WRITE_IS_DYNAMIC_TAG);
        boolean isDynamicTagIncluded =
                properties.getBoolean(OPTIONAL_WRITE_DYNAMIC_TAG_COLUMN_WRITE_INCLUDED);
        boolean writeKeysToBody = properties.getBoolean(OPTIONAL_WRITE_KEYS_TO_BODY);

        // Key columns are configured as a comma-separated list. Surrounding whitespace is
        // stripped from every entry so that "id, name" yields {"id", "name"} instead of
        // {"id", " name"}; a missing or blank value results in an empty array.
        String keyColumnsConfig = properties.getString(OPTIONAL_WRITE_KEY_COLUMNS);
        String[] keyColumns = new String[0];
        if (keyColumnsConfig != null && !keyColumnsConfig.trim().isEmpty()) {
            keyColumns = keyColumnsConfig.trim().split(",");
            for (int i = 0; i < keyColumns.length; i++) {
                keyColumns[i] = keyColumns[i].trim();
            }
        }

        // The sink also receives the complete, unparsed option set as DescriptorProperties.
        DescriptorProperties descriptorProperties = new DescriptorProperties();
        descriptorProperties.putProperties(rawProperties);

        // Only the physical schema is passed on, as extracted by TableSchemaUtils.
        TableSchema physicalSchema =
                TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());

        return new RocketMQDynamicTableSink(
                descriptorProperties,
                physicalSchema,
                topicName,
                producerGroup,
                nameServerAddress,
                accessKey,
                secretKey,
                tag,
                dynamicColumn,
                fieldDelimiter,
                encoding,
                retryTimes,
                sleepTimeMs,
                isDynamicTag,
                isDynamicTagIncluded,
                writeKeysToBody,
                keyColumns);
    }