private def transformAllEdges()

in maven-projects/spark/graphar/src/main/scala/org/apache/graphar/graph/GraphTransformer.scala [102:185]


  private def transformAllEdges(
      sourceGraphInfo: GraphInfo,
      destGraphInfo: GraphInfo,
      sourceVertexInfosMap: Map[String, VertexInfo],
      sourceEdgeInfosMap: Map[String, EdgeInfo],
      spark: SparkSession
  ): Unit = {
    val source_prefix = sourceGraphInfo.getPrefix
    val dest_prefix = destGraphInfo.getPrefix

    // traverse edge infos of the destination graph
    val dest_edges_it = destGraphInfo.getEdges.iterator
    while (dest_edges_it.hasNext()) {
      // load dest edge info
      val path = dest_prefix + dest_edges_it.next()
      val dest_edge_info = EdgeInfo.loadEdgeInfo(path, spark)
      // look up the matching edge info of the source graph
      val key = dest_edge_info.getConcatKey()
      if (!sourceEdgeInfosMap.contains(key)) {
        throw new IllegalArgumentException(
          "edge info of " + key + " not found in graph info."
        )
      }
      val source_edge_info = sourceEdgeInfosMap(key)
      // the source edge DataFrame is loaded lazily, cached once, and reused
      // for every adjList layout of this destination edge type
      var has_loaded = false
      var df = spark.emptyDataFrame

      // traverse all adjList types
      val dest_adj_lists = dest_edge_info.getAdj_lists
      val adj_list_it = dest_adj_lists.iterator
      while (adj_list_it.hasNext()) {
        val dest_adj_list_type = adj_list_it.next().getAdjList_type_in_gar

        // load edge DataFrame from the source graph
        if (!has_loaded) {
          val source_adj_lists = source_edge_info.getAdj_lists
          // prefer the same adjList type as the destination edge; if the
          // source graph does not provide it, fall back to its first one
          var source_adj_list_type = dest_adj_list_type
          if (
            !source_edge_info.containAdjList(dest_adj_list_type) &&
            source_adj_lists.size() > 0
          ) {
            source_adj_list_type =
              source_adj_lists.get(0).getAdjList_type_in_gar
          }
          // read edge chunks from source graph
          val reader = new EdgeReader(
            source_prefix,
            source_edge_info,
            source_adj_list_type,
            spark
          )
          df = reader.readEdges(false)
          has_loaded = true
          df.persist(GeneralParams.defaultStorageLevel)
        }

        // read the vertex count the writer needs for chunking: the edge's
        // source vertex type for *_by_source layouts, otherwise its
        // destination vertex type
        val vertex_type = {
          if (
            dest_adj_list_type == AdjListType.ordered_by_source || dest_adj_list_type == AdjListType.unordered_by_source
          )
            dest_edge_info.getSrc_type
          else
            dest_edge_info.getDst_type
        }
        if (!sourceVertexInfosMap.contains(vertex_type)) {
          throw new IllegalArgumentException(
            "vertex info of " + vertex_type + " not found in graph info."
          )
        }
        val vertex_info = sourceVertexInfosMap(vertex_type)
        val reader = new VertexReader(source_prefix, vertex_info, spark)
        val vertex_num = reader.readVerticesNumber()

        // write edge chunks for dest graph
        val writer = new EdgeWriter(
          dest_prefix,
          dest_edge_info,
          dest_adj_list_type,
          vertex_num,
          df
        )
        writer.writeEdges()
      }
      // release the cached source edge DataFrame only after all adjList
      // layouts of this edge type have been written
      if (has_loaded) {
        df.unpersist()
      }
    }
  }
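
Usage context (a minimal sketch, not part of the file above): transformAllEdges() is private, so callers reach it through the public GraphTransformer.transform entry point, which loads both graph infos and rewrites the vertex and edge chunks. The paths and the local-mode SparkSession below are placeholders for illustration, and the transform(sourceGraphInfoPath, destGraphInfoPath, spark) signature is assumed from the library's public API rather than taken from this excerpt.

import org.apache.spark.sql.SparkSession
import org.apache.graphar.graph.GraphTransformer

object TransformExample {
  def main(args: Array[String]): Unit = {
    // local SparkSession for illustration only
    val spark = SparkSession
      .builder()
      .appName("graphar-graph-transform")
      .master("local[*]")
      .getOrCreate()

    // placeholder paths to the source and destination graph info YAML files
    val sourceGraphInfoPath = "/path/to/source_graph.graph.yml"
    val destGraphInfoPath = "/path/to/dest_graph.graph.yml"

    // transform() is assumed to load both GraphInfos and delegate to the
    // private helpers, including transformAllEdges(), to write the
    // destination graph's chunks
    GraphTransformer.transform(sourceGraphInfoPath, destGraphInfoPath, spark)

    spark.stop()
  }
}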