in src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java [74:148]
public DorisOptions(Map<String, String> config) {
this.name = config.get(DorisSinkConnectorConfig.NAME);
this.urls = config.get(DorisSinkConnectorConfig.DORIS_URLS);
this.queryPort = Integer.parseInt(config.get(DorisSinkConnectorConfig.DORIS_QUERY_PORT));
this.httpPort = Integer.parseInt(config.get(DorisSinkConnectorConfig.DORIS_HTTP_PORT));
this.user = config.get(DorisSinkConnectorConfig.DORIS_USER);
this.password = config.get(DorisSinkConnectorConfig.DORIS_PASSWORD);
this.database = config.get(DorisSinkConnectorConfig.DORIS_DATABASE);
this.taskId = Integer.parseInt(config.get(ConfigCheckUtils.TASK_ID));
this.databaseTimeZone = config.get(DorisSinkConnectorConfig.DATABASE_TIME_ZONE);
this.loadModel = LoadModel.of(config.get(DorisSinkConnectorConfig.LOAD_MODEL));
this.deliveryGuarantee =
DeliveryGuarantee.of(config.get(DorisSinkConnectorConfig.DELIVERY_GUARANTEE));
this.converterMode = ConverterMode.of(config.get(DorisSinkConnectorConfig.CONVERTER_MODE));
this.schemaEvolutionMode =
SchemaEvolutionMode.of(
config.get(DorisSinkConnectorConfig.DEBEZIUM_SCHEMA_EVOLUTION));
this.fileSize = Integer.parseInt(config.get(DorisSinkConnectorConfig.BUFFER_SIZE_BYTES));
this.recordNum =
Integer.parseInt(config.get(DorisSinkConnectorConfig.BUFFER_COUNT_RECORDS));
this.flushTime = Long.parseLong(config.get(DorisSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC));
this.topicMap =
ConfigCheckUtils.parseTopicToTableMap(
config.get(DorisSinkConnectorConfig.TOPICS_TABLES_MAP));
this.tableNameField = config.get(DorisSinkConnectorConfig.RECORD_TABLE_NAME_FIELD);
if (config.containsKey(DorisSinkConnectorConfig.ENABLE_2PC)) {
if (Boolean.parseBoolean(config.get(DorisSinkConnectorConfig.ENABLE_2PC))) {
this.enable2PC = true;
this.force2PC = true;
} else {
this.enable2PC = false;
}
}
this.enableCustomJMX = Boolean.parseBoolean(config.get(DorisSinkConnectorConfig.JMX_OPT));
this.enableDelete =
Boolean.parseBoolean(config.get(DorisSinkConnectorConfig.ENABLE_DELETE));
this.requestConnectTimeoutMs =
DorisSinkConnectorConfig.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT;
this.requestReadTimeoutMs = DorisSinkConnectorConfig.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT;
this.labelPrefix = config.get(DorisSinkConnectorConfig.NAME);
if (config.containsKey(DorisSinkConnectorConfig.LABEL_PREFIX)) {
this.labelPrefix = config.get(DorisSinkConnectorConfig.LABEL_PREFIX);
}
if (config.containsKey(DorisSinkConnectorConfig.AUTO_REDIRECT)) {
this.autoRedirect =
Boolean.parseBoolean(config.get(DorisSinkConnectorConfig.AUTO_REDIRECT));
}
if (config.containsKey(DorisSinkConnectorConfig.REQUEST_CONNECT_TIMEOUT_MS)) {
this.requestConnectTimeoutMs =
Integer.parseInt(
config.get(DorisSinkConnectorConfig.REQUEST_CONNECT_TIMEOUT_MS));
}
if (config.containsKey(DorisSinkConnectorConfig.REQUEST_READ_TIMEOUT_MS)) {
this.requestReadTimeoutMs =
Integer.parseInt(config.get(DorisSinkConnectorConfig.REQUEST_READ_TIMEOUT_MS));
}
this.streamLoadProp = getStreamLoadPropFromConfig(config);
this.enableGroupCommit = ConfigCheckUtils.validateGroupCommitMode(this);
this.maxRetries =
Integer.parseInt(
config.getOrDefault(
DorisSinkConnectorConfig.MAX_RETRIES,
String.valueOf(DorisSinkConnectorConfig.MAX_RETRIES_DEFAULT)));
this.retryIntervalMs =
Integer.parseInt(
config.getOrDefault(
DorisSinkConnectorConfig.RETRY_INTERVAL_MS,
String.valueOf(
DorisSinkConnectorConfig.RETRY_INTERVAL_MS_DEFAULT)));
this.behaviorOnNullValues =
BehaviorOnNullValues.of(
config.get(DorisSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES));
}