in src/main/java/com/aliyun/odps/kafka/connect/MaxComputeSinkTask.java [183:259]
public void start(Map<String, String> map) {
LOGGER.info("Thread(" + Thread.currentThread().getId() + ") Enter START");
startTimestamp = System.currentTimeMillis();
config = new MaxComputeSinkConnectorConfig(map);
accountType =
config.getString(MaxComputeSinkConnectorConfig.BaseParameter.ACCOUNT_TYPE.getName());
timeout =
config.getLong(MaxComputeSinkConnectorConfig.BaseParameter.CLIENT_TIMEOUT_MS.getName());
bufferSizeKB =
config.getInt(MaxComputeSinkConnectorConfig.BaseParameter.BUFFER_SIZE_KB.getName());
retryTimes =
config.getInt(MaxComputeSinkConnectorConfig.BaseParameter.FAIL_RETRY_TIMES.getName());
String
endpoint =
config.getString(MaxComputeSinkConnectorConfig.BaseParameter.MAXCOMPUTE_ENDPOINT.getName());
project =
config.getString(MaxComputeSinkConnectorConfig.BaseParameter.MAXCOMPUTE_PROJECT.getName());
table =
config.getString(MaxComputeSinkConnectorConfig.BaseParameter.MAXCOMPUTE_TABLE.getName());
tunnelEndpoint =
config.getString(MaxComputeSinkConnectorConfig.BaseParameter.TUNNEL_ENDPOINT.getName());
batchSize =
config.getInt(MaxComputeSinkConnectorConfig.BaseParameter.RECORD_BATCH_SIZE.getName());
Integer
poolSize =
config.getInt(MaxComputeSinkConnectorConfig.BaseParameter.POOL_SIZE.getName());
skipErrorRecords =
config.getBoolean(MaxComputeSinkConnectorConfig.BaseParameter.SKIP_ERROR.getName());
if (poolSize > 1) {
executor = Executors.newFixedThreadPool(poolSize); // multi-thread to run record sink to MC
multiWriteMode = true; // use new mode;
}
// Init odps
odps = OdpsUtils.getOdps(config);
odpsCreateLastTime = System.currentTimeMillis();
odps.setEndpoint(endpoint);
// Init converter builder
RecordConverterBuilder.Format format = RecordConverterBuilder.Format.valueOf(
config.getString(MaxComputeSinkConnectorConfig.BaseParameter.FORMAT.getName()));
RecordConverterBuilder.Mode mode = RecordConverterBuilder.Mode.valueOf(
config.getString(MaxComputeSinkConnectorConfig.BaseParameter.MODE.getName()));
converterBuilder = new RecordConverterBuilder();
converterBuilder.format(format).mode(mode);
converterBuilder.schema(odps.tables().get(table).getSchema());
// Parse partition window size
partitionWindowType = PartitionWindowType.valueOf(
config.getString(
MaxComputeSinkConnectorConfig.BaseParameter.PARTITION_WINDOW_TYPE.getName()));
// Parse time zone
tz =
TimeZone.getTimeZone(
config.getString(MaxComputeSinkConnectorConfig.BaseParameter.TIME_ZONE.getName()));
useStreamTunnel =
config.getBoolean(MaxComputeSinkConnectorConfig.BaseParameter.USE_STREAM_TUNNEL.getName());
if (useStreamTunnel) {
LOGGER.info("MAXCOMPUTE STREAMING TUNNEL ENABLED.");
}
if (!StringUtils.isNullOrEmpty(
config.getString(
MaxComputeSinkConnectorConfig.BaseParameter.RUNTIME_ERROR_TOPIC_NAME.getName()))
&& !StringUtils.isNullOrEmpty(
config.getString(
MaxComputeSinkConnectorConfig.BaseParameter.RUNTIME_ERROR_TOPIC_BOOTSTRAP_SERVERS.getName()))) {
runtimeErrorWriter = new KafkaWriter(config);
LOGGER.info(
"Thread(" + Thread.currentThread().getId() + ") new runtime error kafka writer done");
}
LOGGER.info("Thread(" + Thread.currentThread().getId() + ") Start MaxCompute sink task done");
}