protected Dataset flattendf()

in src/main/java/com/aws/logaggregator/processor/stream/StreamLogProcessor.java [222:287]


    /**
     * Recursively flattens the schema of {@code ds} until no top-level column is an
     * {@code ArrayType} or a {@code StructType}.
     *
     * <p>Columns are inspected in schema order and the first array or struct column found
     * is rewritten, after which the method recurses on the resulting dataset:
     * <ul>
     *   <li>an array column is replaced by {@code explode_outer(column)} under the same
     *       name; the exploded column is placed after all other columns;</li>
     *   <li>a struct column is replaced by one column per child field, named
     *       {@code parent_child}; the child columns are placed after all other columns.
     *       In this step every selected column is re-aliased with {@code .} replaced by
     *       {@code _} in its name.</li>
     * </ul>
     * Columns of any other type (including maps) are left untouched.
     *
     * <p>All column references are backtick-quoted so that names which are not plain SQL
     * identifiers (for example {@code user-agent}, or a name that itself contains a dot)
     * are resolved as column names instead of being parsed as expressions or nested-field
     * paths.
     *
     * @param ds the dataset to flatten
     * @return {@code ds} itself when it has no top-level array or struct column, otherwise
     *         a new dataset with those columns expanded
     */
    protected Dataset flattendf(Dataset ds) {

        StructField[] fields = ds.schema().fields();

        for (StructField field : fields) {
            DataType fieldType = field.dataType();
            String fieldName = field.name();

            if (fieldType instanceof ArrayType) {
                // Keep every other column as-is and append the exploded array last.
                List<String> expressions = new ArrayList<>(fields.length);
                for (StructField other : fields) {
                    if (!fieldName.equals(other.name())) {
                        expressions.add(quoteColumnName(other.name()));
                    }
                }
                String quotedName = quoteColumnName(fieldName);
                expressions.add(String.format("explode_outer(%s) as %s", quotedName, quotedName));

                Dataset exploded = ds.toDF().selectExpr(expressions.toArray(new String[0]));

                // The element type may itself be an array or a struct, so go again.
                return flattendf(exploded);
            }

            if (fieldType instanceof StructType) {
                List<Column> columns = new ArrayList<>();

                // Every other column is kept; dots in its name become underscores.
                for (StructField other : fields) {
                    String otherName = other.name();
                    if (!fieldName.equals(otherName)) {
                        columns.add(new Column(quoteColumnName(otherName))
                                .as(otherName.replace(".", "_")));
                    }
                }

                // Promote each child of the struct to a top-level "parent_child" column.
                for (String childName : ((StructType) fieldType).fieldNames()) {
                    String path = quoteColumnName(fieldName) + "." + quoteColumnName(childName);
                    String alias = (fieldName + "." + childName).replace(".", "_");
                    columns.add(new Column(path).as(alias));
                }

                Dataset flattened = ds.toDF().select(columns.toArray(new Column[0]));
                return flattendf(flattened);
            }
        }

        // No array or struct column left: the schema is flat.
        return ds;
    }

    /** Wraps a column name in backticks, doubling any backtick it already contains. */
    private static String quoteColumnName(String name) {
        return "`" + name.replace("`", "``") + "`";
    }