in dowhy/causal_identifiers/backdoor.py [0:0]
def _path_search_util(self, graph, node1, node2, vis, path, path_dict, is_blocked=False, prev_arrow=None):
    '''
    Recursive depth-first helper that walks from node1 towards node2 and records
    the backdoor paths it finds in path_dict.

    Neighbours are taken from the adjacency list ``graph``; edge direction is looked
    up with ``self._graph.has_edge``. ``path`` and ``vis`` are mutated while a node is
    being explored and restored before the call returns. ``path_dict`` and
    ``self._colliders`` are updated in place.

    :param graph: Adjacency list of the graph under consideration.
    :param node1: Current node being considered.
    :param node2: Target node.
    :param vis: Set of already visited nodes.
    :param path: List of nodes comprising the path upto node1.
    :param path_dict: Dictionary of node pairs.
    :param is_blocked: True if path is blocked by a collider, else False.
    :param prev_arrow: Described state of previous arrow. True if arrow incoming, False if arrow outgoing.
    :returns: None. Results are accumulated in path_dict.
    '''
    # A path already blocked by a collider is not explored any further.
    if is_blocked:
        return

    pair = (node1, node2)
    if pair in path_dict and path_dict[pair].is_complete():
        # The pair has been fully explored earlier: propagate its condition
        # variables to every node currently on the path instead of searching again.
        for i, start in enumerate(path):
            key = (start, node2)
            if key not in path_dict:
                path_dict[key] = NodePair(start, node2)
            obj = HittingSetAlgorithm(path_dict[pair].get_condition_vars(), self._colliders)
            # Add node1 to backdoor set of node_pair
            s = {node1}.union(obj.find_set())
            path_dict[key].update(path[i:], s)
    else:
        path.append(node1)
        vis.add(node1)
        if node1 == node2:
            # Check if path is backdoor and does not have nodes1\node1 or nodes2\node2 as intermediate nodes
            if self.is_backdoor(path) and self._nodes12.isdisjoint(path[1:-1]):
                # Register the suffix of the path starting at every node before the target.
                for i, start in enumerate(path[:-1]):
                    key = (start, node2)
                    if key not in path_dict:
                        path_dict[key] = NodePair(start, node2)
                    path_var = Path()
                    # is_blocked is always False here because blocked calls return early;
                    # the slice is already a fresh list.
                    path_var.update(path[i:], is_blocked)
                    path_dict[key].update(path_var)
        else:
            for neighbour in graph[node1]:
                if neighbour in vis:
                    continue
                # True if arrow incoming, False if arrow outgoing
                next_arrow = not self._graph.has_edge(node1, neighbour)
                # node1 is a collider only on the path through THIS neighbour, so the
                # blocked flag is computed per neighbour. Reassigning is_blocked here
                # would leak the blocked state to the remaining neighbours and make the
                # result depend on adjacency-list order.
                is_collider = next_arrow and bool(prev_arrow)
                if is_collider:
                    self._colliders.add(node1)
                # Since incoming for current node is outgoing for the next
                self._path_search_util(
                    graph,
                    neighbour,
                    node2,
                    vis,
                    path,
                    path_dict,
                    is_blocked or is_collider,
                    not next_arrow,
                )
        # Backtrack so that sibling branches see the path/visited state they started with.
        path.pop()
        vis.remove(node1)

    # Mark pair (node1, node2) complete
    if pair in path_dict:
        path_dict[pair].set_complete()