in src/main/java/com/amazonaws/services/neptune/propertygraph/io/RewriteCsv.java [99:189]
/**
 * Re-emits every CSV file listed in {@code masterLabelSchema} through a printer that is
 * configured with a single shared schema, and reports the files that were produced.
 *
 * <p>For each file-specific schema the source file is read using a header made up of the
 * element type's token names, optionally {@code ~fromLabels}/{@code ~toLabels}, and the
 * property names of that file's own schema. Each record is then written to a new output whose
 * name is derived from the source file name via {@code replaceExtension(..., "modified")}.
 *
 * @param targetConfig      supplies the output format and the file-consolidation settings
 *                          passed to the printer
 * @param graphElementType  selects node or edge handling, the token column names and the
 *                          writer factory
 * @param masterLabelSchema the label schema plus the per-file schemas to rewrite
 * @return a new {@code MasterLabelSchema} holding the copied schema and one
 *         {@code FileSpecificLabelSchema} per rewritten output
 * @throws Exception if reading a source file or writing a rewritten file fails
 */
private MasterLabelSchema rewrite(PropertyGraphTargetConfig targetConfig,
                                  GraphElementType graphElementType,
                                  MasterLabelSchema masterLabelSchema) throws Exception {

    // All output is written against a copy obtained via createCopy(), with its stats initialised.
    LabelSchema masterSchema = masterLabelSchema.labelSchema().createCopy();
    masterSchema.initStats();

    Collection<String> renamedFiles = new ArrayList<>();

    for (FileSpecificLabelSchema fileSchema : masterLabelSchema.fileSpecificLabelSchemas()) {

        LabelSchema perFileSchema = fileSchema.labelSchema();
        Label label = perFileSchema.label();
        File sourceCsvFile = new File(fileSchema.outputId());

        // Only a missing *multi-label* file is skipped; a missing single-label file falls
        // through and is handled (or fails) when the file is opened below.
        if (!sourceCsvFile.exists() && label.labels().size() > 1) {
            logger.warn("Skipping multi-label file {} because it has already been rewritten under another label", sourceCsvFile);
            continue;
        }

        // Header layout: token columns, then the optional from/to label columns, then properties.
        String[] additionalElementHeaders;
        if (label.hasFromAndToLabels()) {
            additionalElementHeaders = new String[]{"~fromLabels", "~toLabels"};
        } else {
            additionalElementHeaders = new String[0];
        }

        String[] propertyHeaders = perFileSchema.propertySchemas().stream()
                .map(schema -> schema.property().toString())
                .toArray(String[]::new);

        String[] fileHeaders = ArrayUtils.addAll(
                graphElementType.tokenNames().toArray(new String[0]),
                ArrayUtils.addAll(additionalElementHeaders, propertyHeaders));

        // Resource order matters: the source file wrapper is opened first and closed last.
        // NOTE(review): DeletableFile presumably removes the source on close unless
        // doNotDelete() has been called - confirm against DeletableFile.
        try (DeletableFile sourceFile = new DeletableFile(sourceCsvFile);
             Reader in = sourceFile.reader();
             PropertyGraphPrinter target = graphElementType.writerFactory().createPrinter(
                     targetConfig.format().replaceExtension(sourceCsvFile.getName(), "modified"),
                     masterSchema,
                     targetConfig.forFileConsolidation())
        ) {
            if (featureToggles.containsFeature(FeatureToggle.Keep_Rewritten_Files)) {
                sourceFile.doNotDelete();
            }

            renamedFiles.add(target.outputId());

            // NOTE(review): the header is supplied explicitly, so the source files are
            // presumably header-less - confirm against the writer that produced them.
            CSVFormat format = CSVFormat.RFC4180.withHeader(fileHeaders);

            int recordCount = 0;
            for (CSVRecord record : format.parse(in)) {
                target.printStartRow();

                if (graphElementType == GraphElementType.nodes) {
                    // Multiple node labels are stored in one column separated by ';'.
                    target.printNode(
                            record.get("~id"),
                            Arrays.asList(record.get("~label").split(";")));
                } else if (graphElementType == GraphElementType.edges) {
                    if (!label.hasFromAndToLabels()) {
                        target.printEdge(
                                record.get("~id"),
                                record.get("~label"),
                                record.get("~from"),
                                record.get("~to"));
                    } else {
                        target.printEdge(
                                record.get("~id"),
                                record.get("~label"),
                                record.get("~from"),
                                record.get("~to"),
                                Arrays.asList(record.get("~fromLabels").split(";")),
                                Arrays.asList(record.get("~toLabels").split(";")));
                    }
                }

                // The whole record, token columns included, is handed to the printer.
                target.printProperties(record.toMap(), false);
                target.printEndRow();
                recordCount++;
            }

            logger.info("Original: {}, Rewritten: {}, RecordCount: {}", sourceFile, target.outputId(), recordCount);
        }
    }

    // Every rewritten file is described by the same shared schema.
    return new MasterLabelSchema(
            masterSchema,
            renamedFiles.stream()
                    .map(outputId -> new FileSpecificLabelSchema(outputId, targetConfig.format(), masterSchema))
                    .collect(Collectors.toList()));
}