in identity-resolution/notebooks/identity-graph/nepytune/usecase/users_from_household.py [0:0]
def _build_networkx_graph(g, identity_group_id):
def get_attributes(attribute_list):
attrs = {}
for attr_name, value in attribute_list:
attr_name = str(attr_name)
if isinstance(value, Iterable) and not isinstance(value, str):
for i, single_val in enumerate(value):
attrs[f"{attr_name}-{i}"] = single_val
else:
if '.' in attr_name:
attr_name = attr_name.split('.')[-1]
attrs[attr_name] = value
return attrs
graph = nx.Graph()
for ig_node in _get_identity_group_hierarchy(g, identity_group_id).toList():
ig_id = ig_node["props"][T.id]
graph.add_node(
ig_id,
**get_attributes(ig_node["props"].items())
)
for persistent_node in ig_node["persistent_ids"]:
p_id = persistent_node["props"][T.id]
graph.add_node(
p_id,
**get_attributes(persistent_node["props"].items())
)
graph.add_edge(ig_id, p_id, label="member")
for transient_node_map in persistent_node["transient_ids"]:
transient_id = transient_node_map[T.id]
graph.add_node(
transient_id,
**get_attributes(transient_node_map.items())
)
graph.add_edge(transient_id, p_id, label="has_identity")
return graph