in regression-test/suites/doc/data-operate/import/import-way/broker-load-manual.md.groovy [37:422]
CREATE TABLE IF NOT EXISTS load_hdfs_file_test (
id INT,
age INT,
name STRING
) DISTRIBUTED BY HASH(id) BUCKETS 1
PROPERTIES (
"replication_num" = "1"
);
"""
if (enableHdfs()) {
logger.info("test hdfs load")
var base_path = uploadToHdfs("doc/data-operate/import/import-way/broker_load")
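// Example 1: basic Broker Load of a tab-separated text file from HDFS.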
sql """
LOAD LABEL demo.label_20220402
(
DATA INFILE("${base_path}/test_hdfs.txt")
INTO TABLE `load_hdfs_file_test`
COLUMNS TERMINATED BY "\\t"
(id, age, name)
)
WITH HDFS
(
"fs.defaultFS" = "${getHdfsFs()}",
"hadoop.username" = "${getHdfsUser()}",
"hadoop.password" = "${getHdfsPasswd()}"
)
PROPERTIES
(
"timeout"="1200",
"max_filter_ratio"="0.1"
);
"""
waitForBrokerLoadDone("label_20220402")
order_qt_sql "SELECT * FROM load_hdfs_file_test"
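// Example 2: the same file loaded through an HDFS HA-style configuration (a nameservice
// with two namenodes); in this test both namenode addresses reuse the single test filesystem.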
sql """
LOAD LABEL demo.label_20220402_2
(
DATA INFILE("${base_path}/test_hdfs.txt")
INTO TABLE `load_hdfs_file_test`
COLUMNS TERMINATED BY "\\t"
(id, age, name)
)
WITH HDFS
(
"hadoop.username" = "${getHdfsUser()}",
"fs.defaultFS" = "${getHdfsFs()}",
"dfs.nameservices" = "hafs",
"dfs.ha.namenodes.hafs" = "my_namenode1, my_namenode2",
"dfs.namenode.rpc-address.hafs.my_namenode1" = "${getHdfsFs()}",
"dfs.namenode.rpc-address.hafs.my_namenode2" = "${getHdfsFs()}",
"dfs.client.failover.proxy.provider.hafs" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
)
PROPERTIES
(
"timeout"="1200",
"max_filter_ratio"="0.1"
);
"""
waitForBrokerLoadDone("label_20220402_2")
order_qt_sql "SELECT * FROM load_hdfs_file_test"
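// Prepare example_db with two range-partitioned tables for the multi-table load;
// CLEAN LABEL clears any labels left over from earlier runs.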
multi_sql """
CREATE DATABASE IF NOT EXISTS example_db;
USE example_db;
CLEAN LABEL FROM example_db;
DROP TABLE IF EXISTS my_table1;
CREATE TABLE IF NOT EXISTS my_table1 (
k1 INT,
k2 INT,
k3 INT
) PARTITION BY RANGE(k1) (
PARTITION p1 VALUES LESS THAN (10),
PARTITION p2 VALUES LESS THAN (20),
PARTITION p3 VALUES LESS THAN (30)
) DISTRIBUTED BY HASH(k1) BUCKETS 1
PROPERTIES (
"replication_num" = "1"
);
DROP TABLE IF EXISTS my_table2;
CREATE TABLE IF NOT EXISTS my_table2 LIKE my_table1;
"""
sql """
LOAD LABEL example_db.label2
(
DATA INFILE("${base_path}/input/file-10*")
INTO TABLE `my_table1`
PARTITION (p1)
COLUMNS TERMINATED BY ","
(k1, tmp_k2, tmp_k3)
SET (
k2 = tmp_k2 + 1,
k3 = tmp_k3 + 1
),
DATA INFILE("${base_path}/input/file-20*")
INTO TABLE `my_table2`
COLUMNS TERMINATED BY ","
(k1, k2, k3)
)
WITH HDFS
(
"fs.defaultFS" = "${getHdfsFs()}",
"hadoop.username" = "${getHdfsUser()}",
"hadoop.password" = "${getHdfsPasswd()}"
);
"""
waitForBrokerLoadDone("label2")
order_qt_sql "SELECT * FROM my_table1"
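// Recreate my_table for the next example.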
multi_sql """
DROP TABLE IF EXISTS my_table;
CREATE TABLE IF NOT EXISTS my_table (
k1 INT,
k2 INT,
k3 INT
) PARTITION BY RANGE(k1) (
PARTITION p1 VALUES LESS THAN (10),
PARTITION p2 VALUES LESS THAN (20),
PARTITION p3 VALUES LESS THAN (30)
) DISTRIBUTED BY HASH(k1) BUCKETS 1
PROPERTIES (
"replication_num" = "1"
);
"""
sql """
LOAD LABEL example_db.label3
(
DATA INFILE("${base_path}/data/*/*")
INTO TABLE `my_table`
COLUMNS TERMINATED BY "\\\\x01"
)
WITH HDFS
(
"fs.defaultFS" = "${getHdfsFs()}",
"hadoop.username" = "${getHdfsUser()}",
"hadoop.password" = "${getHdfsPasswd()}"
);
"""
waitForBrokerLoadDone("label3")
order_qt_sql "SELECT * FROM my_table"
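// Recreate my_table with city and utc_date columns for the path-extraction examples.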
multi_sql """
DROP TABLE IF EXISTS my_table;
CREATE TABLE IF NOT EXISTS my_table (
k1 INT,
k2 INT,
k3 INT,
city STRING,
utc_date DATE
) PARTITION BY RANGE(k1) (
PARTITION p1 VALUES LESS THAN (10),
PARTITION p2 VALUES LESS THAN (20),
PARTITION p3 VALUES LESS THAN (30)
) DISTRIBUTED BY HASH(k1) BUCKETS 1
PROPERTIES (
"replication_num" = "1"
);
"""
sql """
LOAD LABEL example_db.label4
(
DATA INFILE("${base_path}/input/file.parquet")
INTO TABLE `my_table`
FORMAT AS "parquet"
(k1, k2, k3)
)
WITH HDFS
(
"fs.defaultFS" = "${getHdfsFs()}",
"hadoop.username" = "${getHdfsUser()}",
"hadoop.password" = "${getHdfsPasswd()}"
);
"""
waitForBrokerLoadDone("label4")
order_qt_sql "SELECT * FROM my_table"
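// Example 6: fill city and utc_date from the file path via COLUMNS FROM PATH AS.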
sql """
LOAD LABEL example_db.label5
(
DATA INFILE("${base_path}/input/city=beijing/*/*")
INTO TABLE `my_table`
FORMAT AS "csv"
(k1, k2, k3)
COLUMNS FROM PATH AS (city, utc_date)
)
WITH HDFS
(
"fs.defaultFS" = "${getHdfsFs()}",
"hadoop.username" = "${getHdfsUser()}",
"hadoop.password" = "${getHdfsPasswd()}"
);
"""
waitForBrokerLoadDone("label5")
order_qt_sql "SELECT * FROM my_table"
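// Example 7: PRECEDING FILTER screens raw source rows before the SET transform;
// WHERE then filters the transformed rows.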
sql """
LOAD LABEL example_db.label6
(
DATA INFILE("${base_path}/input/file.csv")
INTO TABLE `my_table`
(k1, k2, k3)
SET (
k2 = k2 + 1
)
PRECEDING FILTER k1 = 1
WHERE k1 > k2
)
WITH HDFS
(
"fs.defaultFS" = "${getHdfsFs()}",
"hadoop.username" = "${getHdfsUser()}",
"hadoop.password" = "${getHdfsPasswd()}"
);
"""
waitForBrokerLoadDone("label6")
order_qt_sql "SELECT * FROM my_table"
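// Create tbl12, keyed on a DATETIME column, for the time-from-path example.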
multi_sql """
DROP TABLE IF EXISTS tbl12;
CREATE TABLE IF NOT EXISTS tbl12 (
data_time DATETIME,
k2 INT,
k3 INT
) DISTRIBUTED BY HASH(data_time) BUCKETS 1
PROPERTIES (
"replication_num" = "1"
);
"""
sql """
LOAD LABEL example_db.label7
(
DATA INFILE("${base_path}/data2/*/test.txt")
INTO TABLE `tbl12`
COLUMNS TERMINATED BY ","
(k2, k3)
COLUMNS FROM PATH AS (data_time)
SET (
data_time=str_to_date(data_time, '%Y-%m-%d %H%%3A%i%%3A%s')
)
)
WITH HDFS
(
"fs.defaultFS" = "${getHdfsFs()}",
"hadoop.username" = "${getHdfsUser()}",
"hadoop.password" = "${getHdfsPasswd()}"
);
"""
waitForBrokerLoadDone("label7")
order_qt_sql "SELECT * FROM tbl12"
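// Recreate my_table as a UNIQUE KEY table and seed it for the MERGE example.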
multi_sql """
DROP TABLE IF EXISTS my_table;
CREATE TABLE IF NOT EXISTS my_table (
k1 INT,
k2 INT,
k3 INT,
v1 INT,
v2 INT
) UNIQUE KEY (k1, k2, k3)
DISTRIBUTED BY HASH(k1) BUCKETS 1
PROPERTIES (
"replication_num" = "1"
);
INSERT INTO my_table VALUES
(0, 0, 0, 0, 0),
(1, 1, 1, 100, 100),
(2, 2, 2, 200, 200),
(3, 3, 3, 300, 300);
"""
sql """
LOAD LABEL example_db.label8
(
MERGE DATA INFILE("${base_path}/data3/data.csv")
INTO TABLE `my_table`
(k1, k2, k3, v2, v1)
DELETE ON v2 > 100
)
WITH HDFS
(
"fs.defaultFS" = "${getHdfsFs()}",
"hadoop.username" = "${getHdfsUser()}",
"hadoop.password" = "${getHdfsPasswd()}"
)
PROPERTIES
(
"timeout" = "3600",
"max_filter_ratio" = "0.1"
);
"""
waitForBrokerLoadDone("label8")
order_qt_sql "SELECT * FROM my_table"
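// Recreate my_table with source_sequence as the sequence column and seed it.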
multi_sql """
DROP TABLE IF EXISTS my_table;
CREATE TABLE IF NOT EXISTS my_table (
k1 INT,
k2 INT,
source_sequence INT,
v1 INT,
v2 INT
) UNIQUE KEY (k1, k2)
DISTRIBUTED BY HASH(k1) BUCKETS 1
PROPERTIES (
"function_column.sequence_col" = "source_sequence",
"replication_num" = "1"
);
INSERT INTO my_table VALUES
(0, 0, 0, 0, 0),
(1, 1, 1, 100, 100),
(2, 2, 2, 200, 200),
(3, 3, 3, 300, 300);
"""
sql """
LOAD LABEL example_db.label9
(
DATA INFILE("${base_path}/test_sequence.csv")
INTO TABLE `my_table`
COLUMNS TERMINATED BY ","
(k1, k2, source_sequence, v1, v2)
ORDER BY source_sequence
)
WITH HDFS
(
"fs.defaultFS" = "${getHdfsFs()}",
"hadoop.username" = "${getHdfsUser()}",
"hadoop.password" = "${getHdfsPasswd()}"
);
"""
waitForBrokerLoadDone("label9")
order_qt_sql "SELECT * FROM my_table"
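// Recreate my_table for the JSON examples.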
multi_sql """
DROP TABLE IF EXISTS my_table;
CREATE TABLE IF NOT EXISTS my_table (
id INT,
city STRING,
code INT
)
DISTRIBUTED BY HASH(id) BUCKETS 1
PROPERTIES (
"replication_num" = "1"
);
"""
sql """
LOAD LABEL example_db.label10
(
DATA INFILE("${base_path}/file.json")
INTO TABLE `my_table`
FORMAT AS "json"
PROPERTIES(
"json_root" = "\$.item",
"jsonpaths" = "[\\"\$.id\\", \\"\$.city\\", \\"\$.code\\"]"
)
)
WITH HDFS
(
"fs.defaultFS" = "${getHdfsFs()}",
"hadoop.username" = "${getHdfsUser()}",
"hadoop.password" = "${getHdfsPasswd()}"
);
"""
waitForBrokerLoadDone("label10")
order_qt_sql "SELECT * FROM my_table"
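// Example 12: JSON load with an explicit column list and a SET transform (id = id * 10);
// jsonpaths values map to the listed columns by position.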
sql """
LOAD LABEL example_db.label10_2
(
DATA INFILE("${base_path}/file.json")
INTO TABLE `my_table`
FORMAT AS "json"
(id, code, city)
SET (id = id * 10)
PROPERTIES(
"json_root" = "\$.item",
"jsonpaths" = "[\\"\$.id\\", \\"\$.city\\", \\"\$.code\\"]"
)
)
WITH HDFS
(
"fs.defaultFS" = "${getHdfsFs()}",
"hadoop.username" = "${getHdfsUser()}",
"hadoop.password" = "${getHdfsPasswd()}"
);
"""
waitForBrokerLoadDone("label10_2")
order_qt_sql "SELECT * FROM my_table"
}