in maven-projects/spark/graphar/src/main/scala/org/apache/graphar/graph/GraphTransformer.scala [102:185]
private def transformAllEdges(
    sourceGraphInfo: GraphInfo,
    destGraphInfo: GraphInfo,
    sourceVertexInfosMap: Map[String, VertexInfo],
    sourceEdgeInfosMap: Map[String, EdgeInfo],
    spark: SparkSession
): Unit = {
  // Purpose: for every edge info listed by the destination graph, read the
  // matching edges from the source graph and write them out once per
  // adjacency-list type declared by the destination edge info.
  //
  // Throws IllegalArgumentException when the destination edge's concat key is
  // missing from sourceEdgeInfosMap, or when the vertex type needed to obtain
  // the vertex count is missing from sourceVertexInfosMap.
  val source_prefix = sourceGraphInfo.getPrefix
  val dest_prefix = destGraphInfo.getPrefix

  // traverse edge infos of the destination graph
  val dest_edges_it = destGraphInfo.getEdges.iterator
  while (dest_edges_it.hasNext()) {
    // load dest edge info
    val path = dest_prefix + dest_edges_it.next()
    val dest_edge_info = EdgeInfo.loadEdgeInfo(path, spark)

    // look up the source edge info sharing the same concat key
    val key = dest_edge_info.getConcatKey()
    if (!sourceEdgeInfosMap.contains(key)) {
      throw new IllegalArgumentException(
        "edge info of " + key + " not found in graph info."
      )
    }
    val source_edge_info = sourceEdgeInfosMap(key)

    // The edge DataFrame is loaded lazily on the first adjacency-list type and
    // then shared by all remaining adjacency-list types of this edge.
    var has_loaded = false
    var df = spark.emptyDataFrame
    try {
      // traverse all adjList types
      val adj_list_it = dest_edge_info.getAdj_lists.iterator
      while (adj_list_it.hasNext()) {
        val dest_adj_list_type = adj_list_it.next().getAdjList_type_in_gar

        // load edge DataFrame from the source graph (first iteration only)
        if (!has_loaded) {
          val source_adj_lists = source_edge_info.getAdj_lists
          // Prefer reading the same adjList type from the source; when the
          // source does not provide it, fall back to its first adjList type.
          var source_adj_list_type = dest_adj_list_type
          if (
            !source_edge_info.containAdjList(dest_adj_list_type) &&
            source_adj_lists.size() > 0
          ) {
            source_adj_list_type =
              source_adj_lists.get(0).getAdjList_type_in_gar
          }
          // read edge chunks from source graph
          val edge_reader = new EdgeReader(
            source_prefix,
            source_edge_info,
            source_adj_list_type,
            spark
          )
          df = edge_reader.readEdges(false)
          df.persist(GeneralParams.defaultStorageLevel)
          has_loaded = true
        }

        // The vertex count comes from the source side for "by source" layouts
        // and from the destination side otherwise.
        val vertex_type =
          if (
            dest_adj_list_type == AdjListType.ordered_by_source ||
            dest_adj_list_type == AdjListType.unordered_by_source
          )
            dest_edge_info.getSrc_type
          else
            dest_edge_info.getDst_type
        if (!sourceVertexInfosMap.contains(vertex_type)) {
          throw new IllegalArgumentException(
            "vertex info of " + vertex_type + " not found in graph info."
          )
        }
        val vertex_info = sourceVertexInfosMap(vertex_type)
        val vertex_reader = new VertexReader(source_prefix, vertex_info, spark)
        val vertex_num = vertex_reader.readVerticesNumber()

        // write edge chunks for dest graph
        val writer = new EdgeWriter(
          dest_prefix,
          dest_edge_info,
          dest_adj_list_type,
          vertex_num,
          df
        )
        writer.writeEdges()
      }
    } finally {
      // Release the cache only after every adjacency-list type has been
      // written (or a failure occurred). Unpersisting inside the loop would
      // drop the cache after the first write while has_loaded stays true, so
      // later adjList types would run against an uncached DataFrame.
      if (has_loaded) {
        df.unpersist()
      }
    }
  }
}