in otswriter/src/main/java/com/alibaba/datax/plugin/writer/otswriter/utils/ParseRecord.java [144:238]
public static OTSLine parseNormalRecordToOTSLineOfTimeseriesTable(
List<OTSAttrColumn> attrColumns,
Record record,
TimeUnit timeUnit
) throws OTSCriticalException {
if (attrColumns.size() != record.getColumnNumber()){
throw new OTSCriticalException(String.format("Bug branch, the count(%d) of record != count(%d) of column from config.", record.getColumnNumber(), (attrColumns.size())));
}
Map<String, String> tags = new HashMap<>();
String measurementName = null;
String dataSource = null;
Long timeInUs = null;
Map<String, ColumnValue> columnsValues = new HashMap<>();
try {
for (int i = 0; i < attrColumns.size(); i++) {
// 如果是tags内部字段
if (attrColumns.get(i).getTag()){
tags.put(attrColumns.get(i).getName(), record.getColumn(i).asString());
}
else if (attrColumns.get(i).getName().equals(OTSConst.MEASUREMENT_NAME)){
measurementName = record.getColumn(i).asString();
}
else if (attrColumns.get(i).getName().equals(OTSConst.DATA_SOURCE)){
dataSource = record.getColumn(i).asString();
}
else if (attrColumns.get(i).getName().equals(OTSConst.TAGS)){
String tagString = record.getColumn(i).asString();
tags.putAll(TimeseriesResponseFactory.parseTagsOrAttrs(tagString));
}
else if (attrColumns.get(i).getName().equals(OTSConst.TIME)){
timeInUs = record.getColumn(i).asLong();
}
else{
switch (attrColumns.get(i).getType()){
case INTEGER:
columnsValues.put(attrColumns.get(i).getName(), ColumnValue.fromLong(record.getColumn(i).asLong()));
break;
case BOOLEAN:
columnsValues.put(attrColumns.get(i).getName(), ColumnValue.fromBoolean(record.getColumn(i).asBoolean()));
break;
case DOUBLE:
columnsValues.put(attrColumns.get(i).getName(), ColumnValue.fromDouble(record.getColumn(i).asDouble()));
break;
case BINARY:
columnsValues.put(attrColumns.get(i).getName(), ColumnValue.fromBinary(record.getColumn(i).asBytes()));
break;
case STRING:
default:
columnsValues.put(attrColumns.get(i).getName(), ColumnValue.fromString(record.getColumn(i).asString()));
break;
}
}
}
// 度量名称与时间戳字段值不能为空,否则报错
if (measurementName == null){
throw new IllegalArgumentException("The value of the '_m_name' (measurement) field cannot be empty. Please check the input of writer");
}
else if (timeInUs == null){
throw new IllegalArgumentException("The value of the '_time' field cannot be empty. Please check the input of writer");
}
} catch (IllegalArgumentException e) {
LOG.warn("getAttrFromRecord fail : {}", e.getMessage(), e);
CollectorUtil.collect(record, e.getMessage());
return null;
}
TimeseriesKey key = new TimeseriesKey(measurementName, dataSource, tags);
TimeseriesRow row = new TimeseriesRow(key);
switch (timeUnit){
case NANOSECONDS:
timeInUs = timeInUs / 1000;
break;
case MILLISECONDS:
timeInUs = timeInUs * 1000;
break;
case SECONDS:
timeInUs = timeInUs * 1000 * 1000;
break;
case MINUTES:
timeInUs = timeInUs * 1000 * 1000 * 60;
break;
case MICROSECONDS:
default:
break;
}
row.setTimeInUs(timeInUs);
for (Map.Entry<String, ColumnValue> entry : columnsValues.entrySet()){
row.addField(entry.getKey(), entry.getValue());
}
return new OTSLine(record, row);
}