// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.clients.producer.ProducerConfig
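// This suite replays the CREATE ROUTINE LOAD examples from the routine-load manual
// against a local Kafka broker. It only runs when "enableKafkaTest" is "true" in the
// regression config; the broker address comes from "externalEnvIp" and "kafka_port".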
suite("test_routine_load_doc_case","p0") {
def kafkaCsvTopics = [
"test_rl_csv",
"test_rl_max_filter_ratio",
"test_rl_partition",
"test_rl_delete",
"test_rl_column_mapping",
"test_rl_hll"
]
def kafkaJsonTopics = [
"test_rl_json",
"test_rl_json_path",
"test_rl_json_root",
"test_rl_array",
"test_rl_map",
"test_rl_bitmap"
]
def jsonpaths = [
'[\"$.id\",\"$.name\",\"$.age\"]',
'[\"$.name\",\"$.id\",\"$.num\",\"$.age\"]',
]
String enabled = context.config.otherConfigs.get("enableKafkaTest")
String kafka_port = context.config.otherConfigs.get("kafka_port")
String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
def kafka_broker = "${externalEnvIp}:${kafka_port}"
if (enabled != null && enabled.equalsIgnoreCase("true")) {
// Kafka producer configuration used to publish the test fixtures
def props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
// Create kafka producer
def producer = new KafkaProducer<>(props)
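// Publish every line of each CSV fixture file to the Kafka topic of the same name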
for (String kafkaCsvTopic in kafkaCsvTopics) {
def txt = new File("""${context.config.dataPath}/doc/data-operate/import/import-way/${kafkaCsvTopic}.csv""").text
def lines = txt.readLines()
lines.each { line ->
logger.info("=====${line}========")
def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
producer.send(record)
}
}
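// Publish every line of each JSON fixture file to the Kafka topic of the same name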
for (String kafkaJsonTopic in kafkaJsonTopics) {
def kafkaJson = new File("""${context.config.dataPath}/doc/data-operate/import/import-way/${kafkaJsonTopic}.json""").text
def lines = kafkaJson.readLines()
lines.each { line ->
logger.info("=====${line}========")
def record = new ProducerRecord<>(kafkaJsonTopic, null, line)
producer.send(record)
}
}
// case1: load csv
def tableName = "test_routine_load_doc_case"
def jobName = "test_routine_load_doc_case_job"
sql """ DROP TABLE IF EXISTS ${tableName} """
sql """
CREATE TABLE IF NOT EXISTS ${tableName} (
user_id BIGINT NOT NULL COMMENT "User ID",
name VARCHAR(20) COMMENT "User name",
age INT COMMENT "User age"
)
UNIQUE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 10
PROPERTIES ("replication_allocation" = "tag.location.default: 1");
"""
try {
sql """
CREATE ROUTINE LOAD ${jobName} ON ${tableName}
COLUMNS TERMINATED BY ",",
COLUMNS(user_id, name, age)
FROM KAFKA
(
"kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
"kafka_topic" = "${kafkaCsvTopics[0]}",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
"""
sql "sync"
def count = 0
while (true) {
def res = sql "select count(*) from ${tableName}"
def state = sql "show routine load for ${jobName}"
log.info("routine load state: ${state[0][8].toString()}".toString())
log.info("routine load statistic: ${state[0][14].toString()}".toString())
log.info("reason of state changed: ${state[0][17].toString()}".toString())
if (res[0][0] > 0) {
break
}
if (count >= 120) {
log.error("routine load can not visible for long time")
assertEquals(20, res[0][0])
break
}
sleep(5000)
count++
}
qt_sql1 "select * from ${tableName} order by user_id"
} finally {
sql "stop routine load for ${jobName}"
sql """ truncate table ${tableName} """
}
//case2: load json
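// Only verifies that a JSON-format job with jsonpaths can be created; the job is stopped right away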
try {
sql """
CREATE ROUTINE LOAD ${jobName} ON ${tableName}
COLUMNS(user_id,name,age)
PROPERTIES(
"format"="json",
"jsonpaths"='${jsonpaths[0]}'
)
FROM KAFKA
(
"kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
"kafka_topic" = "${kafkaJsonTopics[0]}",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
"""
} finally {
sql "stop routine load for ${jobName}"
sql """ truncate table ${tableName} """
}
//case3: alter routine load
sql """
CREATE ROUTINE LOAD ${jobName} ON ${tableName}
COLUMNS TERMINATED BY ",",
COLUMNS(user_id, name, age)
FROM KAFKA
(
"kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
"kafka_topic" = "${kafkaCsvTopics[0]}",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
"""
sql "pause routine load for ${jobName}"
sql """
ALTER ROUTINE LOAD FOR ${jobName}
PROPERTIES(
"desired_concurrent_number" = "3"
)
FROM KAFKA(
"kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
"kafka_topic" = "test-topic"
);
"""
sql "stop routine load for ${jobName}"
//case4: max_filter_ratio
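// max_filter_ratio / max_error_number bound how many bad rows the job tolerates,
// and strict_mode rejects rows whose column values fail type conversion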
def tableName1 = "test_routine_load_doc_case1"
sql """ DROP TABLE IF EXISTS ${tableName1} """
sql """
CREATE TABLE IF NOT EXISTS ${tableName1} (
id INT NOT NULL COMMENT "User ID",
name VARCHAR(30) NOT NULL COMMENT "Name",
age INT COMMENT "Age"
)
DUPLICATE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES ("replication_allocation" = "tag.location.default: 1");
"""
try {
sql """
CREATE ROUTINE LOAD ${jobName} ON ${tableName1}
COLUMNS TERMINATED BY ","
PROPERTIES
(
"max_filter_ratio"="0.5",
"max_error_number" = "100",
"strict_mode" = "true"
)
FROM KAFKA
(
"kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
"kafka_topic" = "${kafkaCsvTopics[1]}",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
"""
sql "sync"
def count = 0
while (true) {
def res = sql "select count(*) from ${tableName1}"
def state = sql "show routine load for ${jobName}"
log.info("routine load state: ${state[0][8].toString()}".toString())
log.info("routine load statistic: ${state[0][14].toString()}".toString())
log.info("reason of state changed: ${state[0][17].toString()}".toString())
log.info("url: ${state[0][18].toString()}".toString())
if (res[0][0] > 0) {
break
}
if (count >= 120) {
log.error("routine load can not visible for long time")
assertEquals(20, res[0][0])
break
}
sleep(5000)
count++
}
qt_sql4 "select * from ${tableName1} order by id"
} finally {
sql "stop routine load for ${jobName}"
sql """ truncate table ${tableName1} """
}
//case5: kafka_offsets
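// Start consuming Kafka partition 0 from offset 3 instead of a default offset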
try {
sql """
CREATE ROUTINE LOAD ${jobName} ON ${tableName}
COLUMNS TERMINATED BY ","
FROM KAFKA
(
"kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
"kafka_topic" = "${kafkaCsvTopics[0]}",
"kafka_partitions" = "0",
"kafka_offsets" = "3"
);
"""
sql "sync"
def count = 0
while (true) {
def res = sql "select count(*) from ${tableName}"
def state = sql "show routine load for ${jobName}"
log.info("routine load state: ${state[0][8].toString()}".toString())
log.info("routine load statistic: ${state[0][14].toString()}".toString())
log.info("reason of state changed: ${state[0][17].toString()}".toString())
if (res[0][0] > 0) {
break
}
if (count >= 120) {
log.error("routine load can not visible for long time")
assertEquals(20, res[0][0])
break
}
sleep(5000)
count++
}
qt_sql5 "select * from ${tableName} order by user_id"
} finally {
sql "stop routine load for ${jobName}"
sql """ truncate table ${tableName} """
}
//case6: group.id and client.id
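// Custom Kafka consumer settings are passed through with the "property." prefix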
try {
sql """
CREATE ROUTINE LOAD ${jobName} ON ${tableName}
COLUMNS TERMINATED BY ","
FROM KAFKA
(
"kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
"kafka_topic" = "${kafkaCsvTopics[0]}",
"property.group.id" = "kafka_job03",
"property.client.id" = "kafka_client_03",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
"""
sql "sync"
def count = 0
while (true) {
def res = sql "select count(*) from ${tableName}"
def state = sql "show routine load for ${jobName}"
log.info("routine load state: ${state[0][8].toString()}".toString())
log.info("routine load statistic: ${state[0][14].toString()}".toString())
log.info("reason of state changed: ${state[0][17].toString()}".toString())
if (res[0][0] > 0) {
break
}
if (count >= 120) {
log.error("routine load can not visible for long time")
assertEquals(20, res[0][0])
break
}
sleep(5000)
count++
}
qt_sql6 "select * from ${tableName} order by user_id"
} finally {
sql "stop routine load for ${jobName}"
sql """ truncate table ${tableName} """
}
//case7: filter
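// The WHERE clause filters rows during the load; only rows with user_id >= 3 are kept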
try {
sql """
CREATE ROUTINE LOAD ${jobName} ON ${tableName}
COLUMNS TERMINATED BY ",",
WHERE user_id >= 3
FROM KAFKA
(
"kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
"kafka_topic" = "${kafkaCsvTopics[0]}",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
"""
sql "sync"
def count = 0
while (true) {
def res = sql "select count(*) from ${tableName}"
def state = sql "show routine load for ${jobName}"
log.info("routine load state: ${state[0][8].toString()}".toString())
log.info("routine load statistic: ${state[0][14].toString()}".toString())
log.info("reason of state changed: ${state[0][17].toString()}".toString())
if (res[0][0] > 0) {
break
}
if (count >= 120) {
log.error("routine load can not visible for long time")
assertEquals(20, res[0][0])
break
}
sleep(5000)
count++
}
qt_sql7 "select * from ${tableName} order by user_id"
} finally {
sql "stop routine load for ${jobName}"
sql """ truncate table ${tableName} """
}
// case8: Loading specified partition data
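// PARTITION(...) restricts the load to the listed table partitions (only partition_b here)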
def tableName2 = "test_routine_load_doc_case2"
sql """ DROP TABLE IF EXISTS ${tableName2} """
sql """
CREATE TABLE IF NOT EXISTS ${tableName2} (
id INT NOT NULL COMMENT "User ID",
name VARCHAR(30) NOT NULL COMMENT "Name",
age INT COMMENT "Age",
date DATETIME COMMENT "Date"
)
DUPLICATE KEY(`id`)
PARTITION BY RANGE(`id`)
(PARTITION partition_a VALUES [("0"), ("1")),
PARTITION partition_b VALUES [("1"), ("2")),
PARTITION partition_c VALUES [("2"), ("3")))
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES ("replication_allocation" = "tag.location.default: 1");
"""
try {
sql """
CREATE ROUTINE LOAD ${jobName} ON ${tableName2}
COLUMNS TERMINATED BY ",",
PARTITION(partition_b)
FROM KAFKA
(
"kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
"kafka_topic" = "${kafkaCsvTopics[2]}",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
"""
sql "sync"
def count = 0
while (true) {
def res = sql "select count(*) from ${tableName2}"
def state = sql "show routine load for ${jobName}"
log.info("routine load state: ${state[0][8].toString()}".toString())
log.info("routine load statistic: ${state[0][14].toString()}".toString())
log.info("reason of state changed: ${state[0][17].toString()}".toString())
if (res[0][0] > 0) {
break
}
if (count >= 120) {
log.error("routine load can not visible for long time")
assertEquals(20, res[0][0])
break
}
sleep(5000)
count++
}
qt_sql8 "select * from ${tableName2} order by id"
} finally {
sql "stop routine load for ${jobName}"
sql """ truncate table ${tableName2} """
}
// case9: timezone
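// The timezone property sets the time zone used by the job's timezone-sensitive functions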
try {
sql """
CREATE ROUTINE LOAD ${jobName} ON ${tableName2}
COLUMNS TERMINATED BY ","
PROPERTIES
(
"timezone" = "Asia/Shanghai"
)
FROM KAFKA
(
"kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
"kafka_topic" = "${kafkaCsvTopics[2]}",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
"""
sql "sync"
def count = 0
while (true) {
def res = sql "select count(*) from ${tableName2}"
def state = sql "show routine load for ${jobName}"
log.info("routine load state: ${state[0][8].toString()}".toString())
log.info("routine load statistic: ${state[0][14].toString()}".toString())
log.info("reason of state changed: ${state[0][17].toString()}".toString())
if (res[0][0] > 0) {
break
}
if (count >= 120) {
log.error("routine load can not visible for long time")
assertEquals(20, res[0][0])
break
}
sleep(5000)
count++
}
qt_sql9 "select * from ${tableName2} order by id"
} finally {
sql "stop routine load for ${jobName}"
sql """ truncate table ${tableName2} """
}
// case10: merge delete
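// Load base rows into the UNIQUE KEY table first, then replay another topic WITH DELETE
// so the loaded keys are removed instead of upserted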
try {
sql """
CREATE ROUTINE LOAD ${jobName} ON ${tableName}
COLUMNS TERMINATED BY ",",
COLUMNS(user_id, name, age)
FROM KAFKA
(
"kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
"kafka_topic" = "${kafkaCsvTopics[0]}",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
"""
sql "sync"
def count = 0
while (true) {
def res = sql "select count(*) from ${tableName}"
def state = sql "show routine load for ${jobName}"
log.info("routine load state: ${state[0][8].toString()}".toString())
log.info("routine load statistic: ${state[0][14].toString()}".toString())
log.info("reason of state changed: ${state[0][17].toString()}".toString())
if (res[0][0] > 0) {
break
}
if (count >= 120) {
log.error("routine load can not visible for long time")
assertEquals(20, res[0][0])
break
}
sleep(5000)
count++
}
sql "stop routine load for ${jobName}"
sql "sync"
sql """
CREATE ROUTINE LOAD ${jobName} ON ${tableName}
WITH DELETE
COLUMNS TERMINATED BY ","
FROM KAFKA
(
"kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
"kafka_topic" = "${kafkaCsvTopics[3]}",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
"""
count = 0
while (true) {
def res = sql "select count(*) from ${tableName}"
def state = sql "show routine load for ${jobName}"
log.info("routine load state: ${state[0][8].toString()}".toString())
log.info("routine load statistic: ${state[0][14].toString()}".toString())
log.info("reason of state changed: ${state[0][17].toString()}".toString())
if (res[0][0] > 0) {
break
}
if (count >= 120) {
log.error("routine load can not visible for long time")
assertEquals(20, res[0][0])
break
}
sleep(5000)
count++
}
qt_sql10 "select * from ${tableName} order by user_id"
} finally {
sql "stop routine load for ${jobName}"
sql """ truncate table ${tableName} """
}
//case11: delete on
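// WITH MERGE + DELETE ON treats rows matching the condition as deletes and the rest as upserts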
try {
sql """
CREATE ROUTINE LOAD ${jobName} ON ${tableName}
WITH MERGE
COLUMNS TERMINATED BY ",",
DELETE ON user_id = 2
FROM KAFKA
(
"kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
"kafka_topic" = "${kafkaCsvTopics[0]}",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
"""
sql "sync"
def count = 0
while (true) {
def res = sql "select count(*) from ${tableName}"
def state = sql "show routine load for ${jobName}"
log.info("routine load state: ${state[0][8].toString()}".toString())
log.info("routine load statistic: ${state[0][14].toString()}".toString())
log.info("reason of state changed: ${state[0][17].toString()}".toString())
if (res[0][0] > 0) {
break
}
if (count >= 120) {
log.error("routine load can not visible for long time")
assertEquals(20, res[0][0])
break
}
sleep(5000)
count++
}
qt_sql11 "select * from ${tableName} order by user_id"
} finally {
sql "stop routine load for ${jobName}"
sql """ truncate table ${tableName} """
}
// case12: Load with column mapping and derived column calculation
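// num is a derived column computed from age (num = age * 10) during the load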
tableName = "test_routine_load_doc_case3"
sql """ DROP TABLE IF EXISTS ${tableName} """
sql """
CREATE TABLE IF NOT EXISTS ${tableName} (
id INT NOT NULL COMMENT "id",
name VARCHAR(30) NOT NULL COMMENT "name",
age INT COMMENT "age",
num INT COMMENT "number"
)
DUPLICATE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES ("replication_allocation" = "tag.location.default: 1");
"""
try {
sql """
CREATE ROUTINE LOAD ${jobName} ON ${tableName}
COLUMNS TERMINATED BY ",",
COLUMNS(id, name, age, num=age*10)
FROM KAFKA
(
"kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
"kafka_topic" = "${kafkaCsvTopics[4]}",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
"""
sql "sync"
def count = 0
while (true) {
def res = sql "select count(*) from ${tableName}"
def state = sql "show routine load for ${jobName}"
log.info("routine load state: ${state[0][8].toString()}".toString())
log.info("routine load statistic: ${state[0][14].toString()}".toString())
log.info("reason of state changed: ${state[0][17].toString()}".toString())
if (res[0][0] > 0) {
break
}
if (count >= 120) {
log.error("routine load can not visible for long time")
assertEquals(20, res[0][0])
break
}
sleep(5000)
count++
}
qt_sql12 "select * from ${tableName} order by id"
} finally {
sql "stop routine load for ${jobName}"
sql """ truncate table ${tableName} """
}
//case13: test json
tableName = "routine_test12"
jobName = "kafka_job12"
sql """ DROP TABLE IF EXISTS ${tableName} """
sql """
CREATE TABLE IF NOT EXISTS ${tableName} (
id INT NOT NULL COMMENT "id",
name VARCHAR(30) NOT NULL COMMENT "name",
age INT COMMENT "age"
)
DUPLICATE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES ("replication_allocation" = "tag.location.default: 1");
"""
try {
sql """
CREATE ROUTINE LOAD ${jobName} ON ${tableName}
PROPERTIES
(
"format" = "json"
)
FROM KAFKA
(
"kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
"kafka_topic" = "${kafkaJsonTopics[0]}",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
"""
sql "sync"
def count = 0
while (true) {
def res = sql "select count(*) from ${tableName}"
def state = sql "show routine load for ${jobName}"
log.info("routine load state: ${state[0][8].toString()}".toString())
log.info("routine load statistic: ${state[0][14].toString()}".toString())
log.info("reason of state changed: ${state[0][17].toString()}".toString())
if (res[0][0] > 0) {
break
}
if (count >= 120) {
log.error("routine load can not visible for long time")
assertEquals(20, res[0][0])
break
}
sleep(5000)
count++
}
qt_sql13 "select * from ${tableName} order by id"
} finally {
sql "stop routine load for ${jobName}"
sql """ truncate table ${tableName} """
}
//case14: json path
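// jsonpaths extracts the JSON fields in order and maps them onto the columns listed in COLUMNS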
tableName = "routine_test13"
jobName = "kafka_job13"
sql """ DROP TABLE IF EXISTS ${tableName} """
sql """
CREATE TABLE IF NOT EXISTS ${tableName} (
id INT NOT NULL COMMENT "id",
name VARCHAR(30) NOT NULL COMMENT "name",
age INT COMMENT "age",
num INT COMMENT "num"
)
DUPLICATE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES ("replication_allocation" = "tag.location.default: 1");
"""
try {
sql """
CREATE ROUTINE LOAD ${jobName} ON ${tableName}
COLUMNS(name, id, num, age)
PROPERTIES
(
"format" = "json",
"jsonpaths"='${jsonpaths[1]}'
)
FROM KAFKA
(
"kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
"kafka_topic" = "${kafkaJsonTopics[1]}",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
"""
sql "sync"
def count = 0
while (true) {
def res = sql "select count(*) from ${tableName}"
def state = sql "show routine load for ${jobName}"
log.info("routine load state: ${state[0][8].toString()}".toString())
log.info("routine load statistic: ${state[0][14].toString()}".toString())
log.info("reason of state changed: ${state[0][17].toString()}".toString())
if (res[0][0] > 0) {
break
}
if (count >= 120) {
log.error("routine load can not visible for long time")
assertEquals(20, res[0][0])
break
}
sleep(5000)
count++
}
qt_sql14 "select * from ${tableName} order by id"
} finally {
sql "stop routine load for ${jobName}"
sql """ truncate table ${tableName} """
}
//case15: json root
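// json_root points the parser at the nested object under $.source before column matching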
tableName = "routine_test14"
jobName = "kafka_job14"
sql """ DROP TABLE IF EXISTS ${tableName} """
sql """
CREATE TABLE IF NOT EXISTS ${tableName} (
id INT NOT NULL COMMENT "id",
name VARCHAR(30) NOT NULL COMMENT "name",
age INT COMMENT "age"
)
DUPLICATE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES ("replication_allocation" = "tag.location.default: 1");
"""
try {
sql """
CREATE ROUTINE LOAD ${jobName} ON ${tableName}
PROPERTIES
(
"format" = "json",
"json_root" = "\$.source"
)
FROM KAFKA
(
"kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
"kafka_topic" = "${kafkaJsonTopics[2]}",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
"""
sql "sync"
def count = 0
while (true) {
def res = sql "select count(*) from ${tableName}"
def state = sql "show routine load for ${jobName}"
log.info("routine load state: ${state[0][8].toString()}".toString())
log.info("routine load statistic: ${state[0][14].toString()}".toString())
log.info("reason of state changed: ${state[0][17].toString()}".toString())
if (res[0][0] > 0) {
break
}
if (count >= 120) {
log.error("routine load can not visible for long time")
assertEquals(20, res[0][0])
break
}
sleep(5000)
count++
}
qt_sql15 "select * from ${tableName} order by id"
} finally {
sql "stop routine load for ${jobName}"
sql """ truncate table ${tableName} """
}
// case16: array
tableName = "routine_test16"
jobName = "kafka_job16"
sql """ DROP TABLE IF EXISTS ${tableName} """
sql """
CREATE TABLE IF NOT EXISTS ${tableName} (
id INT NOT NULL COMMENT "id",
name VARCHAR(30) NOT NULL COMMENT "name",
age INT COMMENT "age",
array ARRAY<int(11)> NULL COMMENT "test array column"
)
DUPLICATE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES ("replication_allocation" = "tag.location.default: 1");
"""
try {
sql """
CREATE ROUTINE LOAD ${jobName} ON ${tableName}
PROPERTIES
(
"format" = "json"
)
FROM KAFKA
(
"kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
"kafka_topic" = "${kafkaJsonTopics[3]}",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
"""
sql "sync"
def count = 0
while (true) {
def res = sql "select count(*) from ${tableName}"
def state = sql "show routine load for ${jobName}"
log.info("routine load state: ${state[0][8].toString()}".toString())
log.info("routine load statistic: ${state[0][14].toString()}".toString())
log.info("reason of state changed: ${state[0][17].toString()}".toString())
if (res[0][0] > 0) {
break
}
if (count >= 120) {
log.error("routine load can not visible for long time")
assertEquals(20, res[0][0])
break
}
sleep(5000)
count++
}
qt_sql16 "select * from ${tableName} order by id"
} finally {
sql "stop routine load for ${jobName}"
sql """ truncate table ${tableName} """
}
// case17: map
tableName = "routine_test17"
jobName = "kafka_job17"
sql """ DROP TABLE IF EXISTS ${tableName} """
sql """
CREATE TABLE IF NOT EXISTS ${tableName} (
id INT NOT NULL COMMENT "id",
name VARCHAR(30) NOT NULL COMMENT "name",
age INT COMMENT "age",
map Map<STRING, INT> NULL COMMENT "test column"
)
DUPLICATE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES ("replication_allocation" = "tag.location.default: 1");
"""
try {
sql """
CREATE ROUTINE LOAD ${jobName} ON ${tableName}
PROPERTIES
(
"format" = "json"
)
FROM KAFKA
(
"kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
"kafka_topic" = "${kafkaJsonTopics[4]}",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
"""
sql "sync"
def count = 0
while (true) {
def res = sql "select count(*) from ${tableName}"
def state = sql "show routine load for ${jobName}"
log.info("routine load state: ${state[0][8].toString()}".toString())
log.info("routine load statistic: ${state[0][14].toString()}".toString())
log.info("reason of state changed: ${state[0][17].toString()}".toString())
if (res[0][0] > 0) {
break
}
if (count >= 120) {
log.error("routine load can not visible for long time")
assertEquals(20, res[0][0])
break
}
sleep(5000)
count++
}
qt_sql17 "select * from ${tableName} order by id"
} finally {
sql "stop routine load for ${jobName}"
sql """ truncate table ${tableName} """
}
// case18: bitmap
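// device_id is a BITMAP_UNION column built from bitmap_id with to_bitmap() at load time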
tableName = "routine_test18"
jobName = "kafka_job18"
sql """ DROP TABLE IF EXISTS ${tableName} """
sql """
CREATE TABLE IF NOT EXISTS ${tableName} (
id INT NOT NULL COMMENT "id",
name VARCHAR(30) NOT NULL COMMENT "name",
age INT COMMENT "age",
bitmap_id INT COMMENT "test",
device_id BITMAP BITMAP_UNION COMMENT "test column"
)
AGGREGATE KEY (`id`,`name`,`age`,`bitmap_id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES ("replication_allocation" = "tag.location.default: 1");
"""
try {
sql """
CREATE ROUTINE LOAD ${jobName} ON ${tableName}
COLUMNS(id, name, age, bitmap_id, device_id=to_bitmap(bitmap_id))
PROPERTIES
(
"format" = "json"
)
FROM KAFKA
(
"kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
"kafka_topic" = "${kafkaJsonTopics[4]}",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
"""
sql "sync"
def count = 0
while (true) {
def res = sql "select count(*) from ${tableName}"
def state = sql "show routine load for ${jobName}"
log.info("routine load state: ${state[0][8].toString()}".toString())
log.info("routine load statistic: ${state[0][14].toString()}".toString())
log.info("reason of state changed: ${state[0][17].toString()}".toString())
if (res[0][0] > 0) {
break
}
if (count >= 120) {
log.error("routine load can not visible for long time")
assertEquals(20, res[0][0])
break
}
sleep(5000)
count++
}
qt_sql18 "select * from ${tableName} order by id"
} finally {
sql "stop routine load for ${jobName}"
sql """ truncate table ${tableName} """
}
// case19: hll
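// pv is an HLL_UNION column built from id with hll_hash() at load time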
tableName = "routine_test19"
jobName = "kafka_job19"
sql """ DROP TABLE IF EXISTS ${tableName} """
sql """
CREATE TABLE IF NOT EXISTS ${tableName} (
dt DATE,
id INT,
name VARCHAR(10),
province VARCHAR(10),
os VARCHAR(10),
pv HLL HLL_UNION
)
AGGREGATE KEY (dt, id, name, province, os)
DISTRIBUTED BY HASH(id) BUCKETS 10
PROPERTIES ("replication_allocation" = "tag.location.default: 1");
"""
try {
sql """
CREATE ROUTINE LOAD ${jobName} ON ${tableName}
COLUMNS TERMINATED BY ",",
COLUMNS(dt, id, name, province, os, pv=hll_hash(id))
FROM KAFKA
(
"kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
"kafka_topic" = "${kafkaCsvTopics[5]}",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
"""
sql "sync"
def count = 0
while (true) {
def res = sql "select count(*) from ${tableName}"
def state = sql "show routine load for ${jobName}"
log.info("routine load state: ${state[0][8].toString()}".toString())
log.info("routine load statistic: ${state[0][14].toString()}".toString())
log.info("reason of state changed: ${state[0][17].toString()}".toString())
log.info("url: ${state[0][18].toString()}".toString())
if (res[0][0] > 0) {
break
}
if (count >= 120) {
log.error("routine load can not visible for long time")
assertEquals(20, res[0][0])
break
}
sleep(5000)
count++
}
qt_sql19 "select * from ${tableName} order by id"
} finally {
sql "stop routine load for ${jobName}"
sql """ truncate table ${tableName} """
}
//case20: single-task loading to multiple tables
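// A job created without "ON <table>" loads into multiple tables; the target table is
// resolved from each Kafka message rather than from the job definition. Only the
// CREATE statement is exercised here before the job is stopped.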
try {
sql """
CREATE ROUTINE LOAD ${jobName}
FROM KAFKA
(
"kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
"kafka_topic" = "${kafkaCsvTopics[5]}",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
"""
} finally {
sql "stop routine load for ${jobName}"
}
//case21: strict mode
try {
sql """
CREATE ROUTINE LOAD ${jobName}
PROPERTIES
(
"strict_mode" = "true"
)
FROM KAFKA
(
"kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
"kafka_topic" = "${kafkaCsvTopics[5]}"
);
"""
} finally {
sql "stop routine load for ${jobName}"
}
}
}