in hologres-connector-examples/hologres-connector-flink-examples/src/main/java/com/alibaba/ververica/connectors/hologres/example/FlinkRoaringBitmapAggJob.java [46:264]
public static void main(String[] args) throws Exception {
// Read the properties file path and the data source file path from the command line
Options options = new Options();
options.addOption("p", "propsFilePath", true, "Properties file path");
options.addOption("s", "sourceFilePath", true, "DataSource file path");
CommandLineParser parser = new DefaultParser();
CommandLine commandLine = parser.parse(options, args);
String propsPath = commandLine.getOptionValue("propsFilePath");
String filePath = commandLine.getOptionValue("sourceFilePath");
// Load parameters from the properties file
Properties props = new Properties();
props.load(new java.io.FileInputStream(propsPath));
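// An illustrative properties file (the values are placeholders; the keys are
// the ones this job actually reads):
//   endpoint=<instance-endpoint>:<port>
//   username=<access-id>
//   password=<access-key>
//   database=<database-name>
//   dimTableName=<dimension-table, e.g. uid_mapping>
//   dwsTableName=<result-table, e.g. dws_app>
//   windowSize=60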
// Parameters, including the names of the dimension table (dim) and the result table (dws)
String endpoint = props.getProperty("endpoint");
String username = props.getProperty("username");
String password = props.getProperty("password");
String database = props.getProperty("database");
String dimTableName = props.getProperty("dimTableName");
String dwsTableName = props.getProperty("dwsTableName");
int windowSize = Integer.parseInt(props.getProperty("windowSize"));
// Flink environment setup
EnvironmentSettings.Builder settingsBuilder =
EnvironmentSettings.newInstance().inStreamingMode();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv =
StreamTableEnvironment.create(env, settingsBuilder.build());
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// fieldsMask lists the indexes of the fields of interest; their types are set in the TupleTypeInfo below, with names given in the inline comments
// A CSV file is used as the data source here; Kafka or similar would also work. See src/main/resources/ods_app_example.csv for a sample file
int[] fieldsMask = {0, 1, 2, 3, 8, 12};
TupleTypeInfo typeInfo =
new TupleTypeInfo<>(
Types.STRING, // uid
Types.STRING, // country
Types.STRING, // prov
Types.STRING, // city
TIMESTAMP, // click_time
Types.STRING // ymd
);
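// Positionally, fieldsMask lines up with the TupleTypeInfo above: CSV columns
// 0-3 map to uid/country/prov/city, and columns 8 and 12 map to click_time
// and ymd (assuming the layout of the example CSV).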
TupleCsvInputFormat csvInput = csvInputFormatBuilder(filePath, fieldsMask, typeInfo);
// createInput requires typeInfo to define the output format
DataStreamSource odsStream = env.createInput(csvInput, typeInfo);
// Convert the DataStream into a Table;
// joining with the dimension table requires a proctime attribute, see https://help.aliyun.com/document_detail/62506.html
Table odsTable =
tableEnv.fromDataStream(
odsStream,
$("uid"),
$("country"),
$("prov"),
$("city"),
$("click_time"),
$("ymd"),
$("proctime").proctime());
// Register the table in the catalog
tableEnv.createTemporaryView("odsTable", odsTable);
// Create the Hologres dimension table; 'insertifnotexists' means a missing row is inserted automatically when the lookup misses
String createUidMappingTable =
String.format(
"create table uid_mapping_dim("
+ " uid string,"
+ " uid_int32 INT"
+ ") with ("
+ " 'connector'='hologres',"
+ " 'dbname' = '%s',"
+ " 'tablename' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'endpoint' = '%s',"
+ " 'insertifnotexists'='true'"
+ ")",
database, dimTableName, username, password, endpoint);
tableEnv.executeSql(createUidMappingTable);
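// A sketch of the Hologres-side DDL the dimension table is assumed to map to
// (not shipped with this example): a serial column lets Hologres assign the
// 32-bit id when 'insertifnotexists' inserts a previously unseen uid.
//   CREATE TABLE uid_mapping (
//       uid text NOT NULL,
//       uid_int32 serial,
//       PRIMARY KEY (uid)
//   );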
// Join the source table with the dimension table
String odsJoinDim =
"SELECT ods.country, ods.prov, ods.city, ods.click_time, ods.ymd, dim.uid_int32"
+ " FROM odsTable AS ods JOIN uid_mapping_dim FOR SYSTEM_TIME AS OF ods.proctime AS dim"
+ " ON ods.uid = dim.uid";
Table joinRes = tableEnv.sqlQuery(odsJoinDim);
// Convert the join result into a DataStream for further computation
DataStream<Row> source =
tableEnv.toDataStream(joinRes);
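// Note: toDataStream only accepts insert-only (append) tables; the lookup join
// above is append-only, so no changelog handling is needed here.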
// Deduplicate uids with RoaringBitmap
SingleOutputStreamOperator<Row> processedSource = source
// Use the click_time field from the source as the event time and register watermarks.
// In practice the event time is expected to be mostly ordered, e.g. when the upstream source is Kafka or a Hologres binlog table; otherwise a processing-time window is recommended
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Row>(Time.seconds(0)) {
@Override
public long extractTimestamp(Row element) {
// Convert the LocalDateTime into a millisecond epoch timestamp
LocalDateTime localDateTime = LocalDateTime.parse(element.getField(3).toString());
return localDateTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
}
})
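// With a bound of 0 seconds the watermark effectively tracks the largest
// timestamp seen so far; records arriving behind it count as late and are
// dropped once they exceed the allowedLateness configured below.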
// Key by the dimensions to aggregate on (country, prov, city, ymd)
.keyBy(new KeySelector<Row, Row>() {
@Override
public Row getKey(Row row) throws Exception {
return Row.of(row.getField(0), row.getField(1), row.getField(2), row.getField(4));
}
})
// Tumbling event-time window
.window(TumblingEventTimeWindows.of(Time.seconds(windowSize)))
// Trigger that emits intermediate aggregation results before the window closes
.trigger(ContinuousEventTimeTrigger.of(Time.seconds(Math.max(1, windowSize / 20))))
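// For example, with windowSize = 60 the trigger fires every 3 seconds; each
// early firing emits a partial bitmap that the 'insertOrReplace' sink below
// can overwrite with the next, more complete result.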
// Allow lateness of up to half the window size
.allowedLateness(Time.seconds(Math.max(1, windowSize / 2)))
// Window aggregate function; aggregates over the dimensions selected by keyBy
.aggregate(
new AggregateFunction<Row, RoaringBitmap, RoaringBitmap>() {
@Override
public RoaringBitmap createAccumulator() {
return new RoaringBitmap();
}
@Override
public RoaringBitmap add(Row value, RoaringBitmap acc) {
// Add the 32-bit uid to the RoaringBitmap for deduplication
acc.add((Integer) value.getField(5));
return acc;
}
@Override
public RoaringBitmap getResult(RoaringBitmap acc) {
return acc;
}
@Override
public RoaringBitmap merge(RoaringBitmap acc1, RoaringBitmap acc2) {
return RoaringBitmap.or(acc1, acc2);
}
},
new WindowFunction<RoaringBitmap, Row, Row, TimeWindow>() {
@Override
public void apply(
Row keys,
TimeWindow timeWindow,
Iterable<RoaringBitmap> iterable,
Collector<Row> out)
throws Exception {
RoaringBitmap result = iterable.iterator().next();
// Optimize the RoaringBitmap's internal representation
result.runOptimize();
// Serialize the RoaringBitmap into a byte array so it can be stored in Hologres
byte[] byteArray = new byte[result.serializedSizeInBytes()];
result.serialize(ByteBuffer.wrap(byteArray));
// The Timestamp field marks the statistics period, whose length equals the window size
out.collect(
Row.of(
keys.getField(0),
keys.getField(1),
keys.getField(2),
keys.getField(3),
new Timestamp(
timeWindow.getEnd() / 1000 * 1000),
byteArray));
}
}).returns(new RowTypeInfo(Types.STRING, Types.STRING, Types.STRING, Types.STRING, TIMESTAMP, Types.PRIMITIVE_ARRAY(Types.BYTE)));
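// The explicit RowTypeInfo is required because Row erases its field types, so
// Flink cannot infer the output schema of the window operator on its own.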
// Convert the computation result back into a Table
Table resTable =
tableEnv.fromDataStream(
processedSource,
$("country"),
$("prov"),
$("city"),
$("ymd"),
$("event_window_time"),
$("uid32_bitmap"));
// Create the Hologres result table; the Hologres roaringbitmap column is written as a byte array
String createHologresTable =
String.format(
"create table sink("
+ " country string,"
+ " prov string,"
+ " city string,"
+ " ymd string,"
+ " event_window_time timestamp,"
+ " uid32_bitmap BYTES"
+ ") with ("
+ " 'connector'='hologres',"
+ " 'dbname' = '%s',"
+ " 'tablename' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'endpoint' = '%s',"
+ " 'mutatetype' = 'insertOrReplace'"
+ ")",
database, dwsTableName, username, password, endpoint);
tableEnv.executeSql(createHologresTable);
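// A sketch of the matching Hologres-side DDL (an assumption, and it requires
// the roaringbitmap extension; the BYTES column is then written into the
// roaringbitmap-typed column):
//   CREATE EXTENSION IF NOT EXISTS roaringbitmap;
//   CREATE TABLE dws_app (
//       country text, prov text, city text, ymd text NOT NULL,
//       event_window_time timestamptz,
//       uid32_bitmap roaringbitmap,
//       PRIMARY KEY (country, prov, city, ymd, event_window_time)
//   );
// The primary key is what lets 'insertOrReplace' upsert repeated firings of
// the same window.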
// Write the computation result into the dws table
tableEnv.executeSql("insert into sink select * from " + resTable);
}
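// Once rows have landed in Hologres, per-dimension UV can be computed by
// OR-ing the stored bitmaps, e.g. (table and column names as sketched above):
//   SELECT country, prov, city, ymd,
//          RB_CARDINALITY(RB_OR_AGG(uid32_bitmap)) AS uv
//   FROM dws_app
//   GROUP BY country, prov, city, ymd;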