private MasterLabelSchema rewriteAndMerge()

in src/main/java/com/amazonaws/services/neptune/propertygraph/io/RewriteAndMergeCsv.java [100:191]


    /**
     * Rewrites every file-specific CSV file of a label into a single consolidated file whose
     * columns follow the master label schema.
     *
     * <p>Each source file is read as header-less RFC 4180 CSV. Its column names are reconstructed
     * from the element's token names, then the optional {@code ~fromLabels}/{@code ~toLabels}
     * columns, then the file's own property names. Each row is then re-emitted through a printer
     * created for a copy of the master schema (with {@code initStats()} applied to that copy).
     *
     * <p>Source files are wrapped in {@link DeletableFile}. {@code doNotDelete()} is called on a
     * source file when the {@code Keep_Rewritten_Files} feature toggle is set, or when any failure
     * occurs while that particular file is being rewritten.
     *
     * @param targetConfig      output configuration; {@code forFileConsolidation()} configures the
     *                          consolidated printer and {@code format()} is recorded in the result
     * @param graphElementType  the kind of element stored in the files (nodes or edges)
     * @param masterLabelSchema the label's master schema together with its per-file schemas
     * @return a new master schema whose only file-specific schema refers to the consolidated output
     * @throws Exception if creating the printer, reading a source file, or writing a row fails
     */
    private MasterLabelSchema rewriteAndMerge(PropertyGraphTargetConfig targetConfig,
                                              GraphElementType graphElementType,
                                              MasterLabelSchema masterLabelSchema) throws Exception {

        LabelSchema masterSchema = masterLabelSchema.labelSchema().createCopy();
        masterSchema.initStats();

        String targetFilename = Directories.fileName(String.format("%s.consolidated",
                masterSchema.label().fullyQualifiedLabel()));

        // Loop-invariant: evaluate once instead of once per source file.
        boolean keepRewrittenFiles = featureToggles.containsFeature(FeatureToggle.Keep_Rewritten_Files);
        String[] tokenHeaders = graphElementType.tokenNames().toArray(new String[]{});

        Collection<String> renamedFiles = new ArrayList<>();

        try (PropertyGraphPrinter printer = graphElementType.writerFactory().createPrinter(
                targetFilename,
                masterSchema,
                targetConfig.forFileConsolidation())) {

            renamedFiles.add(printer.outputId());

            for (FileSpecificLabelSchema fileSpecificLabelSchema : masterLabelSchema.fileSpecificLabelSchemas()) {

                try (DeletableFile file = new DeletableFile(new File(fileSpecificLabelSchema.outputId()))) {

                    if (keepRewrittenFiles) {
                        file.doNotDelete();
                    }

                    // Guard ALL per-file work (header building included), so the source
                    // file is retained whenever rewriting it fails.
                    try {
                        rewriteFileInto(file, fileSpecificLabelSchema, tokenHeaders, graphElementType, printer);
                    } catch (Exception e) {
                        logger.error("Error while rewriting file: {}", fileSpecificLabelSchema.outputId(), e);
                        file.doNotDelete();
                        throw e;
                    }
                }
            }
        }

        return new MasterLabelSchema(
                masterSchema,
                renamedFiles.stream()
                        .map(f -> new FileSpecificLabelSchema(f, targetConfig.format(), masterSchema))
                        .collect(Collectors.toList()));
    }

    /**
     * Streams every row of one header-less source file into {@code printer}.
     *
     * @param file                    the source file to read
     * @param fileSpecificLabelSchema schema describing the columns of {@code file}
     * @param tokenHeaders            the element type's token column names (not modified)
     * @param graphElementType        whether rows are nodes or edges
     * @param printer                 destination for the rewritten rows
     * @throws Exception if the file cannot be read or a row cannot be written
     */
    private void rewriteFileInto(DeletableFile file,
                                 FileSpecificLabelSchema fileSpecificLabelSchema,
                                 String[] tokenHeaders,
                                 GraphElementType graphElementType,
                                 PropertyGraphPrinter printer) throws Exception {

        LabelSchema labelSchema = fileSpecificLabelSchema.labelSchema();
        Label label = labelSchema.label();
        String[] fileHeaders = fileHeadersFor(tokenHeaders, labelSchema);

        // Join explicitly: passing the raw array lets the logger render its own "[...]",
        // which inside the literal brackets below would print as "[[...]]".
        logger.info("File: {}, Headers: [{}]", fileSpecificLabelSchema.outputId(), String.join(", ", fileHeaders));

        CSVFormat format = CSVFormat.RFC4180
                .withSkipHeaderRecord(false) // files will not have headers
                .withHeader(fileHeaders);

        try (Reader in = file.reader()) {
            for (CSVRecord record : format.parse(in)) {
                printRecord(printer, graphElementType, label, record);
            }
        }
    }

    /**
     * Builds the column names for a header-less source file.
     *
     * <p>The order is: token names, then {@code ~fromLabels}/{@code ~toLabels} when the label
     * has from/to labels, then the file's property names.
     *
     * @param tokenHeaders the element type's token column names (not modified)
     * @param labelSchema  the schema of the file being read
     * @return a newly allocated array of column names
     */
    private static String[] fileHeadersFor(String[] tokenHeaders, LabelSchema labelSchema) {
        String[] additionalElementHeaders = labelSchema.label().hasFromAndToLabels() ?
                new String[]{"~fromLabels", "~toLabels"} :
                new String[]{};

        String[] filePropertyHeaders = labelSchema.propertySchemas().stream()
                .map(p -> p.property().toString())
                .toArray(String[]::new);

        return ArrayUtils.addAll(tokenHeaders, ArrayUtils.addAll(additionalElementHeaders, filePropertyHeaders));
    }

    /**
     * Writes one CSV record to {@code printer} as a single row: the identity columns first,
     * then the record's values via {@code printProperties}.
     *
     * <p>Nodes print {@code ~id} and the {@code ;}-separated {@code ~label} list. Edges print
     * {@code ~id}, {@code ~label}, {@code ~from} and {@code ~to}, plus the {@code ;}-separated
     * {@code ~fromLabels}/{@code ~toLabels} lists when the label has from/to labels.
     */
    private static void printRecord(PropertyGraphPrinter printer,
                                    GraphElementType graphElementType,
                                    Label label,
                                    CSVRecord record) throws Exception {
        printer.printStartRow();

        if (graphElementType == GraphElementType.nodes) {
            printer.printNode(record.get("~id"), Arrays.asList(record.get("~label").split(";")));
        } else if (graphElementType == GraphElementType.edges) {
            if (label.hasFromAndToLabels()) {
                printer.printEdge(
                        record.get("~id"),
                        record.get("~label"),
                        record.get("~from"),
                        record.get("~to"),
                        Arrays.asList(record.get("~fromLabels").split(";")),
                        Arrays.asList(record.get("~toLabels").split(";")));
            } else {
                printer.printEdge(record.get("~id"), record.get("~label"), record.get("~from"), record.get("~to"));
            }
        }

        printer.printProperties(record.toMap(), false);
        printer.printEndRow();
    }