in HudiConnector/src/main/java/basic/application/StreamingJob.java [89:135]
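    /**
     * Builds a Flink SQL pipeline: a Kafka source table over the Debezium change stream
     * for the customer table, a Hudi sink table on S3 partitioned by market segment,
     * and a continuous INSERT that copies change events from the source into the sink.
     */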
    public static void process(StreamExecutionEnvironment env, String kafkaTopic, String s3Path, Properties kafkaProperties) {
        org.apache.flink.table.api.bridge.java.StreamTableEnvironment streamTableEnvironment =
                org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create(
                        env, EnvironmentSettings.newInstance().useBlinkPlanner().build());
        Configuration configuration = streamTableEnvironment.getConfig().getConfiguration();
        configuration.setString("execution.checkpointing.interval", "1 min");
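        // Source: Kafka topic carrying Debezium-formatted CDC records, exposing the source
        // timestamp, originating table name, and ingestion time as metadata columns.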
        final String createTableStmt = "CREATE TABLE IF NOT EXISTS CustomerTable (\n" +
                "  `event_time` TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,  -- from Debezium format\n" +
                "  `origin_table` STRING METADATA FROM 'value.source.table' VIRTUAL,  -- from Debezium format\n" +
                "  `record_time` TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,\n" +
                "  `CUST_ID` BIGINT,\n" +
                "  `NAME` STRING,\n" +
                "  `MKTSEGMENT` STRING,\n" +
                "  WATERMARK FOR event_time AS event_time\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = '" + kafkaTopic + "',\n" +
                "  'properties.bootstrap.servers' = '" + kafkaProperties.get("bootstrap.servers") + "',\n" +
                "  'properties.group.id' = 'kdaConsumerGroup',\n" +
                "  'scan.startup.mode' = 'earliest-offset',\n" +
                "  'value.format' = 'debezium-json'\n" +
                ")";
        final String s3Sink = "CREATE TABLE IF NOT EXISTS `customer_hudi` (\n" +
                "  ts TIMESTAMP(3),\n" +
                "  customer_id BIGINT,\n" +
                "  name STRING,\n" +
                "  mktsegment STRING,\n" +
                "  PRIMARY KEY (`customer_id`) NOT ENFORCED\n" +
                ")\n" +
                "PARTITIONED BY (`mktsegment`)\n" +
                "WITH (\n" +
                "  'connector' = 'hudi',\n" +
                "  'read.streaming.enabled' = 'true',\n" +
                "  'write.tasks' = '4',\n" +
                "  'path' = '" + s3Path + "',\n" +
                "  'hoodie.datasource.query.type' = 'snapshot',\n" +
                "  'table.type' = 'MERGE_ON_READ'  -- MERGE_ON_READ table; the default is COPY_ON_WRITE\n" +
                ")";
        streamTableEnvironment.executeSql(createTableStmt);
        streamTableEnvironment.executeSql(s3Sink);
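        // Continuous INSERT: maps the Debezium event time and customer columns from the Kafka
        // source table into the Hudi table. executeSql submits this as the streaming job.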
        final String insertSql = "INSERT INTO customer_hudi SELECT event_time, CUST_ID, NAME, MKTSEGMENT FROM CustomerTable";
        streamTableEnvironment.executeSql(insertSql);
    }
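
    // Minimal usage sketch (an illustration, not taken from the original file) of how the
    // process(...) method above could be wired up by a driver. The topic name, S3 path, and
    // bootstrap servers below are placeholder values; in a real deployment they would
    // typically come from the application's runtime configuration.
    public static void runExample() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties kafkaProperties = new Properties();
        kafkaProperties.setProperty("bootstrap.servers", "broker-1.example.com:9092"); // placeholder

        // executeSql(...) on the INSERT statement submits the streaming job itself,
        // so no explicit env.execute() call is needed after process(...) returns.
        process(env, "customer-topic", "s3://example-bucket/hudi/customer_hudi/", kafkaProperties);
    }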