protected Point buildPoint()

in pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v1/InfluxDBGenericRecordSink.java [47:94]


    protected Point buildPoint(Record<GenericRecord> message) throws Exception {
        Map<String, String> tags;
        Map<String, Object> fields = Maps.newHashMap();

        GenericRecord record = message.getValue();

        Object measurementField = getFiled(record, "measurement");
        if (null == measurementField) {
            throw new SchemaSerializationException("measurement is a required field.");
        }

        String measurement = measurementField.toString();

        // Looking for tags
        Object tagsField = getFiled(record, "tags");
        if (null == tagsField) {
            tags = ImmutableMap.of();
        } else if (Map.class.isAssignableFrom(tagsField.getClass())) {
            tags = ((Map<Object, Object>) tagsField).entrySet()
                    .stream().collect(Collectors.toMap(
                            entry -> entry.getKey().toString(),
                            entry -> entry.getValue().toString())
                    );
        } else {
            // Field 'tags' that is not of Map type will be ignored
            tags = ImmutableMap.of();
        }

        // Just insert the current time millis
        long timestamp = System.currentTimeMillis();

        for (Field field : record.getFields()) {
            String fieldName = field.getName();
            if (FIELDS_TO_SKIP.contains(fieldName)) {
                continue;
            }
            Object fieldValue = record.getField(field);
            if (null != fieldValue) {
                fields.put(fieldName, fieldValue);
            }
        }

        Point.Builder builder = Point.measurement(measurement)
                .time(timestamp, TimeUnit.MILLISECONDS)
                .tag(tags)
                .fields(fields);
        return builder.build();
    }