final protected Point buildPoint()

in pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v2/InfluxDBSink.java [64:124]


    final protected Point buildPoint(Record<GenericRecord> record) {
        val genericRecord = record.getValue();

        // looking for measurement
        val measurementField = genericRecord.getField("measurement");
        if (null == measurementField) {
            throw new SchemaSerializationException("device is a required field.");
        }
        val measurement = (String) measurementField;

        // looking for timestamp
        long timestamp;
        val timestampField = getFiled(genericRecord, "timestamp");
        if (null == timestampField) {
            timestamp = System.currentTimeMillis();
        } else if (timestampField instanceof Number) {
            timestamp = ((Number) timestampField).longValue();
        } else if (timestampField instanceof String) {
            timestamp = Long.parseLong((String) timestampField);
        } else {
            throw new SchemaSerializationException("Invalid timestamp field");
        }

        val point = Point.measurement(measurement).time(timestamp, writePrecision);

        // Looking for tag fields
        val tagsField = getFiled(genericRecord, "tags");
        if (null != tagsField) {
            if (tagsField instanceof GenericRecord) { // JSONSchema
                GenericRecord tagsRecord = (GenericRecord) tagsField;
                for (Field field : tagsRecord.getFields()) {
                    val fieldName = field.getName();
                    val value = tagsRecord.getField(field);
                    point.addTag(fieldName, (String) value);
                }
            } else if (Map.class.isAssignableFrom(tagsField.getClass())) { // AvroSchema
                Map<Object, Object> tagsMap = (Map<Object, Object>) tagsField;
                tagsMap.forEach((key, value) -> point.addTag(key.toString(), value.toString()));
            } else {
                throw new SchemaSerializationException("Unknown type for 'tags'");
            }
        }

        // Looking for sensor fields
        val columnsField = genericRecord.getField("fields");
        if (columnsField instanceof GenericRecord) { // JSONSchema
            val columnsRecord = (GenericRecord) columnsField;
            for (Field field : columnsRecord.getFields()) {
                val fieldName = field.getName();
                val value = columnsRecord.getField(field);
                addPointField(point, fieldName, value);
            }
        } else if (Map.class.isAssignableFrom(columnsField.getClass())) { // AvroSchema
            val columnsMap = (Map<Object, Object>) columnsField;
            columnsMap.forEach((key, value) -> addPointField(point, key.toString(), value));
        } else {
            throw new SchemaSerializationException("Unknown type for 'fields'");
        }

        return point;
    }