in src/main/java/com/aws/logaggregator/processor/stream/StreamLogProcessor.java [222:287]
/**
 * Recursively flattens a dataset until no top-level column is an array or a struct.
 *
 * <p>Columns are scanned in schema order and the first nested one is rewritten:
 * <ul>
 *   <li>an {@code ArrayType} column is replaced by {@code explode_outer(col)} under the
 *       same name and moved to the end of the column list;</li>
 *   <li>a {@code StructType} column is replaced by one column per child field, appended
 *       at the end and named {@code parent_child}.</li>
 * </ul>
 * The method then calls itself on the rewritten dataset, so arrays of structs and
 * structs of structs are flattened level by level.
 *
 * <p>Every column reference is backtick-quoted, so names containing characters that
 * are not valid in a bare SQL identifier (for example {@code -}, spaces, {@code @} or
 * {@code .}) are treated as plain column names instead of being parsed as expressions
 * or nested-field paths.
 *
 * <p>NOTE(review): a generated {@code parent_child} name is not checked against
 * existing column names; a clash would yield duplicate column names - confirm whether
 * the inputs can produce one.
 *
 * @param ds the dataset to flatten
 * @return {@code ds} itself when it has no array or struct columns, otherwise the
 *         flattened dataset
 */
protected Dataset flattendf(Dataset ds) {
    StructField[] fields = ds.schema().fields();
    for (StructField field : fields) {
        DataType fieldType = field.dataType();
        if (fieldType instanceof ArrayType) {
            // Rewrite one column, then restart on the new schema.
            return flattendf(explodeArrayColumn(ds, fields, field.name()));
        }
        if (fieldType instanceof StructType) {
            return flattendf(promoteStructChildren(ds, fields, field.name(), (StructType) fieldType));
        }
    }
    // No nested column left: nothing to do.
    return ds;
}

/** Selects every column except {@code arrayName} unchanged, followed by the exploded array column. */
private static Dataset explodeArrayColumn(Dataset ds, StructField[] fields, String arrayName) {
    List<String> expressions = new ArrayList<>(fields.length);
    for (StructField other : fields) {
        if (!arrayName.equals(other.name())) {
            expressions.add(quoteColumnName(other.name()));
        }
    }
    String quotedArray = quoteColumnName(arrayName);
    expressions.add(String.format("explode_outer(%s) as %s", quotedArray, quotedArray));
    return ds.toDF().selectExpr(expressions.toArray(new String[0]));
}

/** Selects every column except {@code structName}, followed by the struct's children as {@code parent_child} columns. */
private static Dataset promoteStructChildren(Dataset ds, StructField[] fields, String structName,
                                             StructType structType) {
    List<Column> columns = new ArrayList<>();
    for (StructField other : fields) {
        String name = other.name();
        if (!structName.equals(name)) {
            // Dots in the output name are replaced so the result never looks like a nested path.
            columns.add(new Column(quoteColumnName(name)).as(name.replace(".", "_")));
        }
    }
    String quotedParent = quoteColumnName(structName);
    for (String childName : structType.fieldNames()) {
        // Quote parent and child separately: only the dot between them is a field accessor.
        Column child = new Column(quotedParent + "." + quoteColumnName(childName));
        columns.add(child.as((structName + "." + childName).replace(".", "_")));
    }
    Seq columnSeq = JavaConverters.collectionAsScalaIterableConverter(columns).asScala().toSeq();
    return ds.toDF().select(columnSeq);
}

/** Wraps a column name in backticks, doubling any embedded backtick, so it is read as one identifier. */
private static String quoteColumnName(String name) {
    return "`" + name.replace("`", "``") + "`";
}