private boolean addOrUpdateFileToDeltaLog()

in connectors/rocketmq-connect-deltalake/src/main/java/org/apache/rocketmq/connect/deltalake/writer/DeltalakeWriterOnHdfs.java [162:239]


    /**
     * Commits an {@code AddFile} action for the parquet file described by {@code result} to the
     * table's Delta transaction log, optionally updating the table metadata (schema) first.
     *
     * <p>The connector's Avro schema is converted field-by-field to a Delta Standalone
     * {@code StructType} by round-tripping each field through the Spark SQL JSON type
     * representation. After a successful commit, the current writer is rolled (set to null)
     * when the committed file has grown past the configured max file size.
     *
     * @param result     describes the written parquet file (table dir, full file name, size)
     * @param updateMeta when true, the converted schema is pushed into the transaction metadata
     * @param update     when true the commit is recorded as an UPDATE operation, otherwise WRITE
     * @return true when the commit succeeded; false when any exception occurred (logged, not thrown)
     */
    private boolean addOrUpdateFileToDeltaLog(WriteParquetResult result, boolean updateMeta, boolean update) {
        String tablePath = result.getTableDir();
        String addFileName = result.getFullFileName();
        long fileSize = result.getFileSize();
        try {
            final String engineInfo = deltalakeConnectConfig.getEngineType();

            // NOTE: named deltaLog (not "log") so the class-level logger field is not shadowed.
            DeltaLog deltaLog = DeltaLog.forTable(new Configuration(), tablePath);

            // Convert the connector's Avro schema to a Delta StructType via the Spark SQL
            // JSON type representation (Avro -> Spark DataType -> JSON -> Delta DataType).
            org.apache.avro.Schema avroSchema = deltalakeConnectConfig.getSchema();
            StructField[] structFields = new StructField[avroSchema.getFields().size()];
            int i = 0;
            for (org.apache.avro.Schema.Field field : avroSchema.getFields()) {
                SchemaConverters.SchemaType schemaType = SchemaConverters.toSqlType(field.schema());
                DataType sparkDataType = schemaType.dataType();
                io.delta.standalone.types.DataType deltaDataType =
                        io.delta.standalone.types.DataType.fromJson(sparkDataType.json());
                log.debug("converted field {}: avro type {} -> delta type {}",
                        field.name(), field.schema().getType(), deltaDataType.toJson());
                structFields[i++] = new StructField(field.name(), deltaDataType);
            }
            StructType schema = new StructType(structFields);
            // TODO: parse partition columns and set them via Metadata.Builder#partitionColumns.
            Metadata metadata = Metadata.builder()
                    .schema(schema)
                    .build();

            OptimisticTransaction txn = deltaLog.startTransaction();
            if (updateMeta) {
                // Push the (possibly evolved) table schema into this transaction.
                txn.updateMetadata(metadata);
            }

            // TODO: supply real partition values for the added file (empty map for now).
            // Stat the file so we can decide below whether to roll to a new writer.
            FileSystem fs = FileSystem.get(new URI(addFileName), new Configuration());
            FileStatus status = fs.getFileStatus(new Path(addFileName));
            log.debug("file status: {}, length: {}", addFileName, status.getLen());

            AddFile addFile =
                    AddFile.builder(addFileName, new HashMap<>(), fileSize, System.currentTimeMillis(),
                            true)
                            .build();
            Operation op = update
                    ? new Operation(Operation.Name.UPDATE)
                    : new Operation(Operation.Name.WRITE);
            txn.commit(Collections.singletonList(addFile), op, engineInfo);
            lastUpdateAddFileInfo = System.currentTimeMillis();

            // Roll to a new file once the committed file exceeds the configured max size.
            if (status.getLen() > deltalakeConnectConfig.getMaxFileSize()) {
                currentWriter = null;
            }
            return true;
        } catch (Exception e) {
            // Best-effort: failure is reported to the caller via the return value only.
            log.error("exec AddFile exception,", e);
        }
        return false;
    }