in spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/testcase/TestStreamLoadForArrowType.scala [155:258]
/**
 * End-to-end write test for all Doris ARRAY element types: builds a DataFrame
 * whose schema mirrors the `spark_connector_array` table below and writes it
 * through the connector using arrow-format stream load.
 *
 * Relies on suite-level members: `spark`, `dorisFeNodes`, `dorisUser`,
 * `dorisPwd`, `databaseName`.
 */
def testDataframeWriteArrayTypes(): Unit = {
  /*
  CREATE TABLE `spark_connector_array` (
  `id` int(11) NOT NULL,
  `c_array_boolean` ARRAY<boolean> NULL,
  `c_array_tinyint` ARRAY<tinyint> NULL,
  `c_array_smallint` ARRAY<smallint> NULL,
  `c_array_int` ARRAY<int> NULL,
  `c_array_bigint` ARRAY<bigint> NULL,
  `c_array_largeint` ARRAY<largeint> NULL,
  `c_array_float` ARRAY<float> NULL,
  `c_array_double` ARRAY<double> NULL,
  `c_array_decimal` ARRAY<DECIMAL(10, 5)> NULL,
  `c_array_date` ARRAY<date> NULL,
  `c_array_datetime` ARRAY<datetime(6)> NULL,
  `c_array_char` ARRAY<char(10)> NULL,
  `c_array_varchar` ARRAY<varchar(10)> NULL,
  `c_array_string` ARRAY<string> NULL
  ) ENGINE=OLAP
  DUPLICATE KEY(`id`)
  COMMENT 'OLAP'
  DISTRIBUTED BY HASH(`id`) BUCKETS 1
  PROPERTIES (
  "replication_allocation" = "tag.location.default: 1"
  );
  +------------------+-------------------------+------+-------+---------+-------+
  | Field            | Type                    | Null | Key   | Default | Extra |
  +------------------+-------------------------+------+-------+---------+-------+
  | id               | INT                     | No   | true  | NULL    |       |
  | c_array_boolean  | ARRAY<BOOLEAN>          | Yes  | false | []      | NONE  |
  | c_array_tinyint  | ARRAY<TINYINT>          | Yes  | false | []      | NONE  |
  | c_array_smallint | ARRAY<SMALLINT>         | Yes  | false | []      | NONE  |
  | c_array_int      | ARRAY<INT>              | Yes  | false | []      | NONE  |
  | c_array_bigint   | ARRAY<BIGINT>           | Yes  | false | []      | NONE  |
  | c_array_largeint | ARRAY<LARGEINT>         | Yes  | false | []      | NONE  |
  | c_array_float    | ARRAY<FLOAT>            | Yes  | false | []      | NONE  |
  | c_array_double   | ARRAY<DOUBLE>           | Yes  | false | []      | NONE  |
  | c_array_decimal  | ARRAY<DECIMALV3(10, 5)> | Yes  | false | []      | NONE  |
  | c_array_date     | ARRAY<DATEV2>           | Yes  | false | []      | NONE  |
  | c_array_datetime | ARRAY<DATETIMEV2(6)>    | Yes  | false | []      | NONE  |
  | c_array_char     | ARRAY<CHAR(10)>         | Yes  | false | []      | NONE  |
  | c_array_varchar  | ARRAY<VARCHAR(10)>      | Yes  | false | []      | NONE  |
  | c_array_string   | ARRAY<TEXT>             | Yes  | false | []      | NONE  |
  +------------------+-------------------------+------+-------+---------+-------+
  */
  // Spark schema mirroring the Doris table column-for-column.
  // LARGEINT has no JVM primitive counterpart, so it is carried as strings.
  val schema = new StructType()
    .add("id", IntegerType)
    .add("c_array_boolean", ArrayType(BooleanType))
    .add("c_array_tinyint", ArrayType(ByteType))
    .add("c_array_smallint", ArrayType(ShortType))
    .add("c_array_int", ArrayType(IntegerType))
    .add("c_array_bigint", ArrayType(LongType))
    .add("c_array_largeint", ArrayType(StringType))
    .add("c_array_float", ArrayType(FloatType))
    .add("c_array_double", ArrayType(DoubleType))
    .add("c_array_decimal", ArrayType(DecimalType.apply(10, 5)))
    .add("c_array_date", ArrayType(DateType))
    .add("c_array_datetime", ArrayType(TimestampType))
    .add("c_array_char", ArrayType(StringType))
    .add("c_array_varchar", ArrayType(StringType))
    .add("c_array_string", ArrayType(StringType))
  // One sample row exercising every array column.
  val row = Row(
    1,
    Array(true, false, false, true, true),
    Array(1.toByte, 2.toByte, 3.toByte),
    Array(2.toShort, 12.toShort, 32.toShort),
    Array(3, 4, 5, 6),
    Array(4L, 5L, 6L),
    Array("123456789", "987654321", "123789456"),
    Array(6.6f, 6.7f, 7.8f),
    // BUG FIX: the last element was `8.9.floatValue()`, a Float that weak
    // conformance widened into this Double array as 8.899999618530273,
    // silently corrupting the c_array_double test data. Use a Double literal.
    Array(7.7d, 8.8d, 8.9d),
    Array(Decimal.apply(3.12), Decimal.apply(1.12345)),
    Array(Date.valueOf("2023-09-08"), Date.valueOf("2027-10-28")),
    Array(Timestamp.valueOf("2023-09-08 17:12:34.123456"), Timestamp.valueOf("2024-09-08 18:12:34.123456")),
    Array("char", "char2"),
    Array("varchar", "varchar2"),
    Array("string", "string2")
  )
  // Seven identical rows in a single partition (immutable construction
  // replaces the previous ListBuffer + loop).
  val inputList = List.fill(7)(row)
  val rdd = spark.sparkContext.parallelize(inputList, 1)
  val df = spark.createDataFrame(rdd, schema).toDF()
  df.write
    .format("doris")
    .option("doris.fenodes", dorisFeNodes)
    .option("user", dorisUser)
    .option("password", dorisPwd)
    .option("doris.table.identifier", s"$databaseName.spark_connector_array")
    .option("doris.sink.batch.size", 30)
    .option("doris.sink.properties.format", "arrow")
    .option("doris.sink.max-retries", 0)
    .save()
}