public static OTSStreamReaderConfig load()

in otsstreamreader/src/main/java/com/alibaba/datax/plugin/reader/otsstreamreader/internal/config/OTSStreamReaderConfig.java [240:352]


    /**
     * Builds an {@link OTSStreamReaderConfig} from the given job configuration.
     *
     * <p>The export time range must be supplied in exactly one of three ways:
     * <ul>
     *   <li>{@code KEY_DATE}: the range starts at the parsed date and spans
     *       {@code TimeUtils.DAY_IN_MILLIS};</li>
     *   <li>{@code KEY_START_TIMESTAMP_MILLIS} together with {@code KEY_END_TIMESTAMP_MILLIS};</li>
     *   <li>{@code KEY_START_TIME_STRING} together with {@code KEY_END_TIME_STRING}.</li>
     * </ul>
     *
     * <p>When {@code KEY_MODE} is absent the config uses {@code Mode.MULTI_VERSION}; column and
     * timeseries settings are then ignored with a warning. The only explicitly accepted mode
     * value is {@code Mode.SINGLE_VERSION_AND_UPDATE_ONLY} (compared case-insensitively).
     *
     * @param param the job configuration to read from
     * @return the populated reader configuration
     * @throws OTSStreamReaderException if the time range is missing, specified in more than one
     *         way, cannot be parsed, or its start is not strictly before its end; or if the
     *         configured mode is not supported
     */
    public static OTSStreamReaderConfig load(Configuration param) {
        OTSStreamReaderConfig config = new OTSStreamReaderConfig();

        config.setEndpoint(ParamChecker.checkStringAndGet(param, KEY_OTS_ENDPOINT, true));
        config.setAccessId(ParamChecker.checkStringAndGet(param, KEY_OTS_ACCESSID, true));
        config.setAccessKey(ParamChecker.checkStringAndGet(param, KEY_OTS_ACCESSKEY, true));
        config.setInstanceName(ParamChecker.checkStringAndGet(param, KEY_OTS_INSTANCE_NAME, true));
        config.setDataTable(ParamChecker.checkStringAndGet(param, KEY_DATA_TABLE_NAME, true));
        config.setStatusTable(ParamChecker.checkStringAndGet(param, KEY_STATUS_TABLE_NAME, true));
        config.setIsExportSequenceInfo(param.getBool(KEY_IS_EXPORT_SEQUENCE_INFO, false));
        config.setEnableSeekIteratorByTimestamp(param.getBool(ENABLE_SEEK_SHARD_ITERATOR, false));
        config.setConfSimplifyEnable(param.getBool(CONF_SIMPLIFY_ENABLE, DEFAULT_CONF_SIMPLIFY_ENABLE_VALUE));
        config.setEnableTableGroupSupport(param.getBool(KEY_ENABLE_TABLE_GROUP_SUPPORT, false));

        // Thread number is optional; the config keeps its own default when the key is absent.
        Integer threadNum = param.getInt(KEY_THREAD_NUM);
        if (threadNum != null) {
            config.setThreadNum(threadNum);
        }

        // Presence of the time-string keys is probed with getString (not getLong): they are
        // string-typed settings, and the branch below selects on getString as well, so the
        // validation and the branch selection always agree.
        if (param.getString(KEY_DATE) == null &&
                (param.getLong(KEY_START_TIMESTAMP_MILLIS) == null || param.getLong(KEY_END_TIMESTAMP_MILLIS) == null) &&
                (param.getString(KEY_START_TIME_STRING) == null || param.getString(KEY_END_TIME_STRING) == null)) {
            throw new OTSStreamReaderException("Must set date or time range millis or time range string, please check your config.");
        }

        if (param.get(KEY_DATE) != null &&
                (param.getLong(KEY_START_TIMESTAMP_MILLIS) != null || param.getLong(KEY_END_TIMESTAMP_MILLIS) != null) &&
                (param.getString(KEY_START_TIME_STRING) != null || param.getString(KEY_END_TIME_STRING) != null)) {
            throw new OTSStreamReaderException("Can't set date and time range millis and time range string, please check your config.");
        }

        if (param.get(KEY_DATE) != null &&
                (param.getLong(KEY_START_TIMESTAMP_MILLIS) != null || param.getLong(KEY_END_TIMESTAMP_MILLIS) != null)) {
            throw new OTSStreamReaderException("Can't set date and time range both, please check your config.");
        }

        if (param.get(KEY_DATE) != null &&
                (param.getString(KEY_START_TIME_STRING) != null || param.getString(KEY_END_TIME_STRING) != null)) {
            throw new OTSStreamReaderException("Can't set date and time range string both, please check your config.");
        }

        if ((param.getLong(KEY_START_TIMESTAMP_MILLIS) != null || param.getLong(KEY_END_TIMESTAMP_MILLIS) != null) &&
                (param.getString(KEY_START_TIME_STRING) != null || param.getString(KEY_END_TIME_STRING) != null)) {
            throw new OTSStreamReaderException("Can't set time range millis and time range string both, expect timestamp like '1516010400000'.");
        }

        if (param.getString(KEY_START_TIME_STRING) != null &&
                param.getString(KEY_END_TIME_STRING) != null) {
            // Time range given as a pair of time strings.
            String startTime = ParamChecker.checkStringAndGet(param, KEY_START_TIME_STRING, true);
            String endTime = ParamChecker.checkStringAndGet(param, KEY_END_TIME_STRING, true);
            // NOTE(review): the caught exception is dropped in the handlers below; chain it as the
            // cause if OTSStreamReaderException offers a (String, Throwable) constructor.
            try {
                long startTimestampMillis = TimeUtils.parseTimeStringToTimestampMillis(startTime);
                config.setStartTimestampMillis(startTimestampMillis);
            } catch (Exception ex) {
                throw new OTSStreamReaderException("Can't parse startTimeString: " + startTime + ", expect format date like '201801151612'.");
            }
            try {
                long endTimestampMillis = TimeUtils.parseTimeStringToTimestampMillis(endTime);
                config.setEndTimestampMillis(endTimestampMillis);
            } catch (Exception ex) {
                throw new OTSStreamReaderException("Can't parse endTimeString: " + endTime + ", expect format date like '201801151612'.");
            }

        } else if (param.getString(KEY_DATE) == null) {
            // No date and no complete time-string pair: the first validation above guarantees
            // that both millis values are present here, so these reads cannot yield null.
            config.setStartTimestampMillis(param.getLong(KEY_START_TIMESTAMP_MILLIS));
            config.setEndTimestampMillis(param.getLong(KEY_END_TIMESTAMP_MILLIS));
        } else {
            // Time range given as a date: starts at the parsed date and spans DAY_IN_MILLIS.
            String date = ParamChecker.checkStringAndGet(param, KEY_DATE, true);
            try {
                long startTimestampMillis = TimeUtils.parseDateToTimestampMillis(date);
                config.setStartTimestampMillis(startTimestampMillis);
                config.setEndTimestampMillis(startTimestampMillis + TimeUtils.DAY_IN_MILLIS);
            } catch (ParseException ex) {
                throw new OTSStreamReaderException("Can't parse date: " + date);
            }
        }


        if (config.getStartTimestampMillis() >= config.getEndTimestampMillis()) {
            throw new OTSStreamReaderException("EndTimestamp must be larger than startTimestamp.");
        }

        config.setMaxRetries(param.getInt(KEY_MAX_RETRIES, DEFAULT_MAX_RETRIES));

        String mode = param.getString(KEY_MODE);
        if (mode != null) {
            if (mode.equalsIgnoreCase(Mode.SINGLE_VERSION_AND_UPDATE_ONLY.name())) {
                config.setMode(Mode.SINGLE_VERSION_AND_UPDATE_ONLY);
                parseConfigForSingleVersionAndUpdateOnlyMode(config, param);
            } else {
                throw new OTSStreamReaderException("Unsupported Mode: " + mode + ", please check your config.");
            }
        } else {
            // No mode configured: use multi version mode, where column and timeseries
            // settings are not applied and only trigger a warning.
            config.setMode(Mode.MULTI_VERSION);
            List<Object> values = param.getList(KEY_COLUMN);
            if (values != null) {
                LOG.warn("The multi version mode doesn't support setting columns, column config will ignore.");
            }
            Boolean isTimeseriesTable = param.getBool(IS_TIMESERIES_TABLE);
            if (isTimeseriesTable != null) {
                LOG.warn("The multi version mode doesn't support setting Timeseries stream, stream config will ignore.");
            }
        }

        // The access key secret is deliberately masked so the credential never reaches the log.
        LOG.info("endpoint: {}, accessKeyId: {}, accessKeySecret: {}, instanceName: {}, dataTableName: {}, statusTableName: {}," +
                        " isExportSequenceInfo: {}, startTimestampMillis: {}, endTimestampMillis:{}, maxRetries:{}, enableSeekIteratorByTimestamp: {}, "
                        + "confSimplifyEnable: {}, isTimeseriesTable: {}.", config.getEndpoint(),
                config.getAccessId(), "******", config.getInstanceName(), config.getDataTable(),
                config.getStatusTable(), config.isExportSequenceInfo(), config.getStartTimestampMillis(),
                config.getEndTimestampMillis(), config.getMaxRetries(), config.getEnableSeekIteratorByTimestamp(),
                config.isConfSimplifyEnable(), config.isTimeseriesTable());

        return config;
    }