public Updater()

in connectors/rocketmq-connect-hudi/src/main/java/org/apache/rocketmq/connect/hudi/sink/Updater.java [81:141]


    /**
     * Builds an Updater bound to the given Hudi sink configuration.
     *
     * <p>Bootstrap sequence: parse the Avro schema file, prepare a Hadoop
     * {@code Configuration} with explicit filesystem bindings, initialize the
     * Hudi table on first use, open a {@code HoodieJavaWriteClient}, and — when
     * batching is enabled — start a periodic flush task.
     *
     * @param hudiConnectConfig connector configuration (schema path, table
     *        path/name/type, parallelism settings)
     * @throws Exception if the configured schema file cannot be read, or any
     *         downstream Hudi/Hadoop initialization fails
     */
    public Updater(HudiConnectConfig hudiConnectConfig) throws Exception {
        this.hudiConnectConfig = hudiConnectConfig;

        // Parse the Avro schema that records written to the table must conform to.
        try {
            File schemaFile = new File(hudiConnectConfig.getSchemaPath());
            this.hudiConnectConfig.schema = new Schema.Parser().parse(schemaFile);
            // Parameterized logging: schema.toString() is only evaluated if INFO is enabled.
            log.info("Hudi schema : {}", this.hudiConnectConfig.schema);
        } catch (IOException e) {
            throw new Exception(String.format("Failed to find schema file %s", hudiConnectConfig.getSchemaPath()), e);
        }

        // Hadoop configuration: pin Avro read behavior and bind filesystem schemes
        // explicitly so resolution does not depend on whatever core-site.xml the
        // runtime happens to ship.
        Configuration hadoopConf = new Configuration();
        hadoopConf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, false);
        hadoopConf.set(AvroReadSupport.AVRO_DATA_SUPPLIER, GenericDataSupplier.class.getName());
        // Use the connector's classloader so Hudi/Hadoop SPI lookups resolve
        // against the plugin's own classpath, not the runtime's.
        hadoopConf.setClassLoader(this.getClass().getClassLoader());
        hadoopConf.set("fs.hdfs.impl",
                DistributedFileSystem.class.getName()
        );
        hadoopConf.set("fs.file.impl",
                LocalFileSystem.class.getName()
        );

        // fs.%s.impl.disable.cache — force a fresh local FileSystem instance so a
        // cached one created under a different classloader is never reused.
        hadoopConf.set("fs.file.impl.disable.cache", String.valueOf(true));
        Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());

        // First-run bootstrap: create the Hudi table metadata if the table path
        // does not exist yet.
        Path path = new Path(hudiConnectConfig.getTablePath());
        FileSystem fs = FSUtils.getFs(hudiConnectConfig.getTablePath(), hadoopConf);
        if (!fs.exists(path)) {
            HoodieTableMetaClient.withPropertyBuilder()
                    .setTableType(hudiConnectConfig.getTableType())
                    .setTableName(hudiConnectConfig.getTableName())
                    .setPayloadClassName(HoodieAvroPayload.class.getName())
                    .initTable(hadoopConf, hudiConnectConfig.getTablePath());
        }
        log.info("Hudi inited table");

        this.cfg = HoodieWriteConfig.newBuilder().withPath(hudiConnectConfig.getTablePath())
                .withSchema(this.hudiConnectConfig.schema.toString())
                .withEngineType(EngineType.JAVA)
                .withParallelism(hudiConnectConfig.getInsertShuffleParallelism(), hudiConnectConfig.getUpsertShuffleParallelism())
                .withDeleteParallelism(hudiConnectConfig.getDeleteParallelism()).forTable(hudiConnectConfig.getTableName())
                .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build())
                .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(20, 30).build()).build();
        // NOTE(review): removed the original no-op `cfg.getAvroSchemaValidate();`
        // call — it is a plain getter whose return value was discarded.
        this.hudiWriteClient =
                new HoodieJavaWriteClient<HoodieAvroPayload>(new HoodieJavaEngineContext(hadoopConf), cfg);
        log.info("Open HoodieJavaWriteClient successfully");

        // Periodic flush of buffered records; batchSize/flushIntervalMs are
        // fields presumably set from the config before this runs — TODO confirm
        // their initialization site (not visible in this chunk).
        inflightList = new ArrayList<>();
        if (batchSize > 0) {
            scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
            scheduledExecutor.scheduleAtFixedRate(
                () -> {
                    try {
                        commit();
                    } catch (Exception e) {
                        log.error("Flush error when executed at fixed rate", e);
                    }
                }, flushIntervalMs, flushIntervalMs, TimeUnit.MILLISECONDS);
        }
    }