def testDataframeWriteArrayTypes()

in spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/testcase/TestStreamLoadForArrowType.scala [155:258]


  /** Writes a small DataFrame containing every supported ARRAY element type to the
    * Doris table `spark_connector_array` via the connector's arrow sink format.
    *
    * Notes:
    *  - LARGEINT has no Spark SQL equivalent, so that column is written as strings.
    *  - CHAR/VARCHAR/TEXT all map to Spark StringType.
    *  - Side effect only: streams 7 identical rows into Doris; returns nothing.
    */
  def testDataframeWriteArrayTypes(): Unit = {
    /*

  CREATE TABLE `spark_connector_array` (
        `id` int(11) NOT NULL,
        `c_array_boolean` ARRAY<boolean> NULL,
        `c_array_tinyint` ARRAY<tinyint> NULL,
        `c_array_smallint` ARRAY<smallint> NULL,
        `c_array_int` ARRAY<int> NULL,
        `c_array_bigint` ARRAY<bigint> NULL,
        `c_array_largeint` ARRAY<largeint> NULL,
        `c_array_float` ARRAY<float> NULL,
        `c_array_double` ARRAY<double> NULL,
        `c_array_decimal` ARRAY<DECIMAL(10, 5)> NULL,
        `c_array_date` ARRAY<date> NULL,
        `c_array_datetime` ARRAY<datetime(6)> NULL,
        `c_array_char` ARRAY<char(10)> NULL,
        `c_array_varchar` ARRAY<varchar(10)> NULL,
        `c_array_string` ARRAY<string> NULL
      ) ENGINE=OLAP
      DUPLICATE KEY(`id`)
      COMMENT 'OLAP'
      DISTRIBUTED BY HASH(`id`) BUCKETS 1
      PROPERTIES (
      "replication_allocation" = "tag.location.default: 1"
      );

    +------------------+-------------------------+------+-------+---------+-------+
    | Field            | Type                    | Null | Key   | Default | Extra |
    +------------------+-------------------------+------+-------+---------+-------+
    | id               | INT                     | No   | true  | NULL    |       |
    | c_array_boolean  | ARRAY<BOOLEAN>          | Yes  | false | []      | NONE  |
    | c_array_tinyint  | ARRAY<TINYINT>          | Yes  | false | []      | NONE  |
    | c_array_smallint | ARRAY<SMALLINT>         | Yes  | false | []      | NONE  |
    | c_array_int      | ARRAY<INT>              | Yes  | false | []      | NONE  |
    | c_array_bigint   | ARRAY<BIGINT>           | Yes  | false | []      | NONE  |
    | c_array_largeint | ARRAY<LARGEINT>         | Yes  | false | []      | NONE  |
    | c_array_float    | ARRAY<FLOAT>            | Yes  | false | []      | NONE  |
    | c_array_double   | ARRAY<DOUBLE>           | Yes  | false | []      | NONE  |
    | c_array_decimal  | ARRAY<DECIMALV3(10, 5)> | Yes  | false | []      | NONE  |
    | c_array_date     | ARRAY<DATEV2>           | Yes  | false | []      | NONE  |
    | c_array_datetime | ARRAY<DATETIMEV2(6)>    | Yes  | false | []      | NONE  |
    | c_array_char     | ARRAY<CHAR(10)>         | Yes  | false | []      | NONE  |
    | c_array_varchar  | ARRAY<VARCHAR(10)>      | Yes  | false | []      | NONE  |
    | c_array_string   | ARRAY<TEXT>             | Yes  | false | []      | NONE  |
    +------------------+-------------------------+------+-------+---------+-------+

     */

    // Spark-side schema mirroring the Doris table above.
    val schema = new StructType()
      .add("id", IntegerType)
      .add("c_array_boolean", ArrayType(BooleanType))
      .add("c_array_tinyint", ArrayType(ByteType))
      .add("c_array_smallint", ArrayType(ShortType))
      .add("c_array_int", ArrayType(IntegerType))
      .add("c_array_bigint", ArrayType(LongType))
      .add("c_array_largeint", ArrayType(StringType)) // LARGEINT carried as String
      .add("c_array_float", ArrayType(FloatType))
      .add("c_array_double", ArrayType(DoubleType))
      .add("c_array_decimal", ArrayType(DecimalType.apply(10, 5)))
      .add("c_array_date", ArrayType(DateType))
      .add("c_array_datetime", ArrayType(TimestampType))
      .add("c_array_char", ArrayType(StringType))
      .add("c_array_varchar", ArrayType(StringType))
      .add("c_array_string", ArrayType(StringType))

    // One representative row; typed literals keep each Array's element type
    // unambiguous so Spark's encoder matches the declared schema.
    val row = Row(
      1,
      Array(true, false, false, true, true),
      Array(1.toByte, 2.toByte, 3.toByte),
      Array(2.toShort, 12.toShort, 32.toShort),
      Array(3, 4, 5, 6),
      Array(4L, 5L, 6L),
      Array("123456789", "987654321", "123789456"),
      Array(6.6f, 6.7f, 7.8f),
      // BUG FIX: the last element was `8.9.floatValue()` (a Float), which widened
      // the array to Array[AnyVal] and broke the DoubleType conversion. All three
      // elements must be Double.
      Array(7.7d, 8.8d, 8.9d),
      Array(Decimal.apply(3.12), Decimal.apply(1.12345)),
      Array(Date.valueOf("2023-09-08"), Date.valueOf("2027-10-28")),
      Array(Timestamp.valueOf("2023-09-08 17:12:34.123456"), Timestamp.valueOf("2024-09-08 18:12:34.123456")),
      Array("char", "char2"),
      Array("varchar", "varchar2"),
      Array("string", "string2")
    )

    // Duplicate the row a few times so the sink batches more than one record.
    val inputRows = Seq.fill(7)(row)

    val rdd = spark.sparkContext.parallelize(inputRows, 1)
    val df = spark.createDataFrame(rdd, schema).toDF()

    // Stream into Doris using the arrow wire format; no retries so a failure
    // surfaces immediately in the test.
    df.write
      .format("doris")
      .option("doris.fenodes", dorisFeNodes)
      .option("user", dorisUser)
      .option("password", dorisPwd)
      .option("doris.table.identifier", s"$databaseName.spark_connector_array")
      .option("doris.sink.batch.size", 30)
      .option("doris.sink.properties.format", "arrow")
      .option("doris.sink.max-retries", 0)
      .save()
  }