def testDataframeWritePrimitiveType()

in spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/testcase/TestStreamLoadForArrowType.scala [50:153]


  /**
   * Writes a small DataFrame covering every Doris primitive column type
   * (bool, tinyint..largeint, float/double, decimal, date, datetime,
   * char/varchar/string) to `spark_connector_primitive` via the Doris
   * connector in arrow format, to verify arrow stream-load serialization.
   *
   * Target table DDL and schema are reproduced in the comment below.
   * NOTE(review): assumes the table already exists in `databaseName`;
   * the connector sink is fire-and-forget here (no read-back assertion).
   */
  def testDataframeWritePrimitiveType(): Unit = {
    /*

  CREATE TABLE `spark_connector_primitive` (
        `id` int(11) NOT NULL,
        `c_bool` boolean NULL,
        `c_tinyint` tinyint NULL,
        `c_smallint` smallint NULL,
        `c_int` int NULL,
        `c_bigint` bigint NULL,
        `c_largeint` largeint NULL,
        `c_float` float NULL,
        `c_double` double NULL,
        `c_decimal` DECIMAL(10, 5) NULL,
        `c_date` date NULL,
        `c_datetime` datetime(6) NULL,
        `c_char` char(10) NULL,
        `c_varchar` varchar(10) NULL,
        `c_string` string NULL
      ) ENGINE=OLAP
      DUPLICATE KEY(`id`)
      COMMENT 'OLAP'
      DISTRIBUTED BY HASH(`id`) BUCKETS 1
      PROPERTIES (
      "replication_allocation" = "tag.location.default: 1"
      );

    +------------+----------------+------+-------+---------+-------+
    | Field      | Type           | Null | Key   | Default | Extra |
    +------------+----------------+------+-------+---------+-------+
    | id         | INT            | No   | true  | NULL    |       |
    | c_bool     | BOOLEAN        | Yes  | false | NULL    | NONE  |
    | c_tinyint  | TINYINT        | Yes  | false | NULL    | NONE  |
    | c_smallint | SMALLINT       | Yes  | false | NULL    | NONE  |
    | c_int      | INT            | Yes  | false | NULL    | NONE  |
    | c_bigint   | BIGINT         | Yes  | false | NULL    | NONE  |
    | c_largeint | LARGEINT       | Yes  | false | NULL    | NONE  |
    | c_float    | FLOAT          | Yes  | false | NULL    | NONE  |
    | c_double   | DOUBLE         | Yes  | false | NULL    | NONE  |
    | c_decimal  | DECIMAL(10, 5) | Yes  | false | NULL    | NONE  |
    | c_date     | DATE           | Yes  | false | NULL    | NONE  |
    | c_datetime | DATETIME(6)    | Yes  | false | NULL    | NONE  |
    | c_char     | CHAR(10)       | Yes  | false | NULL    | NONE  |
    | c_varchar  | VARCHAR(10)    | Yes  | false | NULL    | NONE  |
    | c_string   | TEXT           | Yes  | false | NULL    | NONE  |
    +------------+----------------+------+-------+---------+-------+

     */

    // Spark schema mirroring the Doris table; LARGEINT has no Spark
    // counterpart so it is carried as a decimal string (StringType).
    val schema = new StructType()
      .add("id", IntegerType)
      .add("c_bool", BooleanType)
      .add("c_tinyint", ByteType)
      .add("c_smallint", ShortType)
      .add("c_int", IntegerType)
      .add("c_bigint", LongType)
      .add("c_largeint", StringType)
      .add("c_float", FloatType)
      .add("c_double", DoubleType)
      .add("c_decimal", DecimalType.apply(10, 5))
      .add("c_date", DateType)
      .add("c_datetime", TimestampType)
      .add("c_char", StringType)
      .add("c_varchar", StringType)
      .add("c_string", StringType)

    // One representative row; field order must match the schema above.
    val row = Row(
      1,
      true,
      1.toByte,
      2.toShort,
      3,
      4L,
      "123456789",
      6.6f,
      7.7,
      Decimal.apply(3.12),
      Date.valueOf("2023-09-08"),
      Timestamp.valueOf("2023-09-08 17:12:34.123456"),
      "char",
      "varchar",
      "string"
    )

    // 7 identical rows with batch.size = 3 forces the sink to flush
    // multiple batches, including a final partial batch of 1.
    val rows = List.fill(7)(row)

    val rdd = spark.sparkContext.parallelize(rows, 1)
    val df = spark.createDataFrame(rdd, schema)

    df.write
      .format("doris")
      .option("doris.fenodes", dorisFeNodes)
      .option("user", dorisUser)
      .option("password", dorisPwd)
      .option("doris.table.identifier", s"$databaseName.spark_connector_primitive")
      .option("doris.sink.batch.size", 3)
      .option("doris.sink.properties.format", "arrow")
      .option("doris.sink.max-retries", 0)
      .save()
  }