def get_source_artifact_of_type()

in tfx_airflow/notebooks/utils.py [0:0]


  def get_source_artifact_of_type(self, artifact_id, source_type_name):
    """Returns the source artifact of `source_type_name` for `artifact_id`.

    This method recursively traverses the events and associated executions that
    led to generating `artifact_id` to find an artifact of type
    `source_type_name` that was an input for these events.

    Args:
      artifact_id: A `int` indicating the id of an artifact.
      source_type_name: A `str` indicating the type of an artifact that is
          a direct or indirect input for generating `artifact_id`.

    Returns:
      A `metadata_store_pb2.Artifact` of type `source_type_name` that is a
      direct/indirect input for generating `artifact_id` or `None` if no such
      artifact exists.
    """
    a_events = self.metadata_store.get_events_by_artifact_ids([artifact_id])
    for a_event in a_events:
      if _is_input_event(a_event):
        continue
      [execution] = self.metadata_store.get_executions_by_id(
          [a_event.execution_id])
      e_events = self.metadata_store.get_events_by_execution_ids([execution.id])
      for e_event in e_events:
        if _is_output_event(e_event):
          continue
        [artifact] = self.metadata_store.get_artifacts_by_id(
            [e_event.artifact_id])
        [artifact_type] = self.metadata_store.get_artifact_types_by_id(
            [artifact.type_id])
        if artifact_type.name == source_type_name:
          return artifact
        input_artifact = self.get_source_artifact_of_type(
            artifact.id, source_type_name)
        if input_artifact:
          return input_artifact