in connectors/rocketmq-connect-hudi/src/main/java/org/apache/rocketmq/connect/hudi/sink/Updater.java [81:141]
/**
 * Builds the Hudi writer pipeline for this sink task:
 * parses the Avro schema from the configured file, prepares a Hadoop
 * {@code Configuration} pinned to the connector's classloader, bootstraps the
 * Hudi table metadata if the table path does not exist yet, and opens a
 * {@code HoodieJavaWriteClient}. When {@code batchSize > 0}, a single-threaded
 * scheduler periodically flushes buffered records via {@code commit()}.
 *
 * @param hudiConnectConfig connector configuration (schema path, table
 *                          path/name/type, parallelism settings)
 * @throws Exception if the schema file cannot be read, or if Hadoop/Hudi
 *                   initialization fails
 */
public Updater(HudiConnectConfig hudiConnectConfig) throws Exception {
    this.hudiConnectConfig = hudiConnectConfig;
    try {
        File schemaFile = new File(hudiConnectConfig.getSchemaPath());
        this.hudiConnectConfig.schema = new Schema.Parser().parse(schemaFile);
        // Parameterized logging avoids eager string building when INFO is disabled.
        log.info("Hudi schema : {}", this.hudiConnectConfig.schema);
    } catch (IOException e) {
        throw new Exception(String.format("Failed to find schema file %s", hudiConnectConfig.getSchemaPath()), e);
    }
    Configuration hadoopConf = new Configuration();
    // Read Avro data as generic records rather than via the compatibility layer.
    hadoopConf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, false);
    hadoopConf.set(AvroReadSupport.AVRO_DATA_SUPPLIER, GenericDataSupplier.class.getName());
    // Pin the connector plugin's classloader so Hadoop SPI lookups resolve
    // classes bundled with this connector instead of the runtime's loader.
    hadoopConf.setClassLoader(this.getClass().getClassLoader());
    hadoopConf.set("fs.hdfs.impl", DistributedFileSystem.class.getName());
    hadoopConf.set("fs.file.impl", LocalFileSystem.class.getName());
    // fs.%s.impl.disable.cache — force a fresh local FileSystem instance per Updater.
    hadoopConf.set("fs.file.impl.disable.cache", String.valueOf(true));
    Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
    Path path = new Path(hudiConnectConfig.getTablePath());
    FileSystem fs = FSUtils.getFs(hudiConnectConfig.getTablePath(), hadoopConf);
    // Bootstrap the Hudi table metadata (.hoodie dir) on first use of this path.
    if (!fs.exists(path)) {
        HoodieTableMetaClient.withPropertyBuilder()
            .setTableType(hudiConnectConfig.getTableType())
            .setTableName(hudiConnectConfig.getTableName())
            .setPayloadClassName(HoodieAvroPayload.class.getName())
            .initTable(hadoopConf, hudiConnectConfig.getTablePath());
    }
    log.info("Hudi inited table");
    this.cfg = HoodieWriteConfig.newBuilder().withPath(hudiConnectConfig.getTablePath())
        .withSchema(this.hudiConnectConfig.schema.toString())
        .withEngineType(EngineType.JAVA)
        .withParallelism(hudiConnectConfig.getInsertShuffleParallelism(), hudiConnectConfig.getUpsertShuffleParallelism())
        .withDeleteParallelism(hudiConnectConfig.getDeleteParallelism()).forTable(hudiConnectConfig.getTableName())
        // INMEMORY index: record-location lookups stay in this JVM; suits the
        // single-process Java engine used here.
        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build())
        .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(20, 30).build()).build();
    // NOTE(review): the original code called cfg.getAvroSchemaValidate() here and
    // discarded the result — a no-op getter invocation, removed as dead code.
    this.hudiWriteClient =
        new HoodieJavaWriteClient<>(new HoodieJavaEngineContext(hadoopConf), cfg);
    log.info("Open HoodieJavaWriteClient successfully");
    inflightList = new ArrayList<>();
    // Batching enabled: flush buffered records at a fixed interval so partially
    // filled batches are still committed.
    if (batchSize > 0) {
        scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
        scheduledExecutor.scheduleAtFixedRate(
            () -> {
                try {
                    commit();
                } catch (Exception e) {
                    // Swallow and log: a failed flush must not kill the scheduler thread.
                    log.error("Flush error when executed at fixed rate", e);
                }
            }, flushIntervalMs, flushIntervalMs, TimeUnit.MILLISECONDS);
    }
}