in src/main/java/com/amazonaws/services/neptune/propertygraph/io/QueryJob.java [80:170]
private Map<GraphElementType, GraphElementSchemas> export() throws ExecutionException, InterruptedException {
System.err.println("Writing query results to " + targetConfig.output().name() + " as " + targetConfig.format().description());
Status status = new Status(StatusOutputFormat.Description, "query results");
ExecutorService taskExecutor = Executors.newFixedThreadPool(concurrencyConfig.concurrency());
Collection<Future<Map<GraphElementType, FileSpecificLabelSchemas>>> futures = new ArrayList<>();
Collection<FileSpecificLabelSchemas> nodesFileSpecificLabelSchemas = new ArrayList<>();
Collection<FileSpecificLabelSchemas> edgesFileSpecificLabelSchemas = new ArrayList<>();
Collection<FileSpecificLabelSchemas> queryResultsFileSpecificLabelSchemas = new ArrayList<>();
LabelsFilter nodeLabelFilter = new AllLabels(NodeLabelStrategy.nodeLabelsOnly);
LabelsFilter edgeLabelFilter = new AllLabels(EdgeLabelStrategy.edgeLabelsOnly);
for(ExportSpecification exportSpecification : exportSpecifications) {
if (exportSpecification.getGraphElementType() == GraphElementType.nodes) {
nodeLabelFilter = exportSpecification.getLabelsFilter();
}
else if (exportSpecification.getGraphElementType() == GraphElementType.edges) {
edgeLabelFilter = exportSpecification.getLabelsFilter();
}
}
AtomicInteger fileIndex = new AtomicInteger();
for (int index = 1; index <= concurrencyConfig.concurrency(); index++) {
QueryTask queryTask = new QueryTask(
queries,
queryClient,
targetConfig,
twoPassAnalysis,
timeoutMillis,
status,
fileIndex,
structuredOutput,
nodeLabelFilter,
edgeLabelFilter,
exportSpecifications.iterator().next().getExportStats());
futures.add(taskExecutor.submit(queryTask));
}
taskExecutor.shutdown();
try {
taskExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
for (Future<Map<GraphElementType, FileSpecificLabelSchemas>> future : futures) {
if (future.isCancelled()) {
throw new IllegalStateException("Unable to complete job because at least one task was cancelled");
}
if (!future.isDone()) {
throw new IllegalStateException("Unable to complete job because at least one task has not completed");
}
Map<GraphElementType, FileSpecificLabelSchemas> result = future.get();
nodesFileSpecificLabelSchemas.add(result.get(GraphElementType.nodes));
edgesFileSpecificLabelSchemas.add(result.get(GraphElementType.edges));
queryResultsFileSpecificLabelSchemas.add(result.get(GraphElementType.queryResults));
}
RewriteCommand rewriteCommand = targetConfig.createRewriteCommand(concurrencyConfig, featureToggles);
Map<GraphElementType, GraphElementSchemas> graphElementSchemas = new HashMap<>();
if (structuredOutput) {
for(ExportSpecification exportSpecification : exportSpecifications) {
MasterLabelSchemas masterLabelSchemas = exportSpecification.createMasterLabelSchemas(
exportSpecification.getGraphElementType().equals(GraphElementType.nodes) ?
nodesFileSpecificLabelSchemas : edgesFileSpecificLabelSchemas
);
try {
graphElementSchemas.put(exportSpecification.getGraphElementType(), rewriteCommand.execute(masterLabelSchemas).toGraphElementSchemas());
} catch (Exception e) {
throw new RuntimeException(e);
}
}
} else {
MasterLabelSchemas masterLabelSchemas = new MasterLabelSchemas(queryResultsFileSpecificLabelSchemas, GraphElementType.queryResults);
try {
graphElementSchemas.put(GraphElementType.queryResults, rewriteCommand.execute(masterLabelSchemas).toGraphElementSchemas());
} catch (Exception e) {
throw new RuntimeException(e);
}
}
return graphElementSchemas;
}