def _generate_tasks_for_node()

in tfx/orchestration/experimental/core/sync_pipeline_task_gen.py [0:0]


  def _generate_tasks_for_node(
      self, node: pipeline_pb2.PipelineNode) -> List[task_lib.Task]:
    """Produces the tasks that should be issued for `node` in this pass.

    The checks below run in order; the first one that applies decides what is
    returned:

    1. Node is STOPPING or STOPPED: nothing is generated.
    2. `_ensure_node_services_if_pure` reports a status (pure service node):
       the status is mapped to a node state update, or to nothing.
    3. An exec node task for the node is already tracked: only a failed
       associated service job produces a task.
    4. Latest execution succeeded: the node is marked COMPLETE.
    5. Latest execution is not active and the node is not STARTING: the node
       is marked FAILED.
    6. An active execution yields a task: the node is marked RUNNING and that
       task is returned alongside the state update.
    7. Otherwise inputs are resolved and tasks are generated from them.

    Args:
      node: The pipeline node to generate tasks for.

    Returns:
      The list of tasks for the node; may be empty.
    """
    node_uid = task_lib.NodeUid.from_pipeline_node(self._pipeline, node)
    node_id = node.node_info.id

    def _failed_update(message):
      # Single place to build the "mark node FAILED with ABORTED status" task.
      return task_lib.UpdateNodeStateTask(
          node_uid=node_uid,
          state=pstate.NodeState.FAILED,
          status=status_lib.Status(
              code=status_lib.Code.ABORTED, message=message))

    state_record = self._node_states_dict[node_uid]
    if state_record.state in (pstate.NodeState.STOPPING,
                              pstate.NodeState.STOPPED):
      logging.info('Ignoring node in state \'%s\' for task generation: %s',
                   state_record.state, node_uid)
      return []

    # Pure service nodes never get an ExecNodeTask; their node state simply
    # follows the status reported for the service job.
    pure_status = self._ensure_node_services_if_pure(node_id)
    if pure_status is not None:
      if pure_status == service_jobs.ServiceStatus.FAILED:
        return [_failed_update(f'service job failed; node uid: {node_uid}')]
      if pure_status == service_jobs.ServiceStatus.SUCCESS:
        logging.info('Service node successful: %s', node_uid)
        return [
            task_lib.UpdateNodeStateTask(
                node_uid=node_uid, state=pstate.NodeState.COMPLETE)
        ]
      if pure_status == service_jobs.ServiceStatus.RUNNING:
        return [
            task_lib.UpdateNodeStateTask(
                node_uid=node_uid, state=pstate.NodeState.RUNNING)
        ]
      # Any other status produces no task.
      return []

    # A task already tracked by the task queue is not regenerated; services of
    # a mixed service node are still ensured, and their failure fails the node.
    if self._is_task_id_tracked_fn(
        task_lib.exec_node_task_id_from_pipeline_node(self._pipeline, node)):
      mixed_status = self._ensure_node_services_if_mixed(node_id)
      if mixed_status == service_jobs.ServiceStatus.FAILED:
        return [
            _failed_update(
                f'associated service job failed; node uid: {node_uid}')
        ]
      return []

    executions = task_gen_utils.get_executions(self._mlmd_handle, node)
    newest = task_gen_utils.get_latest_execution(executions)

    if newest:
      # A successful latest execution means the node is done.
      if execution_lib.is_execution_successful(newest):
        logging.info('Node successful: %s', node_uid)
        return [
            task_lib.UpdateNodeStateTask(
                node_uid=node_uid, state=pstate.NodeState.COMPLETE)
        ]

      # A latest execution that is no longer active (failed or cancelled)
      # fails the node, except when the node is STARTING: in that case we fall
      # through so that a new execution can be created.
      if (not execution_lib.is_execution_active(newest) and
          state_record.state != pstate.NodeState.STARTING):
        raw_error = newest.custom_properties.get(
            constants.EXECUTION_ERROR_MSG_KEY)
        if raw_error:
          error_msg = data_types_utils.get_metadata_value(raw_error)
        else:
          error_msg = ''
        return [
            _failed_update(
                f'node failed; node uid: {node_uid}; error: {error_msg}')
        ]

    # Use the task derived from an active execution, if there is one.
    active_task = task_gen_utils.generate_task_from_active_execution(
        self._mlmd_handle, self._pipeline, node, executions)
    if active_task:
      return [
          task_lib.UpdateNodeStateTask(
              node_uid=node_uid, state=pstate.NodeState.RUNNING),
          active_task,
      ]

    # Nothing else applied: resolve inputs and generate tasks from them.
    return list(self._resolve_inputs_and_generate_tasks_for_node(node))