in src/main/java/com/aliyun/odps/kafka/connect/MaxComputeSinkConnectorConfig.java [88:211]
/**
 * Builds the {@link ConfigDef} that declares every option this sink connector accepts.
 *
 * <p>Options are registered in a fixed order. The MaxCompute endpoint, project, table,
 * access id and access key are declared without a default value (Kafka's {@code ConfigDef}
 * treats such options as required); every other option carries the default shown below.
 *
 * @return a newly created {@code ConfigDef} populated with all connector options
 */
public static ConfigDef conf() {
  ConfigDef definition = new ConfigDef();

  // --- Writer sizing ---------------------------------------------------------------------
  // The pool size default is the processor count reported by the JVM when this method runs.
  definition.define(
      BaseParameter.POOL_SIZE.getName(),
      Type.INT,
      Runtime.getRuntime().availableProcessors(),
      Importance.MEDIUM,
      "MaxCompute sink pool size");
  definition.define(
      BaseParameter.RECORD_BATCH_SIZE.getName(),
      Type.INT,
      8000,
      Importance.MEDIUM,
      "max record size for single writer-thread");

  // --- MaxCompute target -----------------------------------------------------------------
  definition.define(
      BaseParameter.MAXCOMPUTE_ENDPOINT.getName(),
      Type.STRING,
      Importance.HIGH,
      "MaxCompute endpoint");
  definition.define(
      BaseParameter.MAXCOMPUTE_PROJECT.getName(),
      Type.STRING,
      Importance.HIGH,
      "MaxCompute project");
  definition.define(
      BaseParameter.MAXCOMPUTE_SCHEMA.getName(),
      Type.STRING,
      "",
      Importance.MEDIUM,
      "MaxCompute schema");
  definition.define(
      BaseParameter.MAXCOMPUTE_TABLE.getName(),
      Type.STRING,
      Importance.HIGH,
      "MaxCompute table");
  definition.define(
      BaseParameter.TUNNEL_ENDPOINT.getName(),
      Type.STRING,
      "",
      Importance.MEDIUM,
      "Tunnel endpoint");

  // --- Credentials and STS ---------------------------------------------------------------
  definition.define(
      BaseParameter.ACCESS_ID.getName(),
      Type.STRING,
      Importance.HIGH,
      "Aliyun access ID");
  definition.define(
      BaseParameter.ACCESS_KEY.getName(),
      Type.STRING,
      Importance.HIGH,
      "Aliyun access key");
  definition.define(
      BaseParameter.ACCOUNT_ID.getName(),
      Type.STRING,
      "",
      Importance.HIGH,
      "Account id for STS");
  definition.define(
      BaseParameter.REGION_ID.getName(),
      Type.STRING,
      "",
      Importance.HIGH,
      "Region id for STS");
  // The default value is taken from the DEFAULT_STS_ENDPOINT parameter's getName().
  definition.define(
      BaseParameter.STS_ENDPOINT.getName(),
      Type.STRING,
      BaseParameter.DEFAULT_STS_ENDPOINT.getName(),
      Importance.HIGH,
      "Sts endpoint");
  definition.define(
      BaseParameter.ROLE_NAME.getName(),
      Type.STRING,
      "",
      Importance.HIGH,
      "Role name for STS");
  definition.define(
      BaseParameter.ACCOUNT_TYPE.getName(),
      Type.STRING,
      Account.AccountProvider.ALIYUN.toString(),
      Importance.HIGH,
      "Account type: STS Authorization (STS) or Primary Aliyun Account (ALIYUN)");
  definition.define(
      BaseParameter.CLIENT_TIMEOUT_MS.getName(),
      Type.LONG,
      DEFAULT_CLIENT_TIME_OUT_MS,
      Importance.MEDIUM,
      "STS token time out");

  // --- Runtime error topic ---------------------------------------------------------------
  definition.define(
      BaseParameter.RUNTIME_ERROR_TOPIC_BOOTSTRAP_SERVERS.getName(),
      Type.STRING,
      "",
      Importance.MEDIUM,
      "Bootstrap servers");
  definition.define(
      BaseParameter.RUNTIME_ERROR_TOPIC_NAME.getName(),
      Type.STRING,
      "",
      Importance.MEDIUM,
      "Error topic name");

  // --- Record conversion and partitioning ------------------------------------------------
  definition.define(
      BaseParameter.FORMAT.getName(),
      Type.STRING,
      "TEXT",
      Importance.HIGH,
      "Input format, could be TEXT or CSV");
  definition.define(
      BaseParameter.MODE.getName(),
      Type.STRING,
      "DEFAULT",
      Importance.HIGH,
      "Mode, could be default, key, or value");
  definition.define(
      BaseParameter.PARTITION_WINDOW_TYPE.getName(),
      Type.STRING,
      "HOUR",
      Importance.HIGH,
      "Partition window type, could be DAY, HOUR");
  // The default is the ID of the JVM's default time zone at the time this method runs.
  definition.define(
      BaseParameter.TIME_ZONE.getName(),
      Type.STRING,
      TimeZone.getDefault().getID(),
      Importance.HIGH,
      "Timezone");

  // --- Tunnel, buffering and failure handling --------------------------------------------
  definition.define(
      BaseParameter.USE_STREAM_TUNNEL.getName(),
      Type.BOOLEAN,
      false,
      Importance.LOW,
      "use streaming tunnel instead of batch tunnel");
  definition.define(
      BaseParameter.BUFFER_SIZE_KB.getName(),
      Type.INT,
      64 * 1024,
      Importance.MEDIUM,
      "internal buffer size per odps partition in KB, default 64MB");
  definition.define(
      BaseParameter.FAIL_RETRY_TIMES.getName(),
      Type.INT,
      3,
      Importance.MEDIUM,
      "retry times on flush failure. default 3 times. if invalid value provided, will fallback to default value.");
  // NOTE(review): the description mentions SKIP or EXIT, but the option is declared as a
  // BOOLEAN defaulting to false - confirm the intended wording with the option's consumers.
  definition.define(
      BaseParameter.SKIP_ERROR.getName(),
      Type.BOOLEAN,
      false,
      Importance.LOW,
      "the task policy when internal errors happen, SKIP or EXIT");

  return definition;
}