in spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/testcase/TestStreamLoadForArrowType.scala [260:363]
/**
 * Writes a small DataFrame covering every MAP column type of the
 * `spark_connector_map` Doris table (boolean through string keys/values,
 * including decimal, date and datetime) through the Doris connector in
 * arrow format, to exercise arrow stream-load serialization of map types.
 *
 * NOTE(review): LARGEINT has no native Spark type, so it is modeled as
 * MapType(StringType, StringType) — confirm the connector converts it.
 */
def testDataframeWriteMapType(): Unit = {
  /*
  CREATE TABLE `spark_connector_map` (
  `id` int(11) NOT NULL,
  `c_map_bool` Map<boolean,boolean> NULL,
  `c_map_tinyint` Map<tinyint,tinyint> NULL,
  `c_map_smallint` Map<smallint,smallint> NULL,
  `c_map_int` Map<int,int> NULL,
  `c_map_bigint` Map<bigint,bigint> NULL,
  `c_map_largeint` Map<largeint,largeint> NULL,
  `c_map_float` Map<float,float> NULL,
  `c_map_double` Map<double,double> NULL,
  `c_map_decimal` Map<DECIMAL(10, 5),DECIMAL(10, 5)> NULL,
  `c_map_date` Map<date,date> NULL,
  `c_map_datetime` Map<datetime(6),datetime(6)> NULL,
  `c_map_char` Map<char(10),char(10)> NULL,
  `c_map_varchar` Map<varchar(10),varchar(10)> NULL,
  `c_map_string` Map<string,string> NULL
  ) ENGINE=OLAP
  DUPLICATE KEY(`id`)
  COMMENT 'OLAP'
  DISTRIBUTED BY HASH(`id`) BUCKETS 1
  PROPERTIES (
  "replication_allocation" = "tag.location.default: 1"
  );
  +----------------+----------------------------------------+------+-------+---------+-------+
  | Field          | Type                                   | Null | Key   | Default | Extra |
  +----------------+----------------------------------------+------+-------+---------+-------+
  | id             | INT                                    | No   | true  | NULL    |       |
  | c_map_bool     | MAP<BOOLEAN,BOOLEAN>                   | Yes  | false | NULL    | NONE  |
  | c_map_tinyint  | MAP<TINYINT,TINYINT>                   | Yes  | false | NULL    | NONE  |
  | c_map_smallint | MAP<SMALLINT,SMALLINT>                 | Yes  | false | NULL    | NONE  |
  | c_map_int      | MAP<INT,INT>                           | Yes  | false | NULL    | NONE  |
  | c_map_bigint   | MAP<BIGINT,BIGINT>                     | Yes  | false | NULL    | NONE  |
  | c_map_largeint | MAP<LARGEINT,LARGEINT>                 | Yes  | false | NULL    | NONE  |
  | c_map_float    | MAP<FLOAT,FLOAT>                       | Yes  | false | NULL    | NONE  |
  | c_map_double   | MAP<DOUBLE,DOUBLE>                     | Yes  | false | NULL    | NONE  |
  | c_map_decimal  | MAP<DECIMALV3(10, 5),DECIMALV3(10, 5)> | Yes  | false | NULL    | NONE  |
  | c_map_date     | MAP<DATEV2,DATEV2>                     | Yes  | false | NULL    | NONE  |
  | c_map_datetime | MAP<DATETIMEV2(6),DATETIMEV2(6)>       | Yes  | false | NULL    | NONE  |
  | c_map_char     | MAP<CHAR(10),CHAR(10)>                 | Yes  | false | NULL    | NONE  |
  | c_map_varchar  | MAP<VARCHAR(10),VARCHAR(10)>           | Yes  | false | NULL    | NONE  |
  | c_map_string   | MAP<TEXT,TEXT>                         | Yes  | false | NULL    | NONE  |
  +----------------+----------------------------------------+------+-------+---------+-------+
  */
  // Spark schema mirroring the Doris table; LARGEINT is carried as string
  // because Spark has no 128-bit integer type.
  val schema = new StructType()
    .add("id", IntegerType)
    .add("c_map_bool", MapType(BooleanType, BooleanType))
    .add("c_map_tinyint", MapType(ByteType, ByteType))
    .add("c_map_smallint", MapType(ShortType, ShortType))
    .add("c_map_int", MapType(IntegerType, IntegerType))
    .add("c_map_bigint", MapType(LongType, LongType))
    .add("c_map_largeint", MapType(StringType, StringType))
    .add("c_map_float", MapType(FloatType, FloatType))
    .add("c_map_double", MapType(DoubleType, DoubleType))
    .add("c_map_decimal", MapType(DecimalType.apply(10, 5), DecimalType.apply(10, 5)))
    .add("c_map_date", MapType(DateType, DateType))
    .add("c_map_datetime", MapType(TimestampType, TimestampType))
    .add("c_map_char", MapType(StringType, StringType))
    .add("c_map_varchar", MapType(StringType, StringType))
    .add("c_map_string", MapType(StringType, StringType))

  // One representative row exercising each map column, including edge-ish
  // values (pre-Gregorian and far-future timestamps with microsecond precision).
  val row = Row(
    1,
    Map(true -> false, false -> true, true -> true),
    Map(1.toByte -> 2.toByte, 3.toByte -> 4.toByte),
    Map(2.toShort -> 4.toShort, 5.toShort -> 6.toShort),
    Map(3 -> 4, 7 -> 8),
    Map(4.toLong -> 5.toLong, 1.toLong -> 2.toLong),
    Map("123456789" -> "987654321", "789456123" -> "456789123"),
    Map(6.6.floatValue() -> 8.8.floatValue(), 9.9.floatValue() -> 10.1.floatValue()),
    Map(7.7.doubleValue() -> 1.1.doubleValue(), 2.2 -> 3.3.doubleValue()),
    Map(Decimal.apply(3.12) -> Decimal.apply(1.23), Decimal.apply(2.34) -> Decimal.apply(5.67)),
    Map(Date.valueOf("2023-09-08") -> Date.valueOf("2024-09-08"), Date.valueOf("1023-09-08") -> Date.valueOf("2023-09-08")),
    Map(Timestamp.valueOf("1023-09-08 17:12:34.123456") -> Timestamp.valueOf("2023-09-08 17:12:34.123456"), Timestamp.valueOf("3023-09-08 17:12:34.123456") -> Timestamp.valueOf("4023-09-08 17:12:34.123456")),
    Map("char" -> "char2", "char2" -> "char3"),
    Map("varchar" -> "varchar2", "varchar3" -> "varchar4"),
    Map("string" -> "string2", "string3" -> "string4")
  )

  // 7 identical rows with batch size 3 forces multiple stream-load batches,
  // including a final partial batch (3 + 3 + 1).
  val rowCount = 7
  val rdd = spark.sparkContext.parallelize(List.fill(rowCount)(row), 1)
  // createDataFrame already returns a DataFrame; no .toDF() needed.
  val df = spark.createDataFrame(rdd, schema)

  df.write
    .format("doris")
    .option("doris.fenodes", dorisFeNodes)
    .option("user", dorisUser)
    .option("password", dorisPwd)
    .option("doris.table.identifier", s"$databaseName.spark_connector_map")
    .option("doris.sink.batch.size", 3)
    .option("doris.sink.properties.format", "arrow")
    .option("doris.sink.max-retries", 0)
    .save()
}