in sapp/pipeline/propagate_shared_texts.py [0:0]
def run(self, input: TraceGraph, summary: Summary) -> Tuple[TraceGraph, Summary]:
if not (self.propagate_sources or self.propagate_features):
return input, summary
graph = input
self.summary = summary
self.graph = graph
if self.propagate_sources:
log.info("Propagating source kinds to sinks")
if self.propagate_features:
log.info("Propagating features to anchor sinks")
for instance in graph.get_issue_instances():
self._propagate_shared_texts(instance)
# Create new assocs based on the visited results
source_count = 0
feature_count = 0
trace_frame_count = 0
for trace_frame_id, sink_to_state in self.visited.items():
trace_frame_count += 1
trace_frame = graph.get_trace_frame_from_id(trace_frame_id)
is_anchor_port = trace_frame.callee_port.startswith("anchor:")
for state in sink_to_state.values():
for (
shared_text,
trace_length,
) in state.shared_text_trace_lengths.items():
shared_text_kind = graph.get_shared_text_by_local_id(
shared_text
).kind
if (
self.propagate_sources
and shared_text_kind == SharedTextKind.SOURCE
):
graph.add_trace_frame_leaf_by_local_id_assoc(
trace_frame, shared_text, trace_length
)
source_count += 1
if (
self.propagate_features
and is_anchor_port
and shared_text_kind == SharedTextKind.FEATURE
):
graph.add_trace_frame_leaf_by_local_id_assoc(
trace_frame, shared_text, depth=None
)
feature_count += 1
log.info(
f"Added {source_count} source kinds and {feature_count} features to {trace_frame_count} trace frames"
)
return graph, summary