in v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/templates/GoogleCloudToNeo4j.java [244:526]
/**
 * Assembles the Beam pipeline for the configured import specification and submits it.
 *
 * <p>The method proceeds in this order:
 *
 * <ol>
 *   <li>opens a direct {@link Neo4jConnection} and, depending on the {@code reset_db} global
 *       setting, either verifies connectivity or resets the database;
 *   <li>hands the {@code START} actions to {@code runPreloadActions} and applies the {@code
 *       PRE_NODES}, {@code PRE_RELATIONSHIPS} and {@code PRE_QUERIES} actions, grouping their
 *       outputs by stage;
 *   <li>for every source that has at least one active target, applies the source metadata query
 *       and then the node, relationship and custom query writers, each one gated by {@code
 *       Wait.on} over its declared dependencies and the matching pre-actions;
 *   <li>applies the {@code POST_*} actions, then the {@code END} actions which depend on all the
 *       {@code POST_*} outputs;
 *   <li>calls {@code pipeline.run()} without waiting for completion.
 * </ol>
 *
 * <p>Sources without any active target are skipped individually; they do not prevent the
 * remaining sources from being processed nor the pipeline from being submitted.
 */
public void run() {

  // The direct connection is only needed for the upfront check/reset, so it is closed right away
  try (Neo4jConnection directConnect =
      new Neo4jConnection(this.neo4jConnection, this.templateVersion)) {
    boolean resetDb = globalSettings.get(Boolean.class, "reset_db").orElse(false);
    if (!resetDb) {
      directConnect.verifyConnectivity();
    } else {
      directConnect.resetDatabase();
    }
  }

  ////////////////////////////
  // If an action transformation has no upstream PCollection, it will use this default context
  PCollection<Row> defaultActionContext =
      pipeline.apply(
          "Default Context",
          Create.empty(TypeDescriptor.of(Row.class)).withCoder(ProcessingCoder.of()));

  var processingQueue = new BeamBlock(defaultActionContext);

  runPreloadActions(findActionsByStage(ActionStage.START).collect(toList()));

  // Outputs of the PRE-* actions, grouped by stage, so that each kind of target can wait on the
  // actions of its own stage
  Map<ActionStage, List<PCollection<?>>> preActionRows =
      findActionsByStages(
              Set.of(
                  ActionStage.PRE_NODES, ActionStage.PRE_RELATIONSHIPS, ActionStage.PRE_QUERIES))
          .map(action -> Map.entry(action.getStage(), runAction(action, defaultActionContext)))
          .collect(
              groupingBy(
                  Entry::getKey, mapping(Entry::getValue, Collectors.<PCollection<?>>toList())));

  // Accumulators consumed by the POST-* actions once all sources are processed
  var sourceRows = new ArrayList<PCollection<?>>(importSpecification.getSources().size());
  var targetRows = new HashMap<TargetType, List<PCollection<?>>>(targetCount());
  var allActiveTargets =
      importSpecification.getTargets().getAll().stream()
          .filter(Target::isActive)
          .collect(toList());
  var allActiveNodeTargets =
      importSpecification.getTargets().getNodes().stream()
          .filter(Target::isActive)
          .collect(toList());

  ////////////////////////////
  // Process sources
  for (var source : importSpecification.getSources()) {
    String sourceName = source.getName();
    var activeSourceTargets =
        allActiveTargets.stream()
            .filter(target -> target.getSource().equals(sourceName))
            .collect(toList());
    if (activeSourceTargets.isEmpty()) {
      // Nothing to import from this source: skip it, but keep processing the other sources.
      // (Returning here would abandon the remaining sources, the POST-*/END actions and the
      // final pipeline.run() call.)
      continue;
    }

    // get provider implementation for source
    Provider provider = ProviderFactory.of(source, targetSequence);
    provider.configure(optionsParams);
    PCollection<Row> sourceMetadata =
        pipeline.apply(
            String.format("Metadata for source %s", sourceName), provider.queryMetadata());
    sourceRows.add(sourceMetadata);
    Schema sourceBeamSchema = sourceMetadata.getSchema();
    processingQueue.addToQueue(ArtifactType.source, sourceName, defaultActionContext);

    ////////////////////////////
    // Optimization: if some of the current source's targets either
    // - do not alter the source query (i.e. define no transformations)
    // - or the source provider does not support SQL pushdown
    // then the source PCollection can be defined here and reused across all the relevant targets
    PCollection<Row> nullableSourceBeamRows = null;
    if (!provider.supportsSqlPushDown()
        || activeSourceTargets.stream()
            .anyMatch(target -> !ModelUtils.targetHasTransforms(target))) {
      nullableSourceBeamRows =
          pipeline
              .apply("Query " + sourceName, provider.querySourceBeamRows(sourceBeamSchema))
              .setRowSchema(sourceBeamSchema);
    }

    ////////////////////////////
    // Write node targets
    List<NodeTarget> nodeTargets = getTargetsByType(activeSourceTargets, TargetType.NODE);
    for (NodeTarget target : nodeTargets) {
      TargetQuerySpec targetQuerySpec =
          new TargetQuerySpecBuilder()
              .sourceBeamSchema(sourceBeamSchema)
              .nullableSourceRows(nullableSourceBeamRows)
              .target(target)
              .build();
      String nodeStepDescription =
          targetSequence.getSequenceNumber(target)
              + ": "
              + sourceName
              + "->"
              + target.getName()
              + " nodes";
      PCollection<Row> preInsertBeamRows =
          pipeline.apply(
              "Query " + nodeStepDescription, provider.queryTargetBeamRows(targetQuerySpec));

      // The write waits on the PRE_NODES actions and on the target's declared dependencies
      List<PCollection<?>> dependencies =
          new ArrayList<>(preActionRows.getOrDefault(ActionStage.PRE_NODES, List.of()));
      dependencies.add(
          processingQueue.resolveOutputs(target.getDependencies(), nodeStepDescription));

      PCollection<Row> blockingReturn =
          preInsertBeamRows
              .apply(
                  "** Unblocking "
                      + nodeStepDescription
                      + "(after "
                      + String.join(", ", target.getDependencies())
                      + " and pre-nodes actions)",
                  Wait.on(dependencies))
              .setCoder(preInsertBeamRows.getCoder())
              .apply(
                  "Writing " + nodeStepDescription,
                  new Neo4jRowWriterTransform(
                      importSpecification,
                      neo4jConnection,
                      templateVersion,
                      targetSequence,
                      target))
              .setCoder(preInsertBeamRows.getCoder());

      targetRows
          .computeIfAbsent(TargetType.NODE, (type) -> new ArrayList<>(nodeTargets.size()))
          .add(blockingReturn);

      // Register the output so that later targets can depend on this node target by name
      processingQueue.addToQueue(ArtifactType.node, target.getName(), blockingReturn);
    }

    ////////////////////////////
    // Write relationship targets
    List<RelationshipTarget> relationshipTargets =
        getTargetsByType(activeSourceTargets, TargetType.RELATIONSHIP);
    for (var target : relationshipTargets) {
      var targetQuerySpec =
          new TargetQuerySpecBuilder()
              .nullableSourceRows(nullableSourceBeamRows)
              .sourceBeamSchema(sourceBeamSchema)
              .target(target)
              .startNodeTarget(
                  findNodeTargetByName(allActiveNodeTargets, target.getStartNodeReference()))
              .endNodeTarget(
                  findNodeTargetByName(allActiveNodeTargets, target.getEndNodeReference()))
              .build();
      String relationshipStepDescription =
          targetSequence.getSequenceNumber(target)
              + ": "
              + sourceName
              + "->"
              + target.getName()
              + " edges";
      PCollection<Row> preInsertBeamRows;
      if (ModelUtils.targetHasTransforms(target)) {
        preInsertBeamRows =
            pipeline.apply(
                "Query " + relationshipStepDescription,
                provider.queryTargetBeamRows(targetQuerySpec));
      } else {
        // A target without transforms makes the anyMatch condition above true, so the shared
        // source rows have been defined for this source
        preInsertBeamRows = nullableSourceBeamRows;
      }

      List<PCollection<?>> dependencies =
          new ArrayList<>(preActionRows.getOrDefault(ActionStage.PRE_RELATIONSHIPS, List.of()));
      // On top of the declared dependencies, a relationship also waits on its start and end
      // node targets; the LinkedHashSet removes duplicates while keeping a stable order
      Set<String> dependencyNames = new LinkedHashSet<>(target.getDependencies());
      dependencyNames.add(target.getStartNodeReference());
      dependencyNames.add(target.getEndNodeReference());
      dependencies.add(
          processingQueue.resolveOutputs(dependencyNames, relationshipStepDescription));

      PCollection<Row> blockingReturn =
          preInsertBeamRows
              .apply(
                  "** Unblocking "
                      + relationshipStepDescription
                      + "(after "
                      + String.join(", ", dependencyNames)
                      + " and pre-relationships actions)",
                  Wait.on(dependencies))
              .setCoder(preInsertBeamRows.getCoder())
              .apply(
                  "Writing " + relationshipStepDescription,
                  new Neo4jRowWriterTransform(
                      importSpecification,
                      neo4jConnection,
                      templateVersion,
                      targetSequence,
                      target))
              .setCoder(preInsertBeamRows.getCoder());

      targetRows
          .computeIfAbsent(
              TargetType.RELATIONSHIP, (type) -> new ArrayList<>(relationshipTargets.size()))
          .add(blockingReturn);
      // serialize relationships
      processingQueue.addToQueue(ArtifactType.edge, target.getName(), blockingReturn);
    }

    ////////////////////////////
    // Custom query targets
    List<CustomQueryTarget> customQueryTargets =
        getTargetsByType(activeSourceTargets, TargetType.QUERY);
    for (Target target : customQueryTargets) {
      String customQueryStepDescription =
          targetSequence.getSequenceNumber(target)
              + ": "
              + sourceName
              + "->"
              + target.getName()
              + " (custom query)";

      List<PCollection<?>> dependencies =
          new ArrayList<>(preActionRows.getOrDefault(ActionStage.PRE_QUERIES, List.of()));
      dependencies.add(
          processingQueue.resolveOutputs(target.getDependencies(), customQueryStepDescription));

      // note: nullableSourceBeamRows is guaranteed to be non-null here since custom query targets
      // cannot define source transformations
      PCollection<Row> blockingReturn =
          nullableSourceBeamRows
              .apply(
                  "** Unblocking "
                      + customQueryStepDescription
                      + "(after "
                      + String.join(", ", target.getDependencies())
                      + ")",
                  Wait.on(dependencies))
              .setCoder(nullableSourceBeamRows.getCoder())
              .apply(
                  "Writing " + customQueryStepDescription,
                  new Neo4jRowWriterTransform(
                      importSpecification,
                      neo4jConnection,
                      templateVersion,
                      targetSequence,
                      target))
              .setCoder(nullableSourceBeamRows.getCoder());

      targetRows
          .computeIfAbsent(TargetType.QUERY, (type) -> new ArrayList<>(customQueryTargets.size()))
          .add(blockingReturn);
      processingQueue.addToQueue(ArtifactType.custom_query, target.getName(), blockingReturn);
    }
  }

  // Process POST-* actions, gather outputs and run END actions
  List<PCollection<?>> endActionDependencies =
      findActionsByStage(ActionStage.POST_SOURCES)
          .map(action -> runAction(action, defaultActionContext, sourceRows))
          .collect(Collectors.toCollection(ArrayList::new));
  endActionDependencies.addAll(
      findActionsByStage(ActionStage.POST_NODES)
          .map(
              action ->
                  runAction(
                      action,
                      defaultActionContext,
                      targetRows.getOrDefault(TargetType.NODE, List.of())))
          .collect(toList()));
  endActionDependencies.addAll(
      findActionsByStage(ActionStage.POST_RELATIONSHIPS)
          .map(
              action ->
                  runAction(
                      action,
                      defaultActionContext,
                      targetRows.getOrDefault(TargetType.RELATIONSHIP, List.of())))
          .collect(toList()));
  endActionDependencies.addAll(
      findActionsByStage(ActionStage.POST_QUERIES)
          .map(
              action ->
                  runAction(
                      action,
                      defaultActionContext,
                      targetRows.getOrDefault(TargetType.QUERY, List.of())))
          .collect(toList()));
  // The stream is lazy: the terminal forEach is what triggers the runAction calls for the END
  // actions, their results are passed to noOp
  findActionsByStage(ActionStage.END)
      .map(action -> runAction(action, defaultActionContext, endActionDependencies))
      .forEach(GoogleCloudToNeo4j::noOp);

  // For a Dataflow Flex Template, do NOT waitUntilFinish().
  pipeline.run();
}