in tfx/orchestration/experimental/core/sync_pipeline_task_gen.py
def _generate_tasks_for_node(
self, node: pipeline_pb2.PipelineNode) -> List[task_lib.Task]:
"""Generates list of tasks for the given node."""
node_uid = task_lib.NodeUid.from_pipeline_node(self._pipeline, node)
node_id = node.node_info.id
result = []
node_state = self._node_states_dict[node_uid]
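    # Do not generate tasks for a node that is being stopped or is stopped.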
if node_state.state in (pstate.NodeState.STOPPING,
pstate.NodeState.STOPPED):
      logging.info("Ignoring node in state '%s' for task generation: %s",
                   node_state.state, node_uid)
return result
    # If this is a pure service node, there is no ExecNodeTask to generate,
    # but we ensure node services and check the service status.
service_status = self._ensure_node_services_if_pure(node_id)
if service_status is not None:
if service_status == service_jobs.ServiceStatus.FAILED:
error_msg = f'service job failed; node uid: {node_uid}'
result.append(
task_lib.UpdateNodeStateTask(
node_uid=node_uid,
state=pstate.NodeState.FAILED,
status=status_lib.Status(
code=status_lib.Code.ABORTED, message=error_msg)))
elif service_status == service_jobs.ServiceStatus.SUCCESS:
logging.info('Service node successful: %s', node_uid)
result.append(
task_lib.UpdateNodeStateTask(
node_uid=node_uid, state=pstate.NodeState.COMPLETE))
elif service_status == service_jobs.ServiceStatus.RUNNING:
result.append(
task_lib.UpdateNodeStateTask(
node_uid=node_uid, state=pstate.NodeState.RUNNING))
return result
    # If a task for the node is already tracked by the task queue, it need not
    # be considered for generation again, but we still ensure node services in
    # case of a mixed service node.
if self._is_task_id_tracked_fn(
task_lib.exec_node_task_id_from_pipeline_node(self._pipeline, node)):
service_status = self._ensure_node_services_if_mixed(node_id)
if service_status == service_jobs.ServiceStatus.FAILED:
error_msg = f'associated service job failed; node uid: {node_uid}'
result.append(
task_lib.UpdateNodeStateTask(
node_uid=node_uid,
state=pstate.NodeState.FAILED,
status=status_lib.Status(
code=status_lib.Code.ABORTED, message=error_msg)))
return result
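    # Inspect the node's executions recorded in MLMD; the latest execution
    # determines whether the node is complete, has failed, or is still active.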
node_executions = task_gen_utils.get_executions(self._mlmd_handle, node)
latest_execution = task_gen_utils.get_latest_execution(node_executions)
# If the latest execution is successful, we're done.
if latest_execution and execution_lib.is_execution_successful(
latest_execution):
logging.info('Node successful: %s', node_uid)
result.append(
task_lib.UpdateNodeStateTask(
node_uid=node_uid, state=pstate.NodeState.COMPLETE))
return result
    # If the latest execution failed or was cancelled, the pipeline should be
    # aborted, unless the node is in state STARTING. For nodes in state
    # STARTING, a new execution is created instead.
if (latest_execution and
not execution_lib.is_execution_active(latest_execution) and
node_state.state != pstate.NodeState.STARTING):
error_msg_value = latest_execution.custom_properties.get(
constants.EXECUTION_ERROR_MSG_KEY)
error_msg = data_types_utils.get_metadata_value(
error_msg_value) if error_msg_value else ''
error_msg = f'node failed; node uid: {node_uid}; error: {error_msg}'
result.append(
task_lib.UpdateNodeStateTask(
node_uid=node_uid,
state=pstate.NodeState.FAILED,
status=status_lib.Status(
code=status_lib.Code.ABORTED, message=error_msg)))
return result
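    # An execution may be active in MLMD without a corresponding task in the
    # task queue (e.g. after an orchestrator restart); if so, generate an
    # ExecNodeTask from the active execution so it is tracked again.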
exec_node_task = task_gen_utils.generate_task_from_active_execution(
self._mlmd_handle, self._pipeline, node, node_executions)
if exec_node_task:
result.append(
task_lib.UpdateNodeStateTask(
node_uid=node_uid, state=pstate.NodeState.RUNNING))
result.append(exec_node_task)
return result
# Finally, we are ready to generate tasks for the node by resolving inputs.
result.extend(self._resolve_inputs_and_generate_tasks_for_node(node))
return result
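
# Illustrative sketch (not part of the original file): how a caller might
# consume the tasks returned above. `task_queue`, `enqueue` and
# `handle_state_update` are hypothetical names used only for illustration;
# the real dispatch logic lives in the orchestrator, not in this method.
#
#   for task in generator._generate_tasks_for_node(node):
#     if isinstance(task, task_lib.ExecNodeTask):
#       task_queue.enqueue(task)    # hand execution off to the task manager
#     else:
#       handle_state_update(task)   # apply the UpdateNodeStateTask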