in influxdb/src/main/scala/org/apache/pekko/stream/connectors/influxdb/impl/PekkoConnectorsResultMapperHelper.scala [140:175]
private def setField(pointBuilder: Point.Builder, fieldType: Class[_], columnName: String, value: Any): Unit =
if (classOf[java.lang.Boolean].isAssignableFrom(fieldType) || classOf[Boolean].isAssignableFrom(fieldType))
pointBuilder.addField(columnName, value.asInstanceOf[Boolean])
else if (classOf[java.lang.Long].isAssignableFrom(fieldType) || classOf[Long].isAssignableFrom(fieldType))
pointBuilder.addField(columnName, value.asInstanceOf[Long])
else if (classOf[java.lang.Double].isAssignableFrom(fieldType) || classOf[Double].isAssignableFrom(fieldType))
pointBuilder.addField(columnName, value.asInstanceOf[Double])
else if (classOf[java.lang.Integer].isAssignableFrom(fieldType) || classOf[Integer].isAssignableFrom(fieldType))
pointBuilder.addField(columnName, value.asInstanceOf[Int])
else if (classOf[String].isAssignableFrom(fieldType)) pointBuilder.addField(columnName, value.asInstanceOf[String])
else throw new InfluxDBMapperException("Unsupported type " + fieldType + " for column " + columnName)
private def throwExceptionIfMissingAnnotation(clazz: Class[_]): Unit =
if (!clazz.isAnnotationPresent(classOf[Measurement]))
throw new IllegalArgumentException(
"Class " + clazz.getName + " is not annotated with @" + classOf[Measurement].getSimpleName)
private def parseRowAs[T](clazz: Class[T],
columns: java.util.List[String],
values: java.util.List[AnyRef],
precision: TimeUnit): T =
try {
val fieldMap = CLASS_FIELD_CACHE.get(clazz.getName)
val obj: T = clazz.getDeclaredConstructor().newInstance()
for (i <- 0 until columns.size()) {
val correspondingField = fieldMap.get(columns.get(i))
if (correspondingField != null) {
setFieldValue(obj, correspondingField, values.get(i), precision)
}
}
obj
} catch {
case e @ (_: InstantiationException | _: IllegalAccessException) =>
throw new InfluxDBMapperException(e)
}