in src/main/java/com/amazonaws/services/neptune/propertygraph/io/QueryTask.java [86:138]
public Map<GraphElementType, FileSpecificLabelSchemas> call() throws Exception {
QueriesWriterFactory writerFactory = new QueriesWriterFactory();
Map<Label, LabelWriter<Map<?, ?>>> labelWriters = new HashMap<>();
Map<GraphElementType, FileSpecificLabelSchemas> fileSpecificLabelSchemasMap = new HashMap<>();
fileSpecificLabelSchemasMap.put(GraphElementType.nodes, new FileSpecificLabelSchemas());
fileSpecificLabelSchemasMap.put(GraphElementType.edges, new FileSpecificLabelSchemas());
fileSpecificLabelSchemasMap.put(GraphElementType.queryResults, new FileSpecificLabelSchemas());
try {
while (status.allowContinue()) {
try {
NamedQuery namedQuery = queries.poll();
if (!(namedQuery == null)) {
final GraphElementSchemas graphElementSchemas = new GraphElementSchemas();
if (twoPassAnalysis) {
Timer.timedActivity(String.format("generating schema for query [%s]", namedQuery.query()),
(Activity.Runnable) () -> updateSchema(namedQuery, graphElementSchemas));
}
Timer.timedActivity(String.format("executing query [%s]", namedQuery.query()),
(CheckedActivity.Runnable) () ->
executeQuery(namedQuery, writerFactory, labelWriters, graphElementSchemas, fileSpecificLabelSchemasMap));
} else {
status.halt();
}
} catch (IllegalStateException e) {
logger.warn("Unexpected result value. {}. Proceeding with next query.", e.getMessage());
}
}
} finally {
for (LabelWriter<Map<?, ?>> labelWriter : labelWriters.values()) {
try {
labelWriter.close();
} catch (Exception e) {
logger.warn("Error closing label writer: {}.", e.getMessage());
}
}
}
return fileSpecificLabelSchemasMap;
}