distill/analytics/graph/graph.py (141 lines of code) (raw):
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import itertools
import json
import collections
import networkx as nx
import plotly.graph_objects as go
def createDiGraph(nodes, edges, *, drop_recursions: bool = False):
"""
Creates NetworkX Directed Graph Object (G) from defined node, edge list
:param nodes: Series or List of Events, Elements
:param edges: Series or List of Pairs
:param drop_recursions: if True eliminates self:self pairs in edges
:return: A NetworkX graph object
"""
G=nx.DiGraph()
G.add_nodes_from(nodes)
if drop_recursions==True:
edges_filtered = []
for row in edges:
if row[0] != row[1]:
edges_filtered.append(row)
G.add_edges_from(edges_filtered)
return G
else:
G.add_edges_from(edges)
return G
# TODO complete function (args--input edge-list, labels)
def sankey(edges_segmentN, node_labels=False):
"""
Creates Sankey Graph from defined edge list and optional user-provided labels
:param edges_segmentN: List of Tuples
:param node_labels: Optional Dictionary of Values; keys are originals, values are replacements
:return: A Sankey graph
"""
# Remove self-to-self recursions
edge_list_temp = []
for row in edges_segmentN:
if row[0] != row[1]:
edge_list_temp.append(row)
edge_list = edge_list_temp
# Create a counter to count how many elements are in the edge list
edge_list_counter = collections.Counter(edge_list)
# Extract source list, target list, and value list from the tuples
source_list = [i[0] for i in edge_list_counter.keys()]
target_list = [i[1] for i in edge_list_counter.keys()]
value_list = [i for i in edge_list_counter.values()]
# Extract the node names if node_labels does not exist as an argument
nodes = []
for row in edge_list:
for col in row:
if col not in nodes:
nodes.append(col)
# Replace node names with the give node_labels if it is given as an argument
if node_labels:
new_nodes = []
for node in nodes:
if node in node_labels:
new_nodes.append(node_labels[node])
else:
new_nodes.append(node)
# Sources are the nodes sending connections
sources = []
for i in source_list:
sources.append(nodes.index(i))
# Targets are the nodes receiving connections
targets = []
for i in target_list:
targets.append(nodes.index(i))
# Values are the weight of the connections
values = value_list
# If node labels is given as an argument, we replace nodes with node labels
# If not, we use the original node names
if node_labels:
fig = go.Figure(data=[go.Sankey(
node=dict(
label=[new_nodes[item].split("|")[0] for item in range(len(new_nodes))],
),
link=dict(
source=sources,
target=targets,
value=values
))])
else:
fig = go.Figure(data=[go.Sankey(
node=dict(
label=[nodes[item].split("|")[0] for item in range(len(nodes))],
),
link=dict(
source=sources,
target=targets,
value=values
))])
fig.show()
return fig
# TODO complete function (args--input edge-list, labels)
def funnel(edges_segmentN,
user_specification,
node_labels=False):
"""
Creates Funnel Graph from defined edge list and optional user-provided labels
:param edges_segmentN: List of Tuples
:param user_specification: String of Target of interest e.g. #document
:param node_labels: Optional Dictionary of key default values, value replacements
:return: A Funnel graph
"""
## Removing duplicates
edge_list_temp = []
for row in edges_segmentN:
if row[0] != row[1]:
edge_list_temp.append(row)
edge_list = edge_list_temp
## Convert from list of 2s to list of 1s
edgelist_list = []
length = len(edge_list) - 1
for i in edge_list:
if edge_list.index(i) != length:
edgelist_list.append(i[0])
else:
edgelist_list.append(i[0])
edgelist_list.append(i[1])
# Remove the None values
funnel_targets_temp = []
for item in edgelist_list:
if item != None:
funnel_targets_temp.append(item)
funnel_targets = funnel_targets_temp
## Convert that list into a list of 3s
edge_list = []
for i in range(len(funnel_targets)):
if i == (len(funnel_targets) - 2):
break
else:
edge_list.append((funnel_targets[i], funnel_targets[i + 1], funnel_targets[i + 2]))
## Convert the list of 3s to a counter
edge_list_counter = collections.Counter(edge_list)
first_rung = user_specification
new_edge_list = []
for i in edge_list:
if i[0] == user_specification:
new_edge_list.append((i[0], i[1], i[2]))
new_edge_list_counter = collections.Counter(new_edge_list)
new_edge_list_counter.most_common(1)
first_rung = new_edge_list_counter.most_common(1)[0][0][0]
second_rung = new_edge_list_counter.most_common(1)[0][0][1]
third_rung = new_edge_list_counter.most_common(1)[0][0][2]
counter1 = 0
counter2 = 0
counter3 = 0
for i in edge_list:
if i[0] == first_rung:
counter1 += 1
if i[1] == second_rung:
counter2 += 1
if i[2] == third_rung:
counter3 += 1
# Numbers are how many times each target occured
# Edges are the targets
numbers = [counter1, counter2, counter3]
edges = [first_rung, second_rung, third_rung]
# If node labels was given as an argument, replaces the targets with the provided names
if node_labels:
new_edges = []
for edge in edges:
if edge in node_labels:
new_edges.append(node_labels[edge])
else:
new_edges.append(edge)
edges = new_edges
# Plotting labels from the list with the values from the dictionary
data = dict(
number=numbers,
edge=edges)
# Plotting the figure
fig = go.Figure(go.Funnel(
y=edges,
x=numbers,
textposition="inside",
textinfo="value+percent initial",
opacity=0.65, marker={"color": ["deepskyblue", "lightsalmon", "tan"],
"line": {"width": [2]}},
connector={"line": {"color": "lime", "dash": "dot", "width": 5}})
)
fig.show()
return fig