in tfx_airflow/notebooks/utils.py [0:0]
def get_dest_artifact_of_type(self, artifact_id, dest_type_name):
"""Returns the destination artifact of `dest_type_name` from `artifact_id`.
This method recursively traverses the events and associated executions that
consumed `artifact_id` to find an artifact of type `dest_type_name` that was
an output for these events.
Args:
artifact_id: A `int` indicating the id of an artifact.
dest_type_name: A `str` indicating the type of an artifact that is
a output of an event that directly/indirectly consumed `artifact_id`.
Returns:
A `metadata_store_pb2.Artifact` of type `dest_type_name` that is a
direct/indirect output from `artifact_id` or `None` if no such artifact
exists.
"""
a_events = self.metadata_store.get_events_by_artifact_ids([artifact_id])
for a_event in a_events:
if _is_output_event(a_event):
continue
[execution] = self.metadata_store.get_executions_by_id(
[a_event.execution_id])
e_events = self.metadata_store.get_events_by_execution_ids(
[execution.id])
for e_event in e_events:
if _is_input_event(e_event):
continue
[artifact] = self.metadata_store.get_artifacts_by_id(
[e_event.artifact_id])
[artifact_type] = self.metadata_store.get_artifact_types_by_id(
[artifact.type_id])
if artifact_type.name == dest_type_name:
return artifact
dest_artifact = self.get_dest_artifact_of_type(
artifact.id, dest_type_name)
if dest_artifact:
return dest_artifact