in src/main/java/com/amazonaws/services/neptune/propertygraph/io/ExportPropertyGraphJob.java [87:148]
/**
 * Runs the export described by {@code exportSpecification}, one label-specific
 * specification at a time (as produced by {@code splitByLabel()}).
 *
 * <p>For each label-specific specification this method creates a range factory,
 * starts a fixed-size thread pool with {@code rangeFactory.concurrency()} threads,
 * submits that same number of export tasks, waits for the pool to terminate and then
 * hands the task futures to {@code updateFileSpecificLabelSchemas} so that their results
 * are accumulated. Once every label has been processed, the accumulated file-specific
 * schemas are turned into master label schemas and passed through the target's rewrite
 * command.
 *
 * @param exportSpecification the specification to export
 * @return the value returned by the rewrite command for the master label schemas
 * @throws Exception if creating or running the export tasks, collecting their results,
 *                   or the rewrite command fails; an interruption while waiting for the
 *                   tasks is rethrown wrapped in a {@link RuntimeException}
 */
private MasterLabelSchemas export(ExportSpecification exportSpecification) throws Exception {
    // Shared across all labels: collected results and the file descriptor counter
    // that is passed to every export task.
    Collection<FileSpecificLabelSchemas> fileSpecificLabelSchemas = new ArrayList<>();
    AtomicInteger fileDescriptorCount = new AtomicInteger();

    for (ExportSpecification labelSpecificExportSpecification : exportSpecification.splitByLabel()) {
        RangeFactory rangeFactory = labelSpecificExportSpecification.createRangeFactory(g, rangeConfig, concurrencyConfig);
        Status status = new Status(
                StatusOutputFormat.Description,
                String.format("%s: %s total", labelSpecificExportSpecification.description(), rangeFactory.numberOfItemsToExport()),
                () -> String.format(" [%s GB free space]", targetConfig.freeSpaceInGigabytes()));

        String description = String.format("writing %s as %s to %s",
                labelSpecificExportSpecification.description(),
                targetConfig.format().description(),
                targetConfig.output().name());
        System.err.println("Started " + description);

        // Passed to every task created for this label.
        AtomicInteger fileIndex = new AtomicInteger();

        Timer.timedActivity(description, (CheckedActivity.Runnable) () -> {
            Collection<Future<FileSpecificLabelSchemas>> futures = new ArrayList<>();

            // Read once so the pool size and the number of submitted tasks always agree.
            int concurrency = rangeFactory.concurrency();
            ExecutorService taskExecutor = Executors.newFixedThreadPool(concurrency);
            try {
                for (int index = 1; index <= concurrency; index++) {
                    ExportPropertyGraphTask exportTask = labelSpecificExportSpecification.createExportTask(
                            graphSchema,
                            g,
                            targetConfig,
                            gremlinFilters,
                            rangeFactory,
                            status,
                            fileIndex,
                            fileDescriptorCount,
                            maxFileDescriptorCount
                    );
                    futures.add(taskExecutor.submit(exportTask));
                }
            } finally {
                // Always stop accepting work, even if creating or submitting a task failed;
                // otherwise the pool's non-daemon worker threads would never be released.
                taskExecutor.shutdown();
            }

            try {
                if (!taskExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS)) {
                    logger.warn("Timeout expired with uncompleted tasks");
                }
            } catch (InterruptedException e) {
                // The export is being abandoned: cancel the tasks that are still running
                // instead of leaving them to continue in the background.
                taskExecutor.shutdownNow();
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }

            updateFileSpecificLabelSchemas(futures, fileSpecificLabelSchemas);
        });
    }

    MasterLabelSchemas masterLabelSchemas = exportSpecification.createMasterLabelSchemas(fileSpecificLabelSchemas);
    RewriteCommand rewriteCommand = targetConfig.createRewriteCommand(concurrencyConfig, featureToggles);
    return rewriteCommand.execute(masterLabelSchemas);
}