playground/process_analysis/status_transition_graph.py (78 lines of code) (raw):
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dataclasses import dataclass
from datetime import timedelta
import networkx as nx
import pandas as pd
from sqlalchemy.engine import Engine
from playground.process_analysis.issue_filter import IssueFilter
# pylint: disable=too-many-instance-attributes
@dataclass(frozen=True)
class StatusChange:
    """
    A single status transition of one issue.

    Instances are immutable (frozen dataclass). The field order is significant:
    StatusTransitionGraph.from_data_frame builds instances positionally from
    DataFrame rows, and the query in StatusTransitionGraph.from_database selects
    its columns in exactly this order.
    """

    # Key of the issue the change belongs to (issues.issue_key in from_database).
    issue_key: str
    # Type of the issue (issues.original_type in from_database).
    issue_type: str
    # Creation timestamp of the issue (issues.created_date in from_database).
    created_date: pd.Timestamp
    # Status the issue left (issue_changelogs.original_from_value); used as the graph node name.
    original_from_value: str
    # issue_changelogs.from_value; stored as the "category" attribute of the source node.
    from_value: str
    # Status the issue entered (issue_changelogs.original_to_value); used as the graph node name.
    original_to_value: str
    # issue_changelogs.to_value; stored as the "category" attribute of the destination node.
    to_value: str
    # Timestamp of the status change itself (issue_changelogs.created_date in from_database).
    changed_date: pd.Timestamp
class StatusTransitionGraph:
    """
    A directed graph that represents the transitions between statuses of issues from a DevLake data source.

    Nodes are keyed by the original status names and carry two attributes:
    ``count`` and ``category``. Edges carry a ``durations`` list holding one
    entry (in days) per recorded transition between the two statuses.
    """

    def __init__(self):
        self.graph = nx.DiGraph()
        self.total_transition_count = 0

    def add_status_change(self, status_change: StatusChange, previous_status_change: StatusChange | None):
        """
        Record one status change in the graph.

        :param status_change: the transition to add.
        :param previous_status_change: the transition processed immediately before this one,
            or None for the very first one. It is used to detect whether both belong to the
            same issue, which affects node counting and the duration calculation.
        """
        self.__update_nodes(status_change, previous_status_change)
        self.__update_edges(status_change, previous_status_change)
        self.total_transition_count += 1

    def __update_nodes(self, status_change: StatusChange, previous_status_change: StatusChange | None):
        """Make sure both statuses exist as nodes and update their counters."""
        from_status = status_change.original_from_value
        if not self.graph.has_node(from_status):
            self.graph.add_node(from_status, count=0, category=status_change.from_value)

        to_status = status_change.original_to_value
        if not self.graph.has_node(to_status):
            self.graph.add_node(to_status, count=0, category=status_change.to_value)

        # The source status is only counted for the first change seen for an issue;
        # for follow-up changes of the same issue it is not incremented again.
        if not _for_same_issue(status_change, previous_status_change):
            self.graph.nodes[from_status]["count"] += 1
        self.graph.nodes[to_status]["count"] += 1

    def __update_edges(self, status_change: StatusChange, previous_status_change: StatusChange | None):
        """Append the duration (in days) of this transition to the matching edge, creating it if needed."""
        duration = _days_between(status_change, previous_status_change)
        edge_from = status_change.original_from_value
        edge_to = status_change.original_to_value
        if self.graph.has_edge(edge_from, edge_to):
            self.graph.edges[edge_from, edge_to]["durations"].append(duration)
        else:
            self.graph.add_edge(edge_from, edge_to, durations=[duration])

    @classmethod
    def from_database(cls, db_engine: Engine, issue_filter: IssueFilter | None = None) -> "StatusTransitionGraph":
        """
        Create a StatusTransitionGraph using a connection to a DevLake database.

        :param db_engine: SQLAlchemy engine handed to pandas for running the query.
        :param issue_filter: optional filter applied to the loaded rows before the graph is built.
        :raises ValueError: if the query returns no rows (raised by from_data_frame).
        """
        # The selected columns are listed in the same order as the fields of StatusChange,
        # because from_data_frame builds StatusChange instances positionally.
        query = (
            "select i.issue_key as issue_key, i.original_type as issue_type, i.created_date as created_date, "
            "ic.original_from_value as original_from_value, ic.from_value as from_value, "
            "ic.original_to_value as original_to_value, ic.to_value as to_value, "
            "ic.created_date as changed_date "
            "from issue_changelogs ic "
            "join issues i on i.id = ic.issue_id "
            "where ic.field_name = 'status';"
        )
        df = pd.read_sql_query(query, db_engine)
        return cls.from_data_frame(df, issue_filter)

    @classmethod
    def from_data_frame(cls, df: pd.DataFrame, issue_filter: IssueFilter | None = None) -> "StatusTransitionGraph":
        """
        Create a StatusTransitionGraph from a Pandas DataFrame.
        For advanced usage, and testing. For most use cases, use the from_database method.

        Note: The DataFrame must have a column for each field in the StatusChange class,
        in the same order as the fields are declared, because rows are unpacked positionally.

        :param df: the status changes; it is not modified.
        :param issue_filter: optional filter applied to the sorted rows before the graph is built.
        :raises ValueError: if the DataFrame is empty.
        """
        if df.empty:
            raise ValueError(
                "Provided DataFrame is empty: "
                "no status transitions in the 'issue_changelogs' table were found."
            )
        process_graph: StatusTransitionGraph = cls()
        # sort_values returns a new DataFrame, so the caller's frame is left untouched.
        # Sorting groups the changes per issue in chronological order, which the
        # "previous status change" bookkeeping below relies on.
        df = df.sort_values(by=["issue_key", "changed_date"], ascending=True)
        if issue_filter is not None:
            df = issue_filter.apply(df)
        previous_status_change: StatusChange | None = None
        for item in df.itertuples(index=False):
            status_change = StatusChange(*item)
            process_graph.add_status_change(status_change, previous_status_change)
            previous_status_change = status_change
        return process_graph
def _for_same_issue(status_change: StatusChange, previous_status_change: StatusChange | None) -> bool:
    """Return True when a previous change exists and it has the same issue key as the current one."""
    return (
        previous_status_change is not None
        and status_change.issue_key == previous_status_change.issue_key
    )
def _days_between(status_change: StatusChange, previous_status_change: StatusChange | None) -> float:
    """
    Return the time, in (fractional) days, that preceded the given status change.

    When the previous change belongs to the same issue, the time is measured from that
    previous change; otherwise it is measured from the creation date of the issue.
    """
    if _for_same_issue(status_change, previous_status_change):
        reference_date = previous_status_change.changed_date
    else:
        reference_date = status_change.created_date
    elapsed = _timedelta_between(status_change.changed_date, reference_date)
    # Dividing by a one-day timedelta converts the interval to a float number of days.
    return elapsed / timedelta(days=1)
def _timedelta_between(current: pd.Timestamp, previous: pd.Timestamp) -> timedelta:
return current.to_pydatetime() - previous.to_pydatetime()