in otsstreamreader/src/main/java/com/alibaba/datax/plugin/reader/otsstreamreader/internal/config/OTSStreamReaderConfig.java [240:352]
public static OTSStreamReaderConfig load(Configuration param) {
OTSStreamReaderConfig config = new OTSStreamReaderConfig();
config.setEndpoint(ParamChecker.checkStringAndGet(param, KEY_OTS_ENDPOINT, true));
config.setAccessId(ParamChecker.checkStringAndGet(param, KEY_OTS_ACCESSID, true));
config.setAccessKey(ParamChecker.checkStringAndGet(param, KEY_OTS_ACCESSKEY, true));
config.setInstanceName(ParamChecker.checkStringAndGet(param, KEY_OTS_INSTANCE_NAME, true));
config.setDataTable(ParamChecker.checkStringAndGet(param, KEY_DATA_TABLE_NAME, true));
config.setStatusTable(ParamChecker.checkStringAndGet(param, KEY_STATUS_TABLE_NAME, true));
config.setIsExportSequenceInfo(param.getBool(KEY_IS_EXPORT_SEQUENCE_INFO, false));
config.setEnableSeekIteratorByTimestamp(param.getBool(ENABLE_SEEK_SHARD_ITERATOR, false));
config.setConfSimplifyEnable(param.getBool(CONF_SIMPLIFY_ENABLE, DEFAULT_CONF_SIMPLIFY_ENABLE_VALUE));
config.setEnableTableGroupSupport(param.getBool(KEY_ENABLE_TABLE_GROUP_SUPPORT, false));
if (param.getInt(KEY_THREAD_NUM) != null) {
config.setThreadNum(param.getInt(KEY_THREAD_NUM));
}
if (param.getString(KEY_DATE) == null &&
(param.getLong(KEY_START_TIMESTAMP_MILLIS) == null || param.getLong(KEY_END_TIMESTAMP_MILLIS) == null) &&
(param.getLong(KEY_START_TIME_STRING) == null || param.getLong(KEY_END_TIME_STRING) == null)) {
throw new OTSStreamReaderException("Must set date or time range millis or time range string, please check your config.");
}
if (param.get(KEY_DATE) != null &&
(param.getLong(KEY_START_TIMESTAMP_MILLIS) != null || param.getLong(KEY_END_TIMESTAMP_MILLIS) != null) &&
(param.getLong(KEY_START_TIME_STRING) != null || param.getLong(KEY_END_TIME_STRING) != null)) {
throw new OTSStreamReaderException("Can't set date and time range millis and time range string, please check your config.");
}
if (param.get(KEY_DATE) != null &&
(param.getLong(KEY_START_TIMESTAMP_MILLIS) != null || param.getLong(KEY_END_TIMESTAMP_MILLIS) != null)) {
throw new OTSStreamReaderException("Can't set date and time range both, please check your config.");
}
if (param.get(KEY_DATE) != null &&
(param.getLong(KEY_START_TIME_STRING) != null || param.getLong(KEY_END_TIME_STRING) != null)) {
throw new OTSStreamReaderException("Can't set date and time range string both, please check your config.");
}
if ((param.getLong(KEY_START_TIMESTAMP_MILLIS) != null || param.getLong(KEY_END_TIMESTAMP_MILLIS) != null) &&
(param.getLong(KEY_START_TIME_STRING) != null || param.getLong(KEY_END_TIME_STRING) != null)) {
throw new OTSStreamReaderException("Can't set time range millis and time range string both, expect timestamp like '1516010400000'.");
}
if (param.getString(KEY_START_TIME_STRING) != null &&
param.getString(KEY_END_TIME_STRING) != null) {
String startTime = ParamChecker.checkStringAndGet(param, KEY_START_TIME_STRING, true);
String endTime = ParamChecker.checkStringAndGet(param, KEY_END_TIME_STRING, true);
try {
long startTimestampMillis = TimeUtils.parseTimeStringToTimestampMillis(startTime);
config.setStartTimestampMillis(startTimestampMillis);
} catch (Exception ex) {
throw new OTSStreamReaderException("Can't parse startTimeString: " + startTime + ", expect format date like '201801151612'.");
}
try {
long endTimestampMillis = TimeUtils.parseTimeStringToTimestampMillis(endTime);
config.setEndTimestampMillis(endTimestampMillis);
} catch (Exception ex) {
throw new OTSStreamReaderException("Can't parse endTimeString: " + endTime + ", expect format date like '201801151612'.");
}
} else if (param.getString(KEY_DATE) == null) {
config.setStartTimestampMillis(param.getLong(KEY_START_TIMESTAMP_MILLIS));
config.setEndTimestampMillis(param.getLong(KEY_END_TIMESTAMP_MILLIS));
} else {
String date = ParamChecker.checkStringAndGet(param, KEY_DATE, true);
try {
long startTimestampMillis = TimeUtils.parseDateToTimestampMillis(date);
config.setStartTimestampMillis(startTimestampMillis);
config.setEndTimestampMillis(startTimestampMillis + TimeUtils.DAY_IN_MILLIS);
} catch (ParseException ex) {
throw new OTSStreamReaderException("Can't parse date: " + date);
}
}
if (config.getStartTimestampMillis() >= config.getEndTimestampMillis()) {
throw new OTSStreamReaderException("EndTimestamp must be larger than startTimestamp.");
}
config.setMaxRetries(param.getInt(KEY_MAX_RETRIES, DEFAULT_MAX_RETRIES));
String mode = param.getString(KEY_MODE);
if (mode != null) {
if (mode.equalsIgnoreCase(Mode.SINGLE_VERSION_AND_UPDATE_ONLY.name())) {
config.setMode(Mode.SINGLE_VERSION_AND_UPDATE_ONLY);
parseConfigForSingleVersionAndUpdateOnlyMode(config, param);
} else {
throw new OTSStreamReaderException("Unsupported Mode: " + mode + ", please check your config.");
}
} else {
config.setMode(Mode.MULTI_VERSION);
List<Object> values = param.getList(KEY_COLUMN);
if (values != null) {
LOG.warn("The multi version mode doesn't support setting columns, column config will ignore.");
}
Boolean isTimeseriesTable = param.getBool(IS_TIMESERIES_TABLE);
if (isTimeseriesTable != null) {
LOG.warn("The multi version mode doesn't support setting Timeseries stream, stream config will ignore.");
}
}
LOG.info("endpoint: {}, accessKeyId: {}, accessKeySecret: {}, instanceName: {}, dataTableName: {}, statusTableName: {}," +
" isExportSequenceInfo: {}, startTimestampMillis: {}, endTimestampMillis:{}, maxRetries:{}, enableSeekIteratorByTimestamp: {}, "
+ "confSimplifyEnable: {}, isTimeseriesTable: {}.", config.getEndpoint(),
config.getAccessId(), config.getAccessKey(), config.getInstanceName(), config.getDataTable(),
config.getStatusTable(), config.isExportSequenceInfo(), config.getStartTimestampMillis(),
config.getEndTimestampMillis(), config.getMaxRetries(), config.getEnableSeekIteratorByTimestamp(),
config.isConfSimplifyEnable(), config.isTimeseriesTable());
return config;
}