in tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/TSDBReader.java [163:278]
public List<Configuration> split(int adviceNumber) {
List<Configuration> configurations = new ArrayList<>();
// get metrics
String type = originalConfig.getString(Key.SINK_DB_TYPE, Key.TYPE_DEFAULT_VALUE);
List<String> columns4TSDB = null;
List<String> columns4RDB = null;
List<String> metrics = null;
if ("TSDB".equals(type)) {
columns4TSDB = originalConfig.getList(Key.COLUMN, String.class);
} else {
columns4RDB = originalConfig.getList(Key.COLUMN, String.class);
metrics = originalConfig.getList(Key.METRIC, String.class);
}
// get time interval
Integer splitIntervalMs = originalConfig.getInt(Key.INTERVAL_DATE_TIME,
Key.INTERVAL_DATE_TIME_DEFAULT_VALUE);
// get time range
SimpleDateFormat format = new SimpleDateFormat(Constant.DEFAULT_DATA_FORMAT);
long startTime;
try {
startTime = format.parse(originalConfig.getString(Key.BEGIN_DATE_TIME)).getTime();
} catch (ParseException e) {
throw DataXException.asDataXException(
TSDBReaderErrorCode.ILLEGAL_VALUE, "Analysis [" + Key.BEGIN_DATE_TIME + "] failed.", e);
}
long endTime;
try {
endTime = format.parse(originalConfig.getString(Key.END_DATE_TIME)).getTime();
} catch (ParseException e) {
throw DataXException.asDataXException(
TSDBReaderErrorCode.ILLEGAL_VALUE, "Analysis [" + Key.END_DATE_TIME + "] failed.", e);
}
if (TimeUtils.isSecond(startTime)) {
startTime *= 1000;
}
if (TimeUtils.isSecond(endTime)) {
endTime *= 1000;
}
DateTime startDateTime = new DateTime(TimeUtils.getTimeInHour(startTime));
DateTime endDateTime = new DateTime(TimeUtils.getTimeInHour(endTime));
final Boolean isCombine = originalConfig.getBool(Key.COMBINE, Key.COMBINE_DEFAULT_VALUE);
if ("TSDB".equals(type)) {
if (isCombine) {
// split by time in hour
while (startDateTime.isBefore(endDateTime)) {
Configuration clone = this.originalConfig.clone();
clone.set(Key.COLUMN, columns4TSDB);
clone.set(Key.BEGIN_DATE_TIME, startDateTime.getMillis());
startDateTime = startDateTime.plusMillis(splitIntervalMs);
// Make sure the time interval is [start, end).
clone.set(Key.END_DATE_TIME, startDateTime.getMillis() - 1);
configurations.add(clone);
LOG.info("Configuration: {}", JSON.toJSONString(clone));
}
} else {
// split by time in hour
while (startDateTime.isBefore(endDateTime)) {
// split by metric
for (String column : columns4TSDB) {
Configuration clone = this.originalConfig.clone();
clone.set(Key.COLUMN, Collections.singletonList(column));
clone.set(Key.BEGIN_DATE_TIME, startDateTime.getMillis());
startDateTime = startDateTime.plusMillis(splitIntervalMs);
// Make sure the time interval is [start, end).
clone.set(Key.END_DATE_TIME, startDateTime.getMillis() - 1);
configurations.add(clone);
LOG.info("Configuration: {}", JSON.toJSONString(clone));
}
}
}
} else {
if (isCombine) {
while (startDateTime.isBefore(endDateTime)) {
Configuration clone = this.originalConfig.clone();
clone.set(Key.COLUMN, columns4RDB);
clone.set(Key.METRIC, metrics);
clone.set(Key.BEGIN_DATE_TIME, startDateTime.getMillis());
startDateTime = startDateTime.plusMillis(splitIntervalMs);
// Make sure the time interval is [start, end).
clone.set(Key.END_DATE_TIME, startDateTime.getMillis() - 1);
configurations.add(clone);
LOG.info("Configuration: {}", JSON.toJSONString(clone));
}
} else {
// split by time in hour
while (startDateTime.isBefore(endDateTime)) {
// split by metric
for (String metric : metrics) {
Configuration clone = this.originalConfig.clone();
clone.set(Key.COLUMN, columns4RDB);
clone.set(Key.METRIC, Collections.singletonList(metric));
clone.set(Key.BEGIN_DATE_TIME, startDateTime.getMillis());
startDateTime = startDateTime.plusMillis(splitIntervalMs);
// Make sure the time interval is [start, end).
clone.set(Key.END_DATE_TIME, startDateTime.getMillis() - 1);
configurations.add(clone);
LOG.info("Configuration: {}", JSON.toJSONString(clone));
}
}
}
}
return configurations;
}