in src/main/java/com/amazonaws/services/neptune/propertygraph/io/RewriteAndMergeCsv.java [100:191]
/**
 * Rewrites every file listed in {@code masterLabelSchema} into a single consolidated file whose
 * columns follow the master label schema, and returns a schema describing that consolidated file.
 *
 * <p>Works on a copy of the master label schema (on which {@code initStats()} is called). Each
 * source file is read as header-less RFC 4180 CSV using column names derived from that file's own
 * label schema, and each record is re-printed through the consolidation printer. A source file is
 * marked {@code doNotDelete()} when the {@code Keep_Rewritten_Files} feature toggle is enabled, or
 * when any step of rewriting it fails.
 *
 * @param targetConfig      target configuration; {@code forFileConsolidation()} configures the
 *                          consolidated printer and {@code format()} is recorded in the result
 * @param graphElementType  whether the files contain nodes or edges
 * @param masterLabelSchema the master label schema together with the per-file schemas to merge
 * @return a master label schema whose only file-specific schema refers to the consolidated output
 * @throws Exception if creating the printer, building headers, reading a source file or printing
 *                   a row fails; the failing source file is marked {@code doNotDelete()} first
 */
private MasterLabelSchema rewriteAndMerge(PropertyGraphTargetConfig targetConfig,
                                          GraphElementType graphElementType,
                                          MasterLabelSchema masterLabelSchema) throws Exception {
    LabelSchema masterSchema = masterLabelSchema.labelSchema().createCopy();
    masterSchema.initStats();

    String targetFilename = Directories.fileName(String.format("%s.consolidated",
            masterSchema.label().fullyQualifiedLabel()));
    Collection<String> renamedFiles = new ArrayList<>();

    // Loop-invariant: decide once whether rewritten source files should be kept.
    boolean keepRewrittenFiles = featureToggles.containsFeature(FeatureToggle.Keep_Rewritten_Files);

    try (PropertyGraphPrinter printer = graphElementType.writerFactory().createPrinter(
            targetFilename,
            masterSchema,
            targetConfig.forFileConsolidation())) {
        renamedFiles.add(printer.outputId());

        for (FileSpecificLabelSchema fileSpecificLabelSchema : masterLabelSchema.fileSpecificLabelSchemas()) {
            try (DeletableFile file = new DeletableFile(new File(fileSpecificLabelSchema.outputId()))) {
                if (keepRewrittenFiles) {
                    file.doNotDelete();
                }
                // The guard spans header construction as well as reading/printing, so that any
                // failure while rewriting this file marks it doNotDelete() before it is closed.
                try {
                    LabelSchema labelSchema = fileSpecificLabelSchema.labelSchema();
                    Label label = labelSchema.label();
                    String[] fileHeaders = consolidationHeaders(graphElementType, labelSchema);
                    logger.info("File: {}, Headers: [{}]", fileSpecificLabelSchema.outputId(), fileHeaders);

                    // Source files carry no header row: column names are supplied explicitly and
                    // the first record is treated as data.
                    CSVFormat format = CSVFormat.RFC4180
                            .withSkipHeaderRecord(false)
                            .withHeader(fileHeaders);
                    try (Reader in = file.reader()) {
                        for (CSVRecord record : format.parse(in)) {
                            printConsolidatedRecord(printer, graphElementType, label, record);
                        }
                    }
                } catch (Exception e) {
                    logger.error("Error while rewriting file: {}", fileSpecificLabelSchema.outputId(), e);
                    file.doNotDelete();
                    throw e;
                }
            }
        }
    }

    return new MasterLabelSchema(
            masterSchema,
            renamedFiles.stream()
                    .map(f -> new FileSpecificLabelSchema(f, targetConfig.format(), masterSchema))
                    .collect(Collectors.toList()));
}

/**
 * Builds the column names for a header-less source file, in this order:
 * the element token names of {@code graphElementType};
 * then {@code ~fromLabels} and {@code ~toLabels} if the file's label has from/to labels;
 * then one column per property in the file's label schema.
 */
private static String[] consolidationHeaders(GraphElementType graphElementType, LabelSchema labelSchema) {
    String[] additionalElementHeaders = labelSchema.label().hasFromAndToLabels() ?
            new String[]{"~fromLabels", "~toLabels"} :
            new String[]{};
    String[] filePropertyHeaders = labelSchema.propertySchemas().stream()
            .map(p -> p.property().toString())
            .toArray(String[]::new);
    return ArrayUtils.addAll(
            graphElementType.tokenNames().toArray(new String[]{}),
            ArrayUtils.addAll(additionalElementHeaders, filePropertyHeaders));
}

/**
 * Prints one source record as a row of the consolidated file. The sequence is:
 * start row;
 * the element tokens (a node's id and labels, or an edge's id, label and endpoints, plus endpoint
 * labels when the file's label has them);
 * the record's properties;
 * end row.
 * Multi-valued label columns are split on {@code ;}.
 */
private static void printConsolidatedRecord(PropertyGraphPrinter printer,
                                            GraphElementType graphElementType,
                                            Label label,
                                            CSVRecord record) throws Exception {
    printer.printStartRow();
    if (graphElementType == GraphElementType.nodes) {
        printer.printNode(record.get("~id"), Arrays.asList(record.get("~label").split(";")));
    } else if (graphElementType == GraphElementType.edges) {
        if (label.hasFromAndToLabels()) {
            printer.printEdge(
                    record.get("~id"),
                    record.get("~label"),
                    record.get("~from"),
                    record.get("~to"),
                    Arrays.asList(record.get("~fromLabels").split(";")),
                    Arrays.asList(record.get("~toLabels").split(";")));
        } else {
            printer.printEdge(record.get("~id"), record.get("~label"), record.get("~from"), record.get("~to"));
        }
    }
    printer.printProperties(record.toMap(), false);
    printer.printEndRow();
}