in spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/testcase/TestStreamLoadForArrowType.scala [50:153]
/**
 * Writes a small DataFrame covering every Doris primitive column type to
 * `spark_connector_primitive` through the Doris connector, using the arrow
 * stream-load format.
 *
 * Seven identical rows are written with `doris.sink.batch.size = 3`, so the
 * sink flushes two full batches plus one partial batch.
 *
 * NOTE(review): LARGEINT has no Spark counterpart, so it is modeled as a
 * StringType column and sent as a decimal string.
 */
def testDataframeWritePrimitiveType(): Unit = {
  /*
  CREATE TABLE `spark_connector_primitive` (
  `id` int(11) NOT NULL,
  `c_bool` boolean NULL,
  `c_tinyint` tinyint NULL,
  `c_smallint` smallint NULL,
  `c_int` int NULL,
  `c_bigint` bigint NULL,
  `c_largeint` largeint NULL,
  `c_float` float NULL,
  `c_double` double NULL,
  `c_decimal` DECIMAL(10, 5) NULL,
  `c_date` date NULL,
  `c_datetime` datetime(6) NULL,
  `c_char` char(10) NULL,
  `c_varchar` varchar(10) NULL,
  `c_string` string NULL
  ) ENGINE=OLAP
  DUPLICATE KEY(`id`)
  COMMENT 'OLAP'
  DISTRIBUTED BY HASH(`id`) BUCKETS 1
  PROPERTIES (
  "replication_allocation" = "tag.location.default: 1"
  );
  +------------+----------------+------+-------+---------+-------+
  | Field      | Type           | Null | Key   | Default | Extra |
  +------------+----------------+------+-------+---------+-------+
  | id         | INT            | No   | true  | NULL    |       |
  | c_bool     | BOOLEAN        | Yes  | false | NULL    | NONE  |
  | c_tinyint  | TINYINT        | Yes  | false | NULL    | NONE  |
  | c_smallint | SMALLINT       | Yes  | false | NULL    | NONE  |
  | c_int      | INT            | Yes  | false | NULL    | NONE  |
  | c_bigint   | BIGINT         | Yes  | false | NULL    | NONE  |
  | c_largeint | LARGEINT       | Yes  | false | NULL    | NONE  |
  | c_float    | FLOAT          | Yes  | false | NULL    | NONE  |
  | c_double   | DOUBLE         | Yes  | false | NULL    | NONE  |
  | c_decimal  | DECIMAL(10, 5) | Yes  | false | NULL    | NONE  |
  | c_date     | DATE           | Yes  | false | NULL    | NONE  |
  | c_datetime | DATETIME(6)    | Yes  | false | NULL    | NONE  |
  | c_char     | CHAR(10)       | Yes  | false | NULL    | NONE  |
  | c_varchar  | VARCHAR(10)    | Yes  | false | NULL    | NONE  |
  | c_string   | TEXT           | Yes  | false | NULL    | NONE  |
  +------------+----------------+------+-------+---------+-------+
  */
  // Spark schema mirroring the Doris table above, column for column.
  val schema = new StructType()
    .add("id", IntegerType)
    .add("c_bool", BooleanType)
    .add("c_tinyint", ByteType)
    .add("c_smallint", ShortType)
    .add("c_int", IntegerType)
    .add("c_bigint", LongType)
    .add("c_largeint", StringType) // LARGEINT carried as a string (no 128-bit int in Spark)
    .add("c_float", FloatType)
    .add("c_double", DoubleType)
    .add("c_decimal", DecimalType.apply(10, 5))
    .add("c_date", DateType)
    .add("c_datetime", TimestampType)
    .add("c_char", StringType)
    .add("c_varchar", StringType)
    .add("c_string", StringType)
  // One representative row exercising every column type.
  val row = Row(
    1,
    true,
    1.toByte,
    2.toShort,
    3,
    4L,
    "123456789",
    6.6f,
    7.7d,
    Decimal.apply(3.12),
    Date.valueOf("2023-09-08"),
    Timestamp.valueOf("2023-09-08 17:12:34.123456"),
    "char",
    "varchar",
    "string"
  )
  // 7 rows with batch size 3 below => two full flushes plus a trailing partial batch.
  val rows = Seq.fill(7)(row)
  // Single partition keeps the write deterministic and on one task.
  val rdd = spark.sparkContext.parallelize(rows, 1)
  val df = spark.createDataFrame(rdd, schema)
  df.write
    .format("doris")
    .option("doris.fenodes", dorisFeNodes)
    .option("user", dorisUser)
    .option("password", dorisPwd)
    .option("doris.table.identifier", s"$databaseName.spark_connector_primitive")
    .option("doris.sink.batch.size", 3)
    .option("doris.sink.properties.format", "arrow")
    .option("doris.sink.max-retries", 0)
    .save()
}