in src/sagemaker/FD_SL_DGL/gnn_fraud_detection_dgl/data.py [0:0]
def parse_edgelist(edges, id_to_node, header=False, source_type='user', sink_type='user'):
    """
    Parse an edgelist file and return its edges, in both directions, as lists of tuples.

    Every non-blank line must hold exactly two comma separated columns (source, sink).
    Double quotes surrounding a column are stripped. Blank lines are ignored.

    :param edges: path to comma separated file containing bipartite edges, optionally with a
        header row naming the source and sink node types
    :param id_to_node: dictionary keyed by node type whose values map node names(id) to dgl node indices
    :param header: boolean whether or not the file has a header row. When True the first row is read
        as (source_type, sink_type) and is not treated as an edge; when False every row is an edge.
    :param source_type: type of the source node in the edge. defaults to 'user' if no header
    :param sink_type: type of the sink node in the edge. defaults to 'user' if no header.
    :return: (list, list, dict, str, str) the edges as (source, sink) tuples, the same edges reversed
        as (sink, source) tuples, the id_to_node dict as returned by _get_node_idx, and the source and
        sink node types that were actually used (taken from the header row when header is True).
    :raises ValueError: if a non-blank line does not split into exactly two columns
    """
    edge_list = []
    rev_edge_list = []
    source_pointer, sink_pointer = 0, 0
    first_row = True
    with open(edges, "r") as fh:
        for line in fh:
            line = line.strip()
            if not line:
                # Tolerate blank lines (e.g. a trailing newline at end of file)
                # instead of failing on the two-column unpack below.
                continue
            source, sink = [col.strip('"') for col in line.split(",")]
            if first_row:
                first_row = False
                if header:
                    source_type, sink_type = source, sink
                # Resume numbering after the highest index already assigned to each
                # node type; an existing but empty mapping starts again from 0.
                if source_type in id_to_node:
                    source_pointer = max(id_to_node[source_type].values(), default=-1) + 1
                if sink_type in id_to_node:
                    sink_pointer = max(id_to_node[sink_type].values(), default=-1) + 1
                if header:
                    # Only a genuine header row is skipped; without a header the
                    # first row is a real edge and must be processed below.
                    continue
            # NOTE(review): _get_node_idx is defined elsewhere; presumably it returns the
            # node's index, the (possibly updated) mapping and the next free index — confirm.
            source_node, id_to_node, source_pointer = _get_node_idx(id_to_node, source_type, source, source_pointer)
            if source_type == sink_type:
                # Both endpoints are of the same type, so a single counter
                # (source_pointer) is advanced for both of them.
                sink_node, id_to_node, source_pointer = _get_node_idx(id_to_node, sink_type, sink, source_pointer)
            else:
                sink_node, id_to_node, sink_pointer = _get_node_idx(id_to_node, sink_type, sink, sink_pointer)
            edge_list.append((source_node, sink_node))
            rev_edge_list.append((sink_node, source_node))
    return edge_list, rev_edge_list, id_to_node, source_type, sink_type