in pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v1/InfluxDBGenericRecordSink.java [47:94]
/**
 * Converts a {@link GenericRecord} message into a {@link Point}.
 *
 * <p>The record is interpreted as follows:
 * <ul>
 *   <li>{@code measurement} - required; its string form becomes the point's measurement.</li>
 *   <li>{@code tags} - optional; when the value is a {@link Map}, every entry is converted to a
 *       string key/value tag. Entries with a null key or a null value are skipped, in the same
 *       way null field values are skipped. A missing or non-map value yields no tags.</li>
 *   <li>every other record field (those not listed in {@code FIELDS_TO_SKIP}) with a non-null
 *       value is added as a point field under its own name.</li>
 * </ul>
 * The point's timestamp is the current wall-clock time in milliseconds, taken when this method runs.
 *
 * @param message the incoming record whose value is converted
 * @return the point built from the record
 * @throws SchemaSerializationException if the record has no {@code measurement} value
 * @throws Exception declared for callers; see the specific cases above
 */
protected Point buildPoint(Record<GenericRecord> message) throws Exception {
    GenericRecord record = message.getValue();

    Object measurementField = getFiled(record, "measurement");
    if (null == measurementField) {
        throw new SchemaSerializationException("measurement is a required field.");
    }
    String measurement = measurementField.toString();

    // Looking for tags
    Map<String, String> tags;
    Object tagsField = getFiled(record, "tags");
    if (tagsField instanceof Map) {
        tags = Maps.newHashMap();
        for (Map.Entry<?, ?> entry : ((Map<?, ?>) tagsField).entrySet()) {
            Object tagKey = entry.getKey();
            Object tagValue = entry.getValue();
            // A null key or value cannot be turned into a tag; skip it instead of failing
            // the whole record with a NullPointerException.
            if (null != tagKey && null != tagValue) {
                tags.put(tagKey.toString(), tagValue.toString());
            }
        }
    } else {
        // Field 'tags' that is absent or not of Map type will be ignored
        tags = ImmutableMap.of();
    }

    // Just insert the current time millis
    long timestamp = System.currentTimeMillis();

    Map<String, Object> fields = Maps.newHashMap();
    for (Field field : record.getFields()) {
        String fieldName = field.getName();
        if (FIELDS_TO_SKIP.contains(fieldName)) {
            continue;
        }
        Object fieldValue = record.getField(field);
        if (null != fieldValue) {
            fields.put(fieldName, fieldValue);
        }
    }

    return Point.measurement(measurement)
            .time(timestamp, TimeUnit.MILLISECONDS)
            .tag(tags)
            .fields(fields)
            .build();
}