def testDataframeWriteStructType()

in spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/testcase/TestStreamLoadForArrowType.scala [365:460]


  /**
   * Writes a DataFrame with a nested struct column to the Doris table
   * `spark_connector_struct` through the "doris" data source, using the
   * `arrow` sink format.
   *
   * Seven identical rows are written from a single partition with a sink
   * batch size of 3 and retries disabled. The target table must already exist;
   * its DDL is reproduced in the comment at the top of the method body.
   */
  def testDataframeWriteStructType(): Unit = {
    /*

CREATE TABLE `spark_connector_struct` (
          `id` int NOT NULL,
          `st` STRUCT<
              `c_bool`:boolean,
              `c_tinyint`:tinyint(4),
              `c_smallint`:smallint(6),
              `c_int`:int(11),
              `c_bigint`:bigint(20),
              `c_largeint`:largeint(40),
              `c_float`:float,
              `c_double`:double,
              `c_decimal`:DECIMAL(10, 5),
              `c_date`:date,
              `c_datetime`:datetime(6),
              `c_char`:char(10),
              `c_varchar`:varchar(10),
              `c_string`:string
            > NULL
        ) ENGINE=OLAP
        DUPLICATE KEY(`id`)
        COMMENT 'OLAP'
        DISTRIBUTED BY HASH(`id`) BUCKETS 1
        PROPERTIES (
        "replication_allocation" = "tag.location.default: 1"
        );

    +-------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------+-------+---------+-------+
    | Field | Type                                                                                                                                                                                                                                                           | Null | Key   | Default | Extra |
    +-------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------+-------+---------+-------+
    | id    | INT                                                                                                                                                                                                                                                            | No   | true  | NULL    |       |
    | st    | STRUCT<c_bool:BOOLEAN,c_tinyint:TINYINT,c_smallint:SMALLINT,c_int:INT,c_bigint:BIGINT,c_largeint:LARGEINT,c_float:FLOAT,c_double:DOUBLE,c_decimal:DECIMALV3(10, 5),c_date:DATEV2,c_datetime:DATETIMEV2(6),c_char:CHAR(10),c_varchar:VARCHAR(10),c_string:TEXT> | Yes  | false | NULL    | NONE  |
    +-------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------+-------+---------+-------+

     */

    // Spark-side schema of the `st` struct; field order mirrors the Doris DDL above.
    val structSchema = new StructType()
      .add("c_bool", BooleanType)
      .add("c_tinyint", ByteType)
      .add("c_smallint", ShortType)
      .add("c_int", IntegerType)
      .add("c_bigint", LongType)
      // LARGEINT is sent as a string here (presumably because Spark SQL has no
      // 128-bit integer type -- TODO confirm against the connector's type mapping).
      .add("c_largeint", StringType)
      .add("c_float", FloatType)
      .add("c_double", DoubleType)
      .add("c_decimal", DecimalType(10, 5))
      .add("c_date", DateType)
      .add("c_datetime", TimestampType)
      .add("c_char", StringType)
      .add("c_varchar", StringType)
      .add("c_string", StringType)

    val schema = new StructType()
      .add("id", IntegerType)
      .add("st", structSchema)

    // One sample record; the nested Row supplies the struct fields positionally.
    val structValue = Row(
      true,
      1.toByte,
      2.toShort,
      3,
      4L,
      "123456789",
      6.6.toFloat,
      7.7,
      Decimal(3.12),
      Date.valueOf("2023-09-08"),
      Timestamp.valueOf("2023-09-08 17:12:34.123456"),
      "char",
      "varchar",
      "string"
    )
    val sampleRow = Row(1, structValue)

    // Seven copies with a batch size of 3 (set below) -- presumably chosen so the
    // write spans several batches with a partial final one; verify against the sink.
    val rowCount = 7
    val rows = Seq.fill(rowCount)(sampleRow)

    // A single partition keeps all rows in one write task.
    val rdd = spark.sparkContext.parallelize(rows, 1)
    val df = spark.createDataFrame(rdd, schema)

    df.write
      .format("doris")
      .option("doris.fenodes", dorisFeNodes)
      .option("user", dorisUser)
      .option("password", dorisPwd)
      .option("doris.table.identifier", s"$databaseName.spark_connector_struct")
      .option("doris.sink.batch.size", 3)
      .option("doris.sink.properties.format", "arrow")
      .option("doris.sink.max-retries", 0)
      .save()
  }