public static void process()

in HudiConnector/src/main/java/basic/application/StreamingJob.java [89:135]


		/**
		 * Registers a Kafka-backed source table and a Hudi-backed sink table through Flink SQL,
		 * then issues the {@code INSERT} statement that copies rows from the former into the latter.
		 *
		 * <p>Steps, in order:
		 * <ol>
		 *   <li>validate the caller-supplied option values;</li>
		 *   <li>create a {@code StreamTableEnvironment} on top of {@code env} and set
		 *       {@code execution.checkpointing.interval} to {@code 1 min} on its configuration;</li>
		 *   <li>execute {@code CREATE TABLE IF NOT EXISTS CustomerTable} (connector {@code kafka},
		 *       value format {@code debezium-json}, reading from the earliest offset);</li>
		 *   <li>execute {@code CREATE TABLE IF NOT EXISTS customer_hudi} (connector {@code hudi},
		 *       table type {@code MERGE_ON_READ}, partitioned by {@code mktsegment});</li>
		 *   <li>execute {@code insert into customer_hudi select ... from CustomerTable}.</li>
		 * </ol>
		 *
		 * @param env             execution environment the table environment is created from
		 * @param kafkaTopic      value of the source table's {@code topic} option; must not be null or blank
		 * @param s3Path          value of the sink table's {@code path} option; must not be null or blank
		 * @param kafkaProperties only {@code bootstrap.servers} is read from it (defaults chain included);
		 *                        that entry must be present and non-blank
		 * @throws IllegalArgumentException if {@code kafkaTopic}, {@code s3Path} or the
		 *                                  {@code bootstrap.servers} property is null or blank
		 * @throws NullPointerException     if {@code kafkaProperties} is null
		 */
		public static void process(StreamExecutionEnvironment env, String kafkaTopic, String s3Path, Properties kafkaProperties)  {
			// Validate before touching Flink so a bad argument cannot leave a half-registered catalog.
			// getProperty (unlike get) also consults the defaults chain of the Properties object.
			final String bootstrapServersLiteral = toSqlLiteralContent(
					kafkaProperties.getProperty("bootstrap.servers"), "kafkaProperties[bootstrap.servers]");
			final String topicLiteral = toSqlLiteralContent(kafkaTopic, "kafkaTopic");
			final String pathLiteral = toSqlLiteralContent(s3Path, "s3Path");

			org.apache.flink.table.api.bridge.java.StreamTableEnvironment streamTableEnvironment = org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create(
					env, EnvironmentSettings.newInstance().useBlinkPlanner().build());

			// Checkpointing is configured through the table environment's own configuration.
			Configuration configuration = streamTableEnvironment.getConfig().getConfiguration();
			configuration.setString("execution.checkpointing.interval", "1 min");

			// Source: Debezium change records from Kafka; three columns are taken from format metadata.
			final String createTableStmt = "CREATE TABLE IF NOT EXISTS CustomerTable (\n" +
					"  `event_time` TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,  -- from Debezium format\n" +
					"  `origin_table` STRING METADATA FROM 'value.source.table' VIRTUAL, -- from Debezium format\n" +
					"  `record_time` TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,\n" +
					"  `CUST_ID` BIGINT,\n" +
					"  `NAME` STRING,\n" +
					"  `MKTSEGMENT` STRING,\n" +
					"   WATERMARK FOR event_time AS event_time\n" +
					") WITH (\n" +
					"  'connector' = 'kafka',\n" +
					"  'topic' = '"+ topicLiteral +"',\n" +
					"  'properties.bootstrap.servers' = '"+  bootstrapServersLiteral +"',\n" +
					"  'properties.group.id' = 'kdaConsumerGroup',\n" +
					"  'scan.startup.mode' = 'earliest-offset',\n" +
					"  'value.format' = 'debezium-json'\n" +
					")";

			// Sink: Hudi table keyed by customer_id and partitioned by market segment.
			final String s3Sink = "CREATE TABLE IF NOT EXISTS `customer_hudi` (\n" +
					"  ts TIMESTAMP(3),\n" +
					"  customer_id BIGINT,\n" +
					"  name STRING,\n" +
					"  mktsegment STRING,\n" +
					"  PRIMARY KEY (`customer_id`) NOT Enforced\n" +
					")\n" +
					"PARTITIONED BY (`mktsegment`)\n" +
					"WITH (\n" +
					"  'connector' = 'hudi',\n" +
					"  'read.streaming.enabled' = 'true',\n" +
					"  'write.tasks' = '4',\n" +
					"  'path' = '" + pathLiteral + "',\n" +
					"  'hoodie.datasource.query.type' = 'snapshot',\n" +
					"  'table.type' = 'MERGE_ON_READ' --  MERGE_ON_READ table or, by default is COPY_ON_WRITE\n" +
					")";

			streamTableEnvironment.executeSql(createTableStmt);
			streamTableEnvironment.executeSql(s3Sink);

			// Columns are matched by position: event_time -> ts, CUST_ID -> customer_id, and so on.
			final String insertSql = "insert into customer_hudi select event_time, CUST_ID,  NAME , MKTSEGMENT from CustomerTable";
			streamTableEnvironment.executeSql(insertSql);
		}

		/**
		 * Validates a caller-supplied option value and makes it safe to embed between single quotes
		 * in a SQL statement by doubling every single quote it contains.
		 *
		 * @param value       raw option value
		 * @param description name used in the error message when the value is rejected
		 * @return {@code value} with each {@code '} replaced by {@code ''}
		 * @throws IllegalArgumentException if {@code value} is null or blank
		 */
		private static String toSqlLiteralContent(String value, String description) {
			if (value == null || value.trim().isEmpty()) {
				throw new IllegalArgumentException(description + " must not be null or blank");
			}
			return value.replace("'", "''");
		}