in pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v2/InfluxDBSink.java [64:124]
protected final Point buildPoint(Record<GenericRecord> record) {
    // Builds an InfluxDB Point from the value of the given record. The value is read as:
    //   "measurement" - required, used as the measurement name
    //   "timestamp"   - optional, a Number or a numeric String; current time when absent
    //   "tags"        - optional, a nested GenericRecord or a Map
    //   "fields"      - required, a nested GenericRecord or a Map
    // Throws SchemaSerializationException when a required entry is missing or when
    // "timestamp", "tags" or "fields" has an unsupported type.
    val genericRecord = record.getValue();

    // looking for measurement
    val measurementField = genericRecord.getField("measurement");
    if (null == measurementField) {
        throw new SchemaSerializationException("measurement is a required field.");
    }
    val measurement = (String) measurementField;

    // looking for timestamp
    long timestamp;
    val timestampField = getFiled(genericRecord, "timestamp");
    if (null == timestampField) {
        // NOTE(review): this fallback is in milliseconds but is passed to time(...) together
        // with writePrecision below - confirm that is intended when writePrecision is not ms.
        timestamp = System.currentTimeMillis();
    } else if (timestampField instanceof Number) {
        timestamp = ((Number) timestampField).longValue();
    } else if (timestampField instanceof String) {
        timestamp = Long.parseLong((String) timestampField);
    } else {
        throw new SchemaSerializationException("Invalid timestamp field");
    }

    val point = Point.measurement(measurement).time(timestamp, writePrecision);

    // Looking for tag fields
    val tagsField = getFiled(genericRecord, "tags");
    if (null != tagsField) {
        if (tagsField instanceof GenericRecord) { // JSONSchema
            GenericRecord tagsRecord = (GenericRecord) tagsField;
            for (Field field : tagsRecord.getFields()) {
                val value = tagsRecord.getField(field);
                // Tags are strings: stringify instead of casting so non-String values do not
                // fail with ClassCastException, and skip tags that have no value at all.
                if (null != value) {
                    point.addTag(field.getName(), value.toString());
                }
            }
        } else if (tagsField instanceof Map) { // AvroSchema
            for (Map.Entry<?, ?> entry : ((Map<?, ?>) tagsField).entrySet()) {
                // A null value previously caused a NullPointerException on toString().
                if (null != entry.getValue()) {
                    point.addTag(entry.getKey().toString(), entry.getValue().toString());
                }
            }
        } else {
            throw new SchemaSerializationException("Unknown type for 'tags'");
        }
    }

    // Looking for sensor fields
    val columnsField = genericRecord.getField("fields");
    if (null == columnsField) {
        // Without this guard a missing value fell through to getClass() and surfaced as a
        // bare NullPointerException instead of a schema error.
        throw new SchemaSerializationException("fields is a required field.");
    }
    if (columnsField instanceof GenericRecord) { // JSONSchema
        val columnsRecord = (GenericRecord) columnsField;
        for (Field field : columnsRecord.getFields()) {
            addPointField(point, field.getName(), columnsRecord.getField(field));
        }
    } else if (columnsField instanceof Map) { // AvroSchema
        for (Map.Entry<?, ?> entry : ((Map<?, ?>) columnsField).entrySet()) {
            addPointField(point, entry.getKey().toString(), entry.getValue());
        }
    } else {
        throw new SchemaSerializationException("Unknown type for 'fields'");
    }

    return point;
}