private Map export()

in src/main/java/com/amazonaws/services/neptune/propertygraph/io/QueryJob.java [80:170]


    /**
     * Submits the query tasks to a fixed-size thread pool, waits for them to finish and rewrites
     * the schemas they produced.
     *
     * <p>A message naming the output and format is printed to {@code System.err} first. One
     * {@code QueryTask} is submitted per unit of {@code concurrencyConfig.concurrency()}. Each
     * task's result is split into its nodes, edges and query-results entries. When
     * {@code structuredOutput} is set, master label schemas are created for every export
     * specification (from the nodes entries for a nodes specification, otherwise from the edges
     * entries) and passed to the rewrite command; otherwise the query-results entries are
     * rewritten as a single {@code GraphElementType.queryResults} schema.
     *
     * <p>The thread pool is always shut down: it is stopped immediately if task submission fails
     * or if the calling thread is interrupted while waiting for the tasks.
     *
     * @return the rewritten schemas keyed by graph element type
     * @throws ExecutionException if retrieving a task's result shows that the task threw an exception
     * @throws InterruptedException declared because {@code Future.get()} declares it
     * @throws IllegalStateException if a task was cancelled or is not done after the wait
     * @throws RuntimeException if the wait for the tasks is interrupted (the interrupt flag is
     *                          restored first) or if the rewrite command throws
     */
    private Map<GraphElementType, GraphElementSchemas> export() throws ExecutionException, InterruptedException {

        System.err.println("Writing query results to " + targetConfig.output().name() + " as " + targetConfig.format().description());

        Status status = new Status(StatusOutputFormat.Description, "query results");

        Collection<Future<Map<GraphElementType, FileSpecificLabelSchemas>>> futures = new ArrayList<>();

        Collection<FileSpecificLabelSchemas> nodesFileSpecificLabelSchemas = new ArrayList<>();
        Collection<FileSpecificLabelSchemas> edgesFileSpecificLabelSchemas = new ArrayList<>();
        Collection<FileSpecificLabelSchemas> queryResultsFileSpecificLabelSchemas = new ArrayList<>();

        // Start from "all labels"; an export specification for the matching element type replaces the default.
        LabelsFilter nodeLabelFilter = new AllLabels(NodeLabelStrategy.nodeLabelsOnly);
        LabelsFilter edgeLabelFilter = new AllLabels(EdgeLabelStrategy.edgeLabelsOnly);

        for (ExportSpecification exportSpecification : exportSpecifications) {
            if (exportSpecification.getGraphElementType() == GraphElementType.nodes) {
                nodeLabelFilter = exportSpecification.getLabelsFilter();
            } else if (exportSpecification.getGraphElementType() == GraphElementType.edges) {
                edgeLabelFilter = exportSpecification.getLabelsFilter();
            }
        }

        // A single counter instance is handed to every task.
        AtomicInteger fileIndex = new AtomicInteger();

        // Create the pool only once everything else is ready, so the window in which it can leak is small.
        ExecutorService taskExecutor = Executors.newFixedThreadPool(concurrencyConfig.concurrency());

        boolean allTasksSubmitted = false;
        try {
            for (int index = 1; index <= concurrencyConfig.concurrency(); index++) {
                QueryTask queryTask = new QueryTask(
                        queries,
                        queryClient,
                        targetConfig,
                        twoPassAnalysis,
                        timeoutMillis,
                        status,
                        fileIndex,
                        structuredOutput,
                        nodeLabelFilter,
                        edgeLabelFilter,
                        // Every task receives the export stats of the first export specification.
                        exportSpecifications.iterator().next().getExportStats());
                futures.add(taskExecutor.submit(queryTask));
            }
            allTasksSubmitted = true;
        } finally {
            if (!allTasksSubmitted) {
                // Building or submitting a task failed: stop the pool so its threads are not leaked.
                taskExecutor.shutdownNow();
            }
        }

        // No further tasks will be submitted; let the submitted ones run to completion.
        taskExecutor.shutdown();

        try {
            taskExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
        } catch (InterruptedException e) {
            // Stop the workers as well; otherwise they would keep running after this job has failed.
            taskExecutor.shutdownNow();
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }

        for (Future<Map<GraphElementType, FileSpecificLabelSchemas>> future : futures) {
            if (future.isCancelled()) {
                throw new IllegalStateException("Unable to complete job because at least one task was cancelled");
            }
            if (!future.isDone()) {
                throw new IllegalStateException("Unable to complete job because at least one task has not completed");
            }
            // get() rethrows a task failure wrapped in an ExecutionException.
            Map<GraphElementType, FileSpecificLabelSchemas> result = future.get();
            nodesFileSpecificLabelSchemas.add(result.get(GraphElementType.nodes));
            edgesFileSpecificLabelSchemas.add(result.get(GraphElementType.edges));
            queryResultsFileSpecificLabelSchemas.add(result.get(GraphElementType.queryResults));
        }

        RewriteCommand rewriteCommand = targetConfig.createRewriteCommand(concurrencyConfig, featureToggles);
        Map<GraphElementType, GraphElementSchemas> graphElementSchemas = new HashMap<>();

        if (structuredOutput) {
            for (ExportSpecification exportSpecification : exportSpecifications) {
                // A nodes specification uses the nodes entries; any other specification uses the edges entries.
                MasterLabelSchemas masterLabelSchemas = exportSpecification.createMasterLabelSchemas(
                        exportSpecification.getGraphElementType().equals(GraphElementType.nodes) ?
                                nodesFileSpecificLabelSchemas : edgesFileSpecificLabelSchemas
                );
                try {
                    graphElementSchemas.put(exportSpecification.getGraphElementType(), rewriteCommand.execute(masterLabelSchemas).toGraphElementSchemas());
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        } else {
            MasterLabelSchemas masterLabelSchemas = new MasterLabelSchemas(queryResultsFileSpecificLabelSchemas, GraphElementType.queryResults);
            try {
                graphElementSchemas.put(GraphElementType.queryResults, rewriteCommand.execute(masterLabelSchemas).toGraphElementSchemas());
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        return graphElementSchemas;
    }