in spark/sql-30/src/main/scala/org/elasticsearch/spark/sql/SchemaUtils.scala [148:233]
/**
 * Converts a single Elasticsearch mapping `Field` into a nullable Spark `StructField`.
 *
 * The Spark type is derived from the Elasticsearch field type:
 *  - null/binary/boolean/numeric types map to their Spark counterparts
 *    (`half_float` -> `FloatType`, `scaled_float` -> `DoubleType`);
 *  - `string`, `text`, `keyword` and `wildcard` map to `StringType`;
 *  - `date` and `date_nanos` map to `TimestampType` when `cfg.getMappingDateRich` is set,
 *    otherwise to `StringType`;
 *  - `object` and `join` become a struct (via `convertToStruct`), `nested` becomes an array of such a struct;
 *  - `geo_point` / `geo_shape` are shaped according to the format recorded in `geoInfo`
 *    for this field's absolute name;
 *  - any other type falls back to `StringType`.
 *
 * When `arrayIncludes` is non-empty and `FieldFilter.filter` reports a match for the field's absolute
 * name, the resulting type is wrapped in `matched.depth` levels of `ArrayType`
 * (presumably the array dimension declared by the matching include — confirm against `NumberedInclude`).
 *
 * @param field         mapping field to convert
 * @param geoInfo       detected geo formats, keyed by absolute (dot-separated) field name
 * @param parentName    absolute name of the enclosing field, or `null` for a top-level field
 * @param arrayIncludes include patterns that mark fields to be exposed as arrays
 * @param arrayExcludes exclude patterns passed alongside `arrayIncludes` to `FieldFilter.filter`
 * @param cfg           connector settings; consulted for rich date mapping
 * @return a nullable `StructField` named `field.name()`
 * @throws EsHadoopIllegalArgumentException if a geo field has no recognised format in `geoInfo`,
 *         or if a `geo_shape` is a geometry collection, which has no Spark representation here
 */
private def convertField(field: Field, geoInfo: JMap[String, GeoField], parentName: String,
                         arrayIncludes: JList[NumberedInclude], arrayExcludes: JList[String], cfg: Settings): StructField = {
  val absoluteName = if (parentName != null) parentName + "." + field.name() else field.name()
  val matched = FieldFilter.filter(absoluteName, arrayIncludes, arrayExcludes, false)
  val createArray = !arrayIncludes.isEmpty() && matched.matched

  val baseType = Utils.extractType(field) match {
    case NULL         => NullType
    case BINARY       => BinaryType
    case BOOLEAN      => BooleanType
    case BYTE         => ByteType
    case SHORT        => ShortType
    case INTEGER      => IntegerType
    case LONG         => LongType
    case FLOAT        => FloatType
    case DOUBLE       => DoubleType
    case HALF_FLOAT   => FloatType
    case SCALED_FLOAT => DoubleType
    // String types
    case STRING | TEXT | KEYWORD | WILDCARD => StringType
    case DATE | DATE_NANOS => if (cfg.getMappingDateRich) TimestampType else StringType
    // Structured types
    case OBJECT | JOIN => convertToStruct(field, geoInfo, absoluteName, arrayIncludes, arrayExcludes, cfg)
    case NESTED => DataTypes.createArrayType(convertToStruct(field, geoInfo, absoluteName, arrayIncludes, arrayExcludes, cfg))
    // GEO
    case GEO_POINT =>
      val geoPoint = geoInfo.get(absoluteName) match {
        case GeoPointType.LON_LAT_ARRAY => DataTypes.createArrayType(DoubleType)
        case GeoPointType.GEOHASH | GeoPointType.LAT_LON_STRING => StringType
        case GeoPointType.LAT_LON_OBJECT =>
          val lat = DataTypes.createStructField("lat", DoubleType, true)
          val lon = DataTypes.createStructField("lon", DoubleType, true)
          DataTypes.createStructType(Array(lat, lon))
        // No (or an unexpected) format recorded for this field: fail with context rather than a bare MatchError.
        case other =>
          throw new EsHadoopIllegalArgumentException(
            s"Unable to determine the GeoPoint format of field [$absoluteName]; found [$other]")
      }
      if (Utils.LOGGER.isDebugEnabled()) {
        Utils.LOGGER.debug(s"Detected field [${absoluteName}] as a GeoPoint with format ${geoPoint.simpleString}")
      }
      geoPoint
    case GEO_SHAPE =>
      val coordinates = "coordinates"
      val shapeType = geoInfo.get(absoluteName)
      val fields = new ArrayList[StructField]()
      fields.add(DataTypes.createStructField("type", StringType, true))
      shapeType match {
        case GeoShapeType.POINT =>
          fields.add(DataTypes.createStructField(coordinates, DataTypes.createArrayType(DoubleType), true))
        case GeoShapeType.LINE_STRING =>
          fields.add(DataTypes.createStructField(coordinates, createNestedArray(DoubleType, 2), true))
        case GeoShapeType.POLYGON =>
          fields.add(DataTypes.createStructField(coordinates, createNestedArray(DoubleType, 3), true))
          fields.add(DataTypes.createStructField("orientation", StringType, true))
        case GeoShapeType.MULTI_POINT =>
          fields.add(DataTypes.createStructField(coordinates, createNestedArray(DoubleType, 2), true))
        case GeoShapeType.MULTI_LINE_STRING =>
          fields.add(DataTypes.createStructField(coordinates, createNestedArray(DoubleType, 3), true))
        case GeoShapeType.MULTI_POLYGON =>
          fields.add(DataTypes.createStructField(coordinates, createNestedArray(DoubleType, 4), true))
        case GeoShapeType.GEOMETRY_COLLECTION =>
          // Report the offending field and shape, not the whole geoInfo map.
          throw new EsHadoopIllegalArgumentException(
            s"Geoshape [$shapeType] of field [$absoluteName] not supported")
        case GeoShapeType.ENVELOPE =>
          fields.add(DataTypes.createStructField(coordinates, createNestedArray(DoubleType, 2), true))
        case GeoShapeType.CIRCLE =>
          fields.add(DataTypes.createStructField(coordinates, DataTypes.createArrayType(DoubleType), true))
          fields.add(DataTypes.createStructField("radius", StringType, true))
        // No (or an unexpected) format recorded for this field: fail with context rather than a bare MatchError.
        case other =>
          throw new EsHadoopIllegalArgumentException(
            s"Unable to determine the GeoShape format of field [$absoluteName]; found [$other]")
      }
      val geoShape = DataTypes.createStructType(fields)
      if (Utils.LOGGER.isDebugEnabled()) {
        Utils.LOGGER.debug(s"Detected field [${absoluteName}] as a GeoShape with format ${geoShape.simpleString}")
      }
      geoShape
    // Unknown/unsupported Elasticsearch types are deliberately exposed as strings rather than rejected.
    case _ => StringType
  }

  // Wrap once per requested array dimension. This is done inline instead of through createNestedArray,
  // following the original author's note that the helper could not be used here.
  val dataType =
    if (createArray) (0 until matched.depth).foldLeft(baseType)((inner, _) => DataTypes.createArrayType(inner))
    else baseType

  DataTypes.createStructField(field.name(), dataType, true)
}