in java/src/main/java/com/google/cloud/dataproc/templates/util/Dataplex/DataplexEntityUtil.java [276:323]
/**
 * Converts a Dataplex schema (a JSON array of field descriptors) into Spark {@link StructField}s
 * and appends them, in order, to {@code schema}.
 *
 * <p>Each descriptor is mapped as follows:
 *
 * <ul>
 *   <li>a field whose type equals {@code ENTITY_SCHEMA_TYPE_MODE_RECORD} becomes a struct built
 *       recursively from its nested {@code ENTITY_SCHEMA_FIELDS_PROP_NAME} array;
 *   <li>any other type is looked up in {@code dataplexTypeToSparkType};
 *   <li>a field whose mode equals {@code ENTITY_SCHEMA_FIELD_MODE_REPEATED} has the resulting type
 *       wrapped in an array type.
 * </ul>
 *
 * A descriptor without a mode property is treated as {@code ENTITY_SCHEMA_FIELD_MODE_UNSPECIFIED}.
 * Every field is created with {@code nullable = true}.
 *
 * @param schema mutable list that receives the converted fields; it is modified in place
 * @param dataplexTypeToSparkType lookup from Dataplex type name to Spark {@link DataType}
 * @param dataplexSchema JSON array whose elements are JSON objects describing one field each
 * @return the same {@code schema} instance that was passed in, with the new fields appended
 * @throws IllegalArgumentException if a non-record field has a type that is missing from {@code
 *     dataplexTypeToSparkType}
 * @throws NullPointerException if a record field has no nested fields array
 */
public List<StructField> buildSparkSchemaFromDataplexSchema(
    List<StructField> schema,
    HashMap<String, DataType> dataplexTypeToSparkType,
    JsonArray dataplexSchema) {
  // Elements are iterated as Object and cast, exactly as before, so a non-object element still
  // surfaces as a ClassCastException.
  for (Object element : dataplexSchema) {
    JsonObject field = (JsonObject) element;
    String type = field.get(ENTITY_SCHEMA_FIELD_TYPE_PROP_KEY).getAsString();
    String name = field.get(PARTITION_FIELD_NAME_PROP_KEY).getAsString();
    String mode = ENTITY_SCHEMA_FIELD_MODE_UNSPECIFIED;
    if (field.get(ENTITY_SCHEMA_FIELD_MODE_PROP_KEY) != null) {
      mode = field.get(ENTITY_SCHEMA_FIELD_MODE_PROP_KEY).getAsString();
    }

    // Step 1: resolve the element type (struct for records, mapped type otherwise).
    DataType elementType;
    if (type.equals(ENTITY_SCHEMA_TYPE_MODE_RECORD)) {
      JsonArray nestedFields = field.getAsJsonArray(ENTITY_SCHEMA_FIELDS_PROP_NAME);
      if (nestedFields == null) {
        // Same exception type the recursive call would raise, but naming the offending field.
        throw new NullPointerException(
            "Record field '" + name + "' has no '" + ENTITY_SCHEMA_FIELDS_PROP_NAME + "' array");
      }
      List<StructField> nestedSchema = new ArrayList<>();
      buildSparkSchemaFromDataplexSchema(nestedSchema, dataplexTypeToSparkType, nestedFields);
      elementType = DataTypes.createStructType(nestedSchema);
    } else {
      elementType = dataplexTypeToSparkType.get(type);
      if (elementType == null) {
        // Fail here with the field name and type so the error is actionable, instead of
        // handing a null data type to Spark.
        throw new IllegalArgumentException(
            "No Spark type mapping for Dataplex type '" + type + "' (field '" + name + "')");
      }
    }

    // Step 2: repeated fields become arrays of the element type.
    DataType fieldType = elementType;
    if (mode.equals(ENTITY_SCHEMA_FIELD_MODE_REPEATED)) {
      fieldType = DataTypes.createArrayType(elementType);
    }

    schema.add(DataTypes.createStructField(name, fieldType, true));
  }
  return schema;
}