public List split()

in tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/TSDBReader.java [163:278]


        /**
         * Splits the configured time range into consecutive slices and produces the
         * task configurations for them.
         *
         * <p>The range between {@code Key.BEGIN_DATE_TIME} and {@code Key.END_DATE_TIME}
         * is cut into slices of {@code Key.INTERVAL_DATE_TIME} milliseconds. For every
         * slice:
         * <ul>
         *   <li>when {@code Key.COMBINE} is true, a single configuration carrying the
         *       full column list (and, for a non-TSDB sink, the full metric list) is
         *       emitted;</li>
         *   <li>otherwise one configuration is emitted per column (TSDB sink) or per
         *       metric (any other sink), all of them covering the same slice.</li>
         * </ul>
         * Each emitted configuration is a clone of the original one whose begin/end keys
         * are overwritten with epoch milliseconds; the end value is inclusive
         * ({@code nextSliceBegin - 1}).
         *
         * @param adviceNumber suggested number of tasks; not used by this implementation,
         *                     the number of tasks is driven by the time range only
         * @return the task configurations, ordered by slice and then by column/metric;
         *         empty when the (adjusted) begin time is not before the end time
         * @throws DataXException with {@code TSDBReaderErrorCode.ILLEGAL_VALUE} when the
         *                        begin or end time cannot be parsed, or when the split
         *                        interval is not positive
         */
        public List<Configuration> split(int adviceNumber) {
            List<Configuration> configurations = new ArrayList<>();

            // The sink type decides what gets fanned out in non-combine mode:
            // columns for a TSDB sink, metrics for every other sink.
            String type = originalConfig.getString(Key.SINK_DB_TYPE, Key.TYPE_DEFAULT_VALUE);
            final boolean tsdbSink = "TSDB".equals(type);
            List<String> columns = originalConfig.getList(Key.COLUMN, String.class);
            // Metrics are only read (and only written back) for a non-TSDB sink.
            List<String> metrics = tsdbSink ? null : originalConfig.getList(Key.METRIC, String.class);

            // get time interval
            Integer splitIntervalMs = originalConfig.getInt(Key.INTERVAL_DATE_TIME,
                    Key.INTERVAL_DATE_TIME_DEFAULT_VALUE);
            // Defensive: a non-positive step would never move the cursor past the end
            // time, so the slicing loop below would never terminate.
            if (splitIntervalMs == null || splitIntervalMs <= 0) {
                throw DataXException.asDataXException(
                        TSDBReaderErrorCode.ILLEGAL_VALUE,
                        "The parameter [" + Key.INTERVAL_DATE_TIME + "] must be greater than zero.");
            }

            // get time range
            SimpleDateFormat format = new SimpleDateFormat(Constant.DEFAULT_DATA_FORMAT);
            long startTime = parseConfiguredTimeMillis(format, Key.BEGIN_DATE_TIME);
            long endTime = parseConfiguredTimeMillis(format, Key.END_DATE_TIME);
            // NOTE(review): getTimeInHour presumably aligns the timestamp to an hour
            // boundary - confirm against TimeUtils.
            DateTime startDateTime = new DateTime(TimeUtils.getTimeInHour(startTime));
            DateTime endDateTime = new DateTime(TimeUtils.getTimeInHour(endTime));

            final Boolean isCombine = originalConfig.getBool(Key.COMBINE, Key.COMBINE_DEFAULT_VALUE);

            // split by time
            while (startDateTime.isBefore(endDateTime)) {
                // Advance the cursor exactly once per slice, BEFORE fanning out, so that
                // every column/metric of this slice shares the same time window.
                long sliceBegin = startDateTime.getMillis();
                startDateTime = startDateTime.plusMillis(splitIntervalMs);
                // Make sure the time interval is [start, end).
                long sliceEnd = startDateTime.getMillis() - 1;

                if (isCombine) {
                    // one task per slice carrying everything
                    configurations.add(newSliceConfiguration(columns, metrics, sliceBegin, sliceEnd));
                } else if (tsdbSink) {
                    // split by column
                    for (String column : columns) {
                        configurations.add(newSliceConfiguration(
                                Collections.singletonList(column), null, sliceBegin, sliceEnd));
                    }
                } else {
                    // split by metric
                    for (String metric : metrics) {
                        configurations.add(newSliceConfiguration(
                                columns, Collections.singletonList(metric), sliceBegin, sliceEnd));
                    }
                }
            }
            return configurations;
        }

        /**
         * Reads the date-time string stored under {@code key}, parses it with
         * {@code format} and returns it as epoch milliseconds; a value that
         * {@code TimeUtils.isSecond} reports as seconds is multiplied by 1000.
         */
        private long parseConfiguredTimeMillis(SimpleDateFormat format, String key) {
            long time;
            try {
                time = format.parse(originalConfig.getString(key)).getTime();
            } catch (ParseException e) {
                throw DataXException.asDataXException(
                        TSDBReaderErrorCode.ILLEGAL_VALUE, "Analysis [" + key + "] failed.", e);
            }
            return TimeUtils.isSecond(time) ? time * 1000 : time;
        }

        /**
         * Clones the original configuration for one task: overrides the column list,
         * the metric list (skipped when {@code metrics} is null) and the inclusive
         * begin/end time in epoch milliseconds, then logs the result.
         */
        private Configuration newSliceConfiguration(List<String> columns, List<String> metrics,
                                                    long beginMillis, long endMillis) {
            Configuration clone = this.originalConfig.clone();
            clone.set(Key.COLUMN, columns);
            if (metrics != null) {
                clone.set(Key.METRIC, metrics);
            }
            clone.set(Key.BEGIN_DATE_TIME, beginMillis);
            clone.set(Key.END_DATE_TIME, endMillis);

            LOG.info("Configuration: {}", JSON.toJSONString(clone));
            return clone;
        }