in hologres-connector-spark-base/src/main/scala/com/alibaba/hologres/spark/sink/FieldWriter.scala [128:166]
def createFieldWriter(holoColumn: Column, removeU0000: Boolean = false): FieldWriter = {
holoColumn.getType match {
case Types.TINYINT | Types.SMALLINT =>
new ShortFieldWriter
case Types.INTEGER =>
new IntFieldWriter
case Types.BIGINT =>
new LongFieldWriter
case Types.REAL | Types.FLOAT =>
new FloatFieldWriter
case Types.DOUBLE =>
new DoubleFieldWriter
case Types.NUMERIC | Types.DECIMAL =>
new DecimalFieldWriter(holoColumn.getPrecision, holoColumn.getScale)
case Types.BOOLEAN | Types.BIT =>
new BooleanFieldWriter
case Types.CHAR | Types.VARCHAR | Types.LONGVARCHAR =>
new StringFieldWriter(removeU0000)
case Types.DATE => new DateFieldWriter
case Types.TIMESTAMP => new TimestampFieldWriter
case Types.BINARY | Types.VARBINARY => new BinaryFieldWriter
case Types.OTHER =>
holoColumn.getTypeName match {
case "json" | "jsonb" => new StringFieldWriter(removeU0000)
case "roaringbitmap" => new BinaryFieldWriter
}
case Types.ARRAY =>
holoColumn.getTypeName match {
case "_int4" => new IntArrayFieldWriter
case "_int8" => new LongArrayFieldWriter
case "_float4" => new FloatArrayFieldWriter
case "_float8" => new DoubleArrayFieldWriter
case "_bool" => new BooleanArrayFieldWriter
case "_varchar" | "_text" => new StringArrayFieldWriter
}
case _ =>
throw new IllegalArgumentException(String.format("Hologres sink does not support data type %s for now", holoColumn.getTypeName))
}
}