in maven-projects/spark/graphar/src/main/scala/org/apache/graphar/example/Neo4j2GraphAr.scala [73:177]
def readAndPutDataIntoWriter(
    writer: GraphWriter,
    spark: SparkSession
): Unit = {
  // Reads the "Person" and "Movie" vertices and the relationships between them
  // from Neo4j, one Cypher query per DataFrame, and puts each DataFrame into
  // `writer` under its vertex type or its (source, edge, target) type triple.
  //
  // writer: receives every vertex and edge DataFrame that is read here.
  // spark:  session used to run the queries through the Neo4j Spark connector.

  // Runs a single Cypher query through the Neo4j connector and returns the
  // result as a DataFrame.
  // Note: "schema.flatten.limit" is set to 1 so that, as far as possible, a
  // sampled null record does not cause a column type to be inferred as string.
  // If you need perfect type inference, consider using APOC.
  def readFromNeo4j(query: String) =
    spark.read
      .format("org.neo4j.spark.DataSource")
      .option("query", query)
      .option("schema.flatten.limit", 1)
      .load()

  // Vertex type -> query returning the properties kept for that vertex type.
  val vertexQueries: Seq[(String, String)] = Seq(
    "Person" -> "MATCH (n:Person) RETURN n.name AS name, n.born as born",
    "Movie" -> "MATCH (n:Movie) RETURN n.title AS title, n.tagline as tagline"
  )

  // (source vertex type, edge type, target vertex type) -> query returning the
  // "src"/"dst" columns (plus any relationship properties) for that edge type.
  val edgeQueries: Seq[((String, String, String), String)] = Seq(
    ("Person", "PRODUCED", "Movie") ->
      "MATCH (a:Person)-[r:PRODUCED]->(b:Movie) return a.name as src, b.title as dst",
    ("Person", "ACTED_IN", "Movie") ->
      "MATCH (a:Person)-[r:ACTED_IN]->(b:Movie) return a.name as src, b.title as dst",
    ("Person", "DIRECTED", "Movie") ->
      "MATCH (a:Person)-[r:DIRECTED]->(b:Movie) return a.name as src, b.title as dst",
    ("Person", "FOLLOWS", "Person") ->
      "MATCH (a:Person)-[r:FOLLOWS]->(b:Person) return a.name as src, b.name as dst",
    // REVIEWED is the only relationship that also carries properties.
    ("Person", "REVIEWED", "Movie") ->
      "MATCH (a:Person)-[r:REVIEWED]->(b:Movie) return a.name as src, b.title as dst, r.rating as rating, r.summary as summary",
    ("Person", "WROTE", "Movie") ->
      "MATCH (a:Person)-[r:WROTE]->(b:Movie) return a.name as src, b.title as dst"
  )

  // Each DataFrame is read and then immediately handed to the writer, in the
  // order listed above: all vertices first, then all edges.
  vertexQueries.foreach { case (vertexType, query) =>
    writer.PutVertexData(vertexType, readFromNeo4j(query))
  }
  edgeQueries.foreach { case (relation, query) =>
    writer.PutEdgeData(relation, readFromNeo4j(query))
  }
}