def testDataframeWriteMapType()

in spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/testcase/TestStreamLoadForArrowType.scala [260:363]


  def testDataframeWriteMapType(): Unit = {
    /*

      CREATE TABLE `spark_connector_map` (
        `id` int(11) NOT NULL,
        `c_map_bool` Map<boolean,boolean> NULL,
        `c_map_tinyint` Map<tinyint,tinyint> NULL,
        `c_map_smallint` Map<smallint,smallint> NULL,
        `c_map_int` Map<int,int> NULL,
        `c_map_bigint` Map<bigint,bigint> NULL,
        `c_map_largeint` Map<largeint,largeint> NULL,
        `c_map_float` Map<float,float> NULL,
        `c_map_double` Map<double,double> NULL,
        `c_map_decimal` Map<DECIMAL(10, 5),DECIMAL(10, 5)> NULL,
        `c_map_date` Map<date,date> NULL,
        `c_map_datetime` Map<datetime(6),datetime(6)> NULL,
        `c_map_char` Map<char(10),char(10)> NULL,
        `c_map_varchar` Map<varchar(10),varchar(10)> NULL,
        `c_map_string` Map<string,string> NULL
      ) ENGINE=OLAP
      DUPLICATE KEY(`id`)
      COMMENT 'OLAP'
      DISTRIBUTED BY HASH(`id`) BUCKETS 1
      PROPERTIES (
      "replication_allocation" = "tag.location.default: 1"
      );

    +----------------+----------------------------------------+------+-------+---------+-------+
    | Field          | Type                                   | Null | Key   | Default | Extra |
    +----------------+----------------------------------------+------+-------+---------+-------+
    | id             | INT                                    | No   | true  | NULL    |       |
    | c_map_bool     | MAP<BOOLEAN,BOOLEAN>                   | Yes  | false | NULL    | NONE  |
    | c_map_tinyint  | MAP<TINYINT,TINYINT>                   | Yes  | false | NULL    | NONE  |
    | c_map_smallint | MAP<SMALLINT,SMALLINT>                 | Yes  | false | NULL    | NONE  |
    | c_map_int      | MAP<INT,INT>                           | Yes  | false | NULL    | NONE  |
    | c_map_bigint   | MAP<BIGINT,BIGINT>                     | Yes  | false | NULL    | NONE  |
    | c_map_largeint | MAP<LARGEINT,LARGEINT>                 | Yes  | false | NULL    | NONE  |
    | c_map_float    | MAP<FLOAT,FLOAT>                       | Yes  | false | NULL    | NONE  |
    | c_map_double   | MAP<DOUBLE,DOUBLE>                     | Yes  | false | NULL    | NONE  |
    | c_map_decimal  | MAP<DECIMALV3(10, 5),DECIMALV3(10, 5)> | Yes  | false | NULL    | NONE  |
    | c_map_date     | MAP<DATEV2,DATEV2>                     | Yes  | false | NULL    | NONE  |
    | c_map_datetime | MAP<DATETIMEV2(6),DATETIMEV2(6)>       | Yes  | false | NULL    | NONE  |
    | c_map_char     | MAP<CHAR(10),CHAR(10)>                 | Yes  | false | NULL    | NONE  |
    | c_map_varchar  | MAP<VARCHAR(10),VARCHAR(10)>           | Yes  | false | NULL    | NONE  |
    | c_map_string   | MAP<TEXT,TEXT>                         | Yes  | false | NULL    | NONE  |
    +----------------+----------------------------------------+------+-------+---------+-------+

     */
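
    // Spark-side type choices for the Doris columns above (see the DESC output):
    //  - LARGEINT is carried as StringType, since Spark SQL has no 128-bit integer type
    //  - CHAR(10), VARCHAR(10) and TEXT all map to StringType
    //  - DATETIMEV2(6) maps to TimestampType (Spark timestamps have microsecond precision)
    //  - DECIMALV3(10, 5) maps to DecimalType(10, 5)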

    val schema = new StructType()
      .add("id", IntegerType)
      .add("c_map_bool", MapType(BooleanType, BooleanType))
      .add("c_map_tinyint", MapType(ByteType, ByteType))
      .add("c_map_smallint", MapType(ShortType, ShortType))
      .add("c_map_int", MapType(IntegerType, IntegerType))
      .add("c_map_bigint", MapType(LongType, LongType))
      .add("c_map_largeint", MapType(StringType, StringType))
      .add("c_map_float", MapType(FloatType, FloatType))
      .add("c_map_double", MapType(DoubleType, DoubleType))
      .add("c_map_decimal", MapType(DecimalType.apply(10, 5), DecimalType.apply(10, 5)))
      .add("c_map_date", MapType(DateType, DateType))
      .add("c_map_datetime", MapType(TimestampType, TimestampType))
      .add("c_map_char", MapType(StringType, StringType))
      .add("c_map_varchar", MapType(StringType, StringType))
      .add("c_map_string", MapType(StringType, StringType))

    val row = Row(
      1,
      Map(true -> false, false -> true, true -> true),
      Map(1.toByte -> 2.toByte, 3.toByte -> 4.toByte),
      Map(2.toShort -> 4.toShort, 5.toShort -> 6.toShort),
      Map(3 -> 4, 7 -> 8),
      Map(4L -> 5L, 1L -> 2L),
      Map("123456789" -> "987654321", "789456123" -> "456789123"),
      Map(6.6f -> 8.8f, 9.9f -> 10.1f),
      Map(7.7 -> 1.1, 2.2 -> 3.3),
      Map(Decimal(3.12) -> Decimal(1.23), Decimal(2.34) -> Decimal(5.67)),
      Map(Date.valueOf("2023-09-08") -> Date.valueOf("2024-09-08"), Date.valueOf("1023-09-08") -> Date.valueOf("2023-09-08")),
      Map(Timestamp.valueOf("1023-09-08 17:12:34.123456") -> Timestamp.valueOf("2023-09-08 17:12:34.123456"), Timestamp.valueOf("3023-09-08 17:12:34.123456") -> Timestamp.valueOf("4023-09-08 17:12:34.123456")),
      Map("char" -> "char2", "char2" -> "char3"),
      Map("varchar" -> "varchar2", "varchar3" -> "varchar4"),
      Map("string" -> "string2", "string3" -> "string4")
    )

    val inputList = ListBuffer[Row]()
    for (_ <- 0 until 7) {
      inputList.append(row)
    }
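
    // 7 rows with doris.sink.batch.size = 3 (set below) makes the sink flush two
    // full batches of 3 plus a trailing partial batch of 1.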

    val rdd = spark.sparkContext.parallelize(inputList, 1)
    val df = spark.createDataFrame(rdd, schema)

    df.write
      .format("doris")
      .option("doris.fenodes", dorisFeNodes)
      .option("user", dorisUser)
      .option("password", dorisPwd)
      .option("doris.table.identifier", s"$databaseName.spark_connector_map")
      .option("doris.sink.batch.size", 3)
      // serialize the stream load payload as Arrow, the format this test exercises
      .option("doris.sink.properties.format", "arrow")
      // fail fast instead of retrying, so any type-mapping error surfaces immediately
      .option("doris.sink.max-retries", 0)
      .save()
  }
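
A quick read-back through the connector can confirm the write. The following is a
minimal sketch, not part of the original test: it assumes the same `spark` session and
connection values are in scope, and that the connector version in use supports reading
MAP columns.

    // Hypothetical verification snippet reusing the test's connection settings.
    val readBack = spark.read
      .format("doris")
      .option("doris.fenodes", dorisFeNodes)
      .option("user", dorisUser)
      .option("password", dorisPwd)
      .option("doris.table.identifier", s"$databaseName.spark_connector_map")
      .load()
    // The test writes 7 copies of the template row.
    assert(readBack.count() == 7)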