in connectors/rocketmq-connect-deltalake/src/main/java/org/apache/rocketmq/connect/deltalake/writer/DeltalakeWriterOnHdfs.java [162:239]
/**
 * Commits a newly written (or rewritten) parquet file to the Delta transaction log of its table.
 *
 * <p>Opens the table's {@link DeltaLog}. When {@code updateMeta} is set, it replaces the table
 * metadata with a schema derived from the configured Avro schema. It then commits a single
 * {@link AddFile} action as a {@code WRITE} (new file) or {@code UPDATE} operation. After a
 * successful commit it refreshes {@code lastUpdateAddFileInfo}. If the file on the file system
 * exceeds the configured max file size, it clears {@code currentWriter} so the next write rolls
 * a new file.
 *
 * @param result     parquet write result providing the table dir, full file name and file size
 * @param updateMeta whether to replace the table metadata (schema) in this transaction
 * @param update     {@code true} to record the commit as an UPDATE operation, otherwise WRITE
 * @return {@code true} if the commit succeeded, {@code false} if any exception was raised
 *         (the exception is logged)
 */
private boolean addOrUpdateFileToDeltaLog(WriteParquetResult result, boolean updateMeta, boolean update) {
    String tablePath = result.getTableDir();
    String addFileName = result.getFullFileName();
    long fileSize = result.getFileSize();
    try {
        final String engineInfo = deltalakeConnectConfig.getEngineType();
        // One Hadoop configuration shared by the DeltaLog and the FileSystem lookup.
        Configuration hadoopConf = new Configuration();
        // Named deltaLog (not "log") so it does not shadow the class logger used below.
        DeltaLog deltaLog = DeltaLog.forTable(hadoopConf, tablePath);
        OptimisticTransaction txn = deltaLog.startTransaction();
        if (updateMeta) {
            // Schema conversion is only needed when metadata is actually being replaced.
            // TODO: parse partition columns and set them via Metadata.Builder#partitionColumns.
            Metadata metadata = Metadata.builder()
                .schema(toDeltaSchema(deltalakeConnectConfig.getSchema()))
                .build();
            txn.updateMetadata(metadata);
        }
        // TODO: attach partition values to the added file once partition columns are supported.
        FileSystem fs = FileSystem.get(new URI(addFileName), hadoopConf);
        FileStatus status = fs.getFileStatus(new Path(addFileName));
        log.debug("filestatus : {}, length : {}", addFileName, status.getLen());
        AddFile addFile = AddFile.builder(addFileName, new HashMap<>(), fileSize,
                System.currentTimeMillis(), true)
            .build();
        Operation op = new Operation(update ? Operation.Name.UPDATE : Operation.Name.WRITE);
        txn.commit(Collections.singletonList(addFile), op, engineInfo);
        lastUpdateAddFileInfo = System.currentTimeMillis();
        // Roll to a new file once the current one has grown past the configured maximum size.
        if (status.getLen() > deltalakeConnectConfig.getMaxFileSize()) {
            currentWriter = null;
        }
        return true;
    } catch (Exception e) {
        log.error("exec AddFile exception,", e);
    }
    return false;
}

/**
 * Converts an Avro record schema into a Delta Standalone {@link StructType}, field by field.
 *
 * <p>Each Avro field is mapped to a Spark SQL type with {@code SchemaConverters.toSqlType}.
 * The Spark type's JSON form is then parsed back as a Delta Standalone data type.
 *
 * @param avroSchema the Avro schema; {@code getFields()} is called on it, so it presumably must
 *                   be a RECORD schema
 * @return the equivalent Delta struct type, with fields in Avro declaration order
 */
private StructType toDeltaSchema(org.apache.avro.Schema avroSchema) {
    List<org.apache.avro.Schema.Field> avroFields = avroSchema.getFields();
    StructField[] structFields = new StructField[avroFields.size()];
    int i = 0;
    for (org.apache.avro.Schema.Field field : avroFields) {
        DataType sparkDataType = SchemaConverters.toSqlType(field.schema()).dataType();
        io.delta.standalone.types.DataType deltaDataType =
            io.delta.standalone.types.DataType.fromJson(sparkDataType.json());
        log.debug("field name : {}, avro type : {}, delta type : {}",
            field.name(), field.schema().getType(), deltaDataType.toJson());
        structFields[i++] = new StructField(field.name(), deltaDataType);
    }
    return new StructType(structFields);
}