in src/main/java/com/amazonaws/services/neptune/propertygraph/io/QueryTask.java [150:208]
/**
 * Submits one named query and feeds every returned result to a handler.
 *
 * <p>The query text comes from {@code namedQuery.query()} and is submitted through
 * {@code queryClient} with {@code timeoutMillis}. The handler is chosen by the
 * {@code structuredOutput} field:
 * <ul>
 *   <li>when set, a {@code QueriesResultWrapperHandler} wrapping one counting
 *       {@code ExportPGTaskHandler} for nodes and one for edges;</li>
 *   <li>otherwise, a {@code ResultsHandler} labelled with the query name and wrapped
 *       in a {@code StatusHandler}.</li>
 * </ul>
 * Each result object is converted with {@code castToMap} before being handled.
 *
 * @param namedQuery                  query to run; its name becomes the label in the
 *                                    non-structured branch
 * @param writerFactory               passed to the {@code ResultsHandler}; only used in the
 *                                    non-structured branch
 * @param labelWriters                passed to the {@code ResultsHandler}; only used in the
 *                                    non-structured branch
 * @param graphElementSchemas         schemas handed to whichever handler is built
 * @param fileSpecificLabelSchemasMap looked up by {@code GraphElementType} (nodes and edges
 *                                    in the structured branch, queryResults otherwise)
 * @throws RuntimeException wrapping any {@link IOException} thrown while handling a result
 */
private void executeQuery(NamedQuery namedQuery,
                          QueriesWriterFactory writerFactory,
                          Map<Label, LabelWriter<Map<?, ?>>> labelWriters,
                          GraphElementSchemas graphElementSchemas,
                          Map<GraphElementType, FileSpecificLabelSchemas> fileSpecificLabelSchemasMap) {

    // The query is submitted before any handler is constructed.
    ResultSet resultSet = queryClient.submit(namedQuery.query(), timeoutMillis);

    GraphElementHandler<Map<?, ?>> handler;
    if (!structuredOutput) {
        // Single handler for all rows, labelled with the query's name.
        handler = new StatusHandler(
                new ResultsHandler(
                        fileSpecificLabelSchemasMap.get(GraphElementType.queryResults),
                        new Label(namedQuery.name()),
                        labelWriters,
                        writerFactory,
                        graphElementSchemas),
                status);
    } else {
        // Separate handlers for nodes and edges; each gets its own LabelWriters instance.
        CountingHandler<QueriesNodeResult> nodeHandler = new CountingHandler<QueriesNodeResult>(
                new ExportPGTaskHandler<QueriesNodeResult>(
                        fileSpecificLabelSchemasMap.get(GraphElementType.nodes),
                        graphElementSchemas,
                        targetConfig,
                        (WriterFactory<QueriesNodeResult>) GraphElementType.nodes.writerFactory(),
                        new LabelWriters<>(new AtomicInteger(), 0),
                        new ExportStatsWrapper(exportStats, GraphElementType.nodes),
                        status,
                        index,
                        nodeLabelFilter));

        CountingHandler<QueriesEdgeResult> edgeHandler = new CountingHandler<QueriesEdgeResult>(
                new ExportPGTaskHandler<QueriesEdgeResult>(
                        fileSpecificLabelSchemasMap.get(GraphElementType.edges),
                        graphElementSchemas,
                        targetConfig,
                        (WriterFactory<QueriesEdgeResult>) GraphElementType.edges.writerFactory(),
                        new LabelWriters<>(new AtomicInteger(), 0),
                        new ExportStatsWrapper(exportStats, GraphElementType.edges),
                        status,
                        index,
                        edgeLabelFilter));

        handler = new QueriesResultWrapperHandler(nodeHandler, edgeHandler);
    }

    resultSet.stream().forEach(result -> {
        Map<?, ?> row = castToMap(result.getObject());
        try {
            handler.handle(row, true);
        } catch (IOException ioException) {
            // The lambda cannot throw a checked exception, so rethrow unchecked with the cause.
            throw new RuntimeException(ioException);
        }
    });
}