def get_execution_for_output_artifact()

in tfx_airflow/notebooks/utils.py [0:0]


  def get_execution_for_output_artifact(self, artifact_id, type_name):
    """Returns the execution of `type_name` that generated `artifact_id`.

    Args:
      artifact_id: A `int` indicating the id of an artifact.
      type_name: A `str` indicating the type of an Execution that generated
        `artifact_id`.

    Returns:
      A `metadata_store_pb2.Execution` of type `type_name` that generated
      `artifact_id` or `None` if no such execution exists.
    """
    a_events = self.metadata_store.get_events_by_artifact_ids([artifact_id])
    for a_event in a_events:
      if _is_input_event(a_event):
        continue
      [execution] = self.metadata_store.get_executions_by_id(
          [a_event.execution_id])
      [execution_type] = self.metadata_store.get_execution_types_by_id(
          [execution.type_id])
      if execution_type.name == type_name:
        return execution