in spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/testcase/TestStreamLoadForArrowType.scala [365:460]
/**
 * Writes a DataFrame containing a nested STRUCT column to Doris through the
 * connector's arrow-format stream load, verifying STRUCT-type round-tripping.
 *
 * Seven identical rows are written with `doris.sink.batch.size = 3`, so the
 * sink flushes two full batches plus a partial final batch.
 */
def testDataframeWriteStructType(): Unit = {
  /*
  Target table (created in Doris beforehand):

  CREATE TABLE `spark_connector_struct` (
    `id` int NOT NULL,
    `st` STRUCT<
      `c_bool`:boolean,
      `c_tinyint`:tinyint(4),
      `c_smallint`:smallint(6),
      `c_int`:int(11),
      `c_bigint`:bigint(20),
      `c_largeint`:largeint(40),
      `c_float`:float,
      `c_double`:double,
      `c_decimal`:DECIMAL(10, 5),
      `c_date`:date,
      `c_datetime`:datetime(6),
      `c_char`:char(10),
      `c_varchar`:varchar(10),
      `c_string`:string
    > NULL
  ) ENGINE=OLAP
  DUPLICATE KEY(`id`)
  COMMENT 'OLAP'
  DISTRIBUTED BY HASH(`id`) BUCKETS 1
  PROPERTIES (
    "replication_allocation" = "tag.location.default: 1"
  );

  DESCRIBE output:
  +-------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------+-------+---------+-------+
  | Field | Type                                                                                                                                                                                                                                                             | Null | Key   | Default | Extra |
  +-------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------+-------+---------+-------+
  | id    | INT                                                                                                                                                                                                                                                              | No   | true  | NULL    |       |
  | st    | STRUCT<c_bool:BOOLEAN,c_tinyint:TINYINT,c_smallint:SMALLINT,c_int:INT,c_bigint:BIGINT,c_largeint:LARGEINT,c_float:FLOAT,c_double:DOUBLE,c_decimal:DECIMALV3(10, 5),c_date:DATEV2,c_datetime:DATETIMEV2(6),c_char:CHAR(10),c_varchar:VARCHAR(10),c_string:TEXT>   | Yes  | false | NULL    | NONE  |
  +-------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------+-------+---------+-------+
  */

  // Spark-side schema mirroring the Doris STRUCT column. LARGEINT is carried
  // as a String here (Spark has no 128-bit integer type — see the row value).
  val structSchema = new StructType()
    .add("c_bool", BooleanType)
    .add("c_tinyint", ByteType)
    .add("c_smallint", ShortType)
    .add("c_int", IntegerType)
    .add("c_bigint", LongType)
    .add("c_largeint", StringType)
    .add("c_float", FloatType)
    .add("c_double", DoubleType)
    .add("c_decimal", DecimalType.apply(10, 5))
    .add("c_date", DateType)
    .add("c_datetime", TimestampType)
    .add("c_char", StringType)
    .add("c_varchar", StringType)
    .add("c_string", StringType)

  val schema = new StructType()
    .add("id", IntegerType)
    .add("st", structSchema)

  // One sample row covering every nested field of the struct.
  val row = Row(
    1,
    Row(
      true,
      1.toByte,
      2.toShort,
      3,
      4L,
      "123456789",
      6.6f,
      7.7d,
      Decimal.apply(3.12),
      Date.valueOf("2023-09-08"),
      Timestamp.valueOf("2023-09-08 17:12:34.123456"),
      "char",
      "varchar",
      "string"
    )
  )

  // Seven copies so the sink (batch size 3) flushes two full batches and one
  // partial batch. Seq.fill replaces the former mutable ListBuffer loop.
  val inputRows = Seq.fill(7)(row)
  val rdd = spark.sparkContext.parallelize(inputRows, 1)
  // createDataFrame already returns a DataFrame; the previous .toDF() was redundant.
  val df = spark.createDataFrame(rdd, schema)

  df.write
    .format("doris")
    .option("doris.fenodes", dorisFeNodes)
    .option("user", dorisUser)
    .option("password", dorisPwd)
    .option("doris.table.identifier", s"$databaseName.spark_connector_struct")
    .option("doris.sink.batch.size", 3)
    .option("doris.sink.properties.format", "arrow")
    .option("doris.sink.max-retries", 0)
    .save()
}