def readAndPutDataIntoWriter()

in maven-projects/spark/graphar/src/main/scala/org/apache/graphar/example/Neo4j2GraphAr.scala [73:177]


  // Reads the "Person" and "Movie" vertices, and the relationships between
  // them, from Neo4j through the Neo4j Spark data source and hands each
  // resulting DataFrame to `writer`.
  //
  // writer: receives one DataFrame per vertex type (PutVertexData) and one per
  //         (source type, edge type, target type) triple (PutEdgeData)
  // spark:  session used to issue the Cypher queries
  def readAndPutDataIntoWriter(
      writer: GraphWriter,
      spark: SparkSession
  ): Unit = {
    // Runs a single Cypher query through the Neo4j data source and returns the
    // result as a DataFrame.
    // Note: "schema.flatten.limit" is set to 1 so that, as far as possible, null
    // records are not sampled and their type inferred as string. If you want
    // perfect type inference, consider using APOC.
    def readFromNeo4j(query: String) =
      spark.read
        .format("org.neo4j.spark.DataSource")
        .option("query", query)
        .option("schema.flatten.limit", 1)
        .load()

    // vertex type -> Cypher query returning the properties of that vertex type
    val vertexQueries = Seq(
      "Person" -> "MATCH (n:Person) RETURN n.name AS name, n.born as born",
      "Movie" -> "MATCH (n:Movie) RETURN n.title AS title, n.tagline as tagline"
    )

    // (source vertex type, edge type, target vertex type) -> Cypher query.
    // Every query exposes the end points as "src" and "dst"; REVIEWED also
    // returns the relationship properties "rating" and "summary".
    val edgeQueries = Seq(
      ("Person", "PRODUCED", "Movie") ->
        "MATCH (a:Person)-[r:PRODUCED]->(b:Movie) return a.name as src, b.title as dst",
      ("Person", "ACTED_IN", "Movie") ->
        "MATCH (a:Person)-[r:ACTED_IN]->(b:Movie) return a.name as src, b.title as dst",
      ("Person", "DIRECTED", "Movie") ->
        "MATCH (a:Person)-[r:DIRECTED]->(b:Movie) return a.name as src, b.title as dst",
      ("Person", "FOLLOWS", "Person") ->
        "MATCH (a:Person)-[r:FOLLOWS]->(b:Person) return a.name as src, b.name as dst",
      ("Person", "REVIEWED", "Movie") ->
        "MATCH (a:Person)-[r:REVIEWED]->(b:Movie) return a.name as src, b.title as dst, r.rating as rating, r.summary as summary",
      ("Person", "WROTE", "Movie") ->
        "MATCH (a:Person)-[r:WROTE]->(b:Movie) return a.name as src, b.title as dst"
    )

    // Vertices first, then edges; each DataFrame is loaded immediately before
    // it is put into the writer, in the order the tables above list them.
    vertexQueries.foreach { case (vertexType, query) =>
      writer.PutVertexData(vertexType, readFromNeo4j(query))
    }
    edgeQueries.foreach { case (relation, query) =>
      writer.PutEdgeData(relation, readFromNeo4j(query))
    }
  }