in flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSinkFactory.java [62:147]
/**
 * Translates the CDC pipeline configuration into the {@link StarRocksSinkOptions}
 * consumed by the underlying StarRocks sink connector.
 *
 * <p>Required connection options are copied unconditionally; optional tuning options
 * are forwarded only when explicitly set by the user; a few options are hard-wired
 * for the CDC scenario (wildcard table matching, new sink API, at-least-once
 * semantics, JSON stream-load format).
 *
 * @param cdcConfig configuration taken from the pipeline's sink definition
 * @return fully assembled connector options for the StarRocks sink
 */
private StarRocksSinkOptions buildSinkConnectorOptions(Configuration cdcConfig) {
    org.apache.flink.configuration.Configuration connectorConfig =
            new org.apache.flink.configuration.Configuration();

    // Required sink configurations — always present, copied directly.
    connectorConfig.set(
            StarRocksSinkOptions.JDBC_URL, cdcConfig.get(StarRocksDataSinkOptions.JDBC_URL));
    connectorConfig.set(
            StarRocksSinkOptions.LOAD_URL, cdcConfig.get(StarRocksDataSinkOptions.LOAD_URL));
    connectorConfig.set(
            StarRocksSinkOptions.USERNAME, cdcConfig.get(StarRocksDataSinkOptions.USERNAME));
    connectorConfig.set(
            StarRocksSinkOptions.PASSWORD, cdcConfig.get(StarRocksDataSinkOptions.PASSWORD));

    // Optional sink configurations — forwarded only when the user set them, so the
    // connector's own defaults apply otherwise.
    cdcConfig.getOptional(StarRocksDataSinkOptions.SINK_LABEL_PREFIX)
            .ifPresent(value -> connectorConfig.set(StarRocksSinkOptions.SINK_LABEL_PREFIX, value));
    cdcConfig.getOptional(StarRocksDataSinkOptions.SINK_CONNECT_TIMEOUT)
            .ifPresent(
                    value ->
                            connectorConfig.set(
                                    StarRocksSinkOptions.SINK_CONNECT_TIMEOUT, value));
    cdcConfig.getOptional(StarRocksDataSinkOptions.SINK_SOCKET_TIMEOUT)
            .ifPresent(
                    value ->
                            connectorConfig.set(
                                    StarRocksSinkOptions.SINK_SOCKET_TIMEOUT, value));
    cdcConfig.getOptional(StarRocksDataSinkOptions.SINK_WAIT_FOR_CONTINUE_TIMEOUT)
            .ifPresent(
                    value ->
                            connectorConfig.set(
                                    StarRocksSinkOptions.SINK_WAIT_FOR_CONTINUE_TIMEOUT, value));
    cdcConfig.getOptional(StarRocksDataSinkOptions.SINK_BATCH_MAX_SIZE)
            .ifPresent(
                    value ->
                            connectorConfig.set(
                                    StarRocksSinkOptions.SINK_BATCH_MAX_SIZE, value));
    cdcConfig.getOptional(StarRocksDataSinkOptions.SINK_BATCH_FLUSH_INTERVAL)
            .ifPresent(
                    value ->
                            connectorConfig.set(
                                    StarRocksSinkOptions.SINK_BATCH_FLUSH_INTERVAL, value));
    cdcConfig.getOptional(StarRocksDataSinkOptions.SINK_SCAN_FREQUENCY)
            .ifPresent(
                    value ->
                            connectorConfig.set(
                                    StarRocksSinkOptions.SINK_SCAN_FREQUENCY, value));
    cdcConfig.getOptional(StarRocksDataSinkOptions.SINK_IO_THREAD_COUNT)
            .ifPresent(
                    value ->
                            connectorConfig.set(
                                    StarRocksSinkOptions.SINK_IO_THREAD_COUNT, value));
    cdcConfig.getOptional(StarRocksDataSinkOptions.SINK_AT_LEAST_ONCE_USE_TRANSACTION_LOAD)
            .ifPresent(
                    value ->
                            connectorConfig.set(
                                    StarRocksSinkOptions.SINK_AT_LEAST_ONCE_USE_TRANSACTION_LOAD,
                                    value));
    cdcConfig.getOptional(StarRocksDataSinkOptions.SINK_METRIC_HISTOGRAM_WINDOW_SIZE)
            .ifPresent(
                    value ->
                            connectorConfig.set(
                                    StarRocksSinkOptions.SINK_METRIC_HISTOGRAM_WINDOW_SIZE,
                                    value));

    // CDC-scenario settings: the sink must accept any database/table routed to it,
    // and use the new sink API.
    connectorConfig.set(StarRocksSinkOptions.DATABASE_NAME, "*");
    connectorConfig.set(StarRocksSinkOptions.TABLE_NAME, "*");
    connectorConfig.set(StarRocksSinkOptions.SINK_USE_NEW_SINK_API, true);
    // Currently the CDC framework only supports at-least-once delivery.
    connectorConfig.set(StarRocksSinkOptions.SINK_SEMANTIC, "at-least-once");

    // Collect user-supplied stream load properties, then force JSON format so that
    // no "columns" reconfiguration is needed after a schema change. CSV format can
    // be supported in the future if needed.
    Map<String, String> streamLoadProps =
            getPrefixConfigs(cdcConfig.toMap(), SINK_PROPERTIES_PREFIX);
    streamLoadProps.put("sink.properties.format", "json");
    streamLoadProps.put("sink.properties.strip_outer_array", "true");
    streamLoadProps.put("sink.properties.ignore_json_size", "true");

    return new StarRocksSinkOptions(connectorConfig, streamLoadProps);
}