in tfx_airflow/notebooks/utils.py [0:0]
def get_source_artifact_of_type(self, artifact_id, source_type_name):
    """Returns the source artifact of `source_type_name` for `artifact_id`.

    Walks the lineage of `artifact_id` depth-first: for every event attached
    to the artifact that `_is_input_event` does not flag, the events of the
    associated execution are fetched, and every artifact attached through an
    event that `_is_output_event` does not flag is treated as an input
    candidate. The first candidate whose type name equals `source_type_name`
    is returned; otherwise the search continues upstream from that candidate.

    Each artifact is expanded at most once per call, so shared ancestors are
    not traversed repeatedly and a cyclic lineage cannot cause unbounded
    recursion.

    Args:
      artifact_id: A `int` indicating the id of an artifact.
      source_type_name: A `str` indicating the type of an artifact that is
        a direct or indirect input for generating `artifact_id`.

    Returns:
      A `metadata_store_pb2.Artifact` of type `source_type_name` that is a
      direct/indirect input for generating `artifact_id` or `None` if no such
      artifact exists.

    Raises:
      ValueError: If the store does not return exactly one artifact (or
        artifact type) for an id referenced by an event; the single-element
        unpacking below enforces this.
    """
    store = self.metadata_store
    # Ids of artifacts whose producing executions have already been explored.
    expanded = set()

    def _search(current_id):
        """Depth-first search upstream of `current_id`; None if no match."""
        # Mark before descending so that a cycle back to this artifact is
        # skipped instead of recursing forever.
        expanded.add(current_id)
        for a_event in store.get_events_by_artifact_ids([current_id]):
            if _is_input_event(a_event):
                # NOTE(review): presumably the artifact was consumed, not
                # produced, by this execution -- confirm against the helper.
                continue
            # The event already carries the execution id, so the execution
            # record itself does not need to be fetched.
            e_events = store.get_events_by_execution_ids(
                [a_event.execution_id])
            for e_event in e_events:
                if _is_output_event(e_event):
                    continue
                [artifact] = store.get_artifacts_by_id([e_event.artifact_id])
                [artifact_type] = store.get_artifact_types_by_id(
                    [artifact.type_id])
                if artifact_type.name == source_type_name:
                    return artifact
                if artifact.id in expanded:
                    # Already explored (or currently being explored) without
                    # a match; descending again cannot find anything new.
                    continue
                found = _search(artifact.id)
                if found is not None:
                    return found
        return None

    return _search(artifact_id)