in tfx_airflow/notebooks/utils.py [0:0]
def get_execution_for_output_artifact(self, artifact_id, type_name):
    """Finds the execution of type `type_name` that produced `artifact_id`.

    Every event recorded for the artifact is examined in the order the
    metadata store returns it. Events flagged as input events by
    `_is_input_event` are ignored; for each remaining event the linked
    execution and its execution type are fetched, and the first execution
    whose type name equals `type_name` is returned.

    Args:
      artifact_id: A `int` indicating the id of an artifact.
      type_name: A `str` naming the type of the Execution that generated
        `artifact_id`.

    Returns:
      The `metadata_store_pb2.Execution` of type `type_name` that generated
      `artifact_id`, or `None` if no such execution exists.

    Raises:
      ValueError: If a lookup by execution id or by execution type id does
        not yield exactly one result (single-element unpacking fails).
    """
    events = self.metadata_store.get_events_by_artifact_ids([artifact_id])
    # Lazily drop input events so each event is still classified right before
    # its own lookups, exactly as a skip-with-continue loop would do.
    candidate_events = (
        event for event in events if not _is_input_event(event))
    for event in candidate_events:
        # Single-element unpacking: each id is expected to resolve to exactly
        # one record.
        (producer,) = self.metadata_store.get_executions_by_id(
            [event.execution_id])
        (producer_type,) = self.metadata_store.get_execution_types_by_id(
            [producer.type_id])
        if producer_type.name == type_name:
            return producer
    return None