public static OTSLine parseNormalRecordToOTSLineOfTimeseriesTable()

in otswriter/src/main/java/com/alibaba/datax/plugin/writer/otswriter/utils/ParseRecord.java [144:238]


    public static OTSLine parseNormalRecordToOTSLineOfTimeseriesTable(
            List<OTSAttrColumn> attrColumns,
            Record record,
            TimeUnit timeUnit
    ) throws OTSCriticalException {

        if (attrColumns.size() != record.getColumnNumber()){
            throw new OTSCriticalException(String.format("Bug branch, the count(%d) of record != count(%d) of column from config.", record.getColumnNumber(), (attrColumns.size())));
        }

        Map<String, String> tags = new HashMap<>();
        String measurementName = null;
        String dataSource = null;
        Long timeInUs = null;
        Map<String, ColumnValue> columnsValues = new HashMap<>();

        try {
            for (int i = 0; i < attrColumns.size(); i++) {
                // 如果是tags内部字段
                if (attrColumns.get(i).getTag()){
                    tags.put(attrColumns.get(i).getName(), record.getColumn(i).asString());
                }
                else if (attrColumns.get(i).getName().equals(OTSConst.MEASUREMENT_NAME)){
                    measurementName = record.getColumn(i).asString();
                }
                else if (attrColumns.get(i).getName().equals(OTSConst.DATA_SOURCE)){
                    dataSource = record.getColumn(i).asString();
                }
                else if (attrColumns.get(i).getName().equals(OTSConst.TAGS)){
                    String tagString = record.getColumn(i).asString();
                    tags.putAll(TimeseriesResponseFactory.parseTagsOrAttrs(tagString));
                }
                else if (attrColumns.get(i).getName().equals(OTSConst.TIME)){
                    timeInUs = record.getColumn(i).asLong();
                }
                else{
                    switch (attrColumns.get(i).getType()){
                        case INTEGER:
                            columnsValues.put(attrColumns.get(i).getName(), ColumnValue.fromLong(record.getColumn(i).asLong()));
                            break;
                        case BOOLEAN:
                            columnsValues.put(attrColumns.get(i).getName(), ColumnValue.fromBoolean(record.getColumn(i).asBoolean()));
                            break;
                        case DOUBLE:
                            columnsValues.put(attrColumns.get(i).getName(), ColumnValue.fromDouble(record.getColumn(i).asDouble()));
                            break;
                        case BINARY:
                            columnsValues.put(attrColumns.get(i).getName(), ColumnValue.fromBinary(record.getColumn(i).asBytes()));
                            break;
                        case STRING:
                        default:
                            columnsValues.put(attrColumns.get(i).getName(), ColumnValue.fromString(record.getColumn(i).asString()));
                            break;
                    }
                }
            }
            // 度量名称与时间戳字段值不能为空,否则报错
            if (measurementName == null){
                throw new IllegalArgumentException("The value of the '_m_name' (measurement) field cannot be empty. Please check the input of writer");
            }
            else if (timeInUs == null){
                throw new IllegalArgumentException("The value of the '_time' field cannot be empty. Please check the input of writer");
            }
        } catch (IllegalArgumentException e) {
            LOG.warn("getAttrFromRecord fail : {}", e.getMessage(), e);
            CollectorUtil.collect(record, e.getMessage());
            return null;
        }
        TimeseriesKey key = new TimeseriesKey(measurementName, dataSource, tags);
        TimeseriesRow row = new TimeseriesRow(key);
        switch (timeUnit){
            case NANOSECONDS:
                timeInUs = timeInUs / 1000;
                break;
            case MILLISECONDS:
                timeInUs = timeInUs * 1000;
                break;
            case SECONDS:
                timeInUs = timeInUs * 1000 * 1000;
                break;
            case MINUTES:
                timeInUs = timeInUs * 1000 * 1000 * 60;
                break;
            case MICROSECONDS:
            default:
                break;
        }
        row.setTimeInUs(timeInUs);

        for (Map.Entry<String, ColumnValue> entry : columnsValues.entrySet()){
            row.addField(entry.getKey(), entry.getValue());
        }

        return new OTSLine(record, row);
    }