in tfx/orchestration/portable/launcher.py [0:0]
def _prepare_execution(self) -> _ExecutionPreparationResult:
  """Prepares inputs, outputs and execution properties for actual execution.

  Returns:
    An `_ExecutionPreparationResult` carrying the registered execution info,
    the MLMD contexts and the `is_execution_needed` flag. The flag is False
    whenever the executor should not run: input resolution failed, the node
    resolved to a Skip, a previous execution already succeeded, or a cached
    result was published.
  """
  with self._mlmd_connection as m:
    # 1. Prepares all contexts.
    contexts = context_lib.prepare_contexts(
        metadata_handle=m, node_contexts=self._pipeline_node.contexts
    )
    # 2. Resolves inputs and execution properties.
    exec_properties = data_types_utils.build_parsed_value_dict(
        inputs_utils.resolve_parameters_with_schema(
            node_parameters=self._pipeline_node.parameters))

    def _register_failure_and_halt(executor_output):
      """Registers (or reuses) an execution, marks it failed, and builds the
      no-execution-needed result.

      If the registered execution was reused and had already succeeded, it
      is left untouched instead of being published as failed.
      """
      execution = self._register_or_reuse_execution(
          metadata_handle=m,
          contexts=contexts,
          exec_properties=exec_properties,
      )
      if not execution_lib.is_execution_successful(execution):
        self._publish_failed_execution(
            execution_id=execution.id,
            contexts=contexts,
            executor_output=executor_output)
      return _ExecutionPreparationResult(
          execution_info=self._build_execution_info(
              execution_id=execution.id),
          contexts=contexts,
          is_execution_needed=False)

    try:
      resolved_inputs = inputs_utils.resolve_input_artifacts(
          pipeline_node=self._pipeline_node, metadata_handle=m
      )
      logging.info('[%s] Resolved inputs: %s',
                   self._pipeline_node.node_info.id, resolved_inputs)
    except exceptions.InputResolutionError as e:
      logging.exception('[%s] Input resolution error: %s',
                        self._pipeline_node.node_info.id, e)
      return _register_failure_and_halt(
          self._build_error_output(code=e.grpc_code_value))

    # 3. If not all required inputs are met. Return ExecutionInfo with
    # is_execution_needed being false. No publish will happen so down stream
    # nodes won't be triggered.
    # TODO(b/197907821): Publish special execution for Skip?
    if isinstance(resolved_inputs, inputs_utils.Skip):
      logging.info('Skipping execution for %s',
                   self._pipeline_node.node_info.id)
      return _ExecutionPreparationResult(
          execution_info=self._build_execution_info(),
          contexts=contexts,
          is_execution_needed=False)

    # TODO(b/197741942): Support len > 1.
    if len(resolved_inputs) > 1:
      return _register_failure_and_halt(
          self._build_error_output(
              _ERROR_CODE_UNIMPLEMENTED,
              'Handling more than one input dicts not implemented yet.'))

    input_artifacts = resolved_inputs[0]

    # 4. Resolve the dynamic exec properties from implicit input channels.
    try:
      placeholder_context = placeholder_utils.ResolutionContext(
          exec_info=data_types.ExecutionInfo(
              input_dict={
                  key: list(value) for key, value in input_artifacts.items()
              },
              pipeline_node=self._pipeline_node,
              pipeline_info=self._pipeline_info,
              pipeline_run_id=self._pipeline_runtime_spec.pipeline_run_id.field_value.string_value,
              top_level_pipeline_run_id=self._pipeline_runtime_spec.top_level_pipeline_run_id,
          ),
          executor_spec=self._executor_spec,
          platform_config=self._platform_config,
      )
      dynamic_exec_properties = inputs_utils.resolve_dynamic_parameters(
          node_parameters=self._pipeline_node.parameters,
          context=placeholder_context,
      )
      # Dynamic properties are merged on top of the statically resolved ones.
      exec_properties.update(dynamic_exec_properties)
    except exceptions.InputResolutionError as e:
      logging.exception('[%s] Dynamic exec property resolution error: %s',
                        self._pipeline_node.node_info.id, e)
      return _register_failure_and_halt(
          self._build_error_output(code=e.grpc_code_value))

    # 5. Registers execution in metadata.
    execution = self._register_or_reuse_execution(
        metadata_handle=m,
        contexts=contexts,
        input_artifacts=input_artifacts,
        exec_properties=exec_properties,
    )
    if execution_lib.is_execution_successful(execution):
      # The reused execution already finished successfully; no executor run
      # is needed.
      return _ExecutionPreparationResult(
          execution_info=self._build_execution_info(
              execution_id=execution.id),
          contexts=contexts,
          is_execution_needed=False)

    # 6. Resolve output
    output_artifacts = self._output_resolver.generate_output_artifacts(
        execution.id)

  # If there is a custom driver, runs it.
  if self._driver_operator:
    driver_output = self._driver_operator.run_driver(
        self._build_execution_info(
            input_dict=input_artifacts,
            output_dict=output_artifacts,
            exec_properties=exec_properties,
            execution_output_uri=(
                self._output_resolver.get_driver_output_uri())))
    # The driver may override exec properties and output artifacts in place.
    self._update_with_driver_output(driver_output, exec_properties,
                                    output_artifacts)

  # We reconnect to MLMD here because the custom driver closes MLMD connection
  # on returning.
  with self._mlmd_connection as m:
    # 7. Check cached result
    cache_context = cache_utils.get_cache_context(
        metadata_handle=m,
        pipeline_node=self._pipeline_node,
        pipeline_info=self._pipeline_info,
        executor_spec=self._executor_spec,
        input_artifacts=input_artifacts,
        output_artifacts=output_artifacts,
        parameters=exec_properties,
    )
    contexts.append(cache_context)
    # 8. Should cache be used?
    if self._pipeline_node.execution_options.caching_options.enable_cache:
      cached_outputs = cache_utils.get_cached_outputs(
          metadata_handle=m, cache_context=cache_context
      )
      if cached_outputs is not None:
        # Publishes cache result
        execution_publish_utils.publish_cached_executions(
            metadata_handle=m,
            contexts=contexts,
            executions=[execution],
            output_artifacts_maps=[cached_outputs],
        )
        logging.info('A cached execution %d is used.', execution.id)
        return _ExecutionPreparationResult(
            execution_info=self._build_execution_info(
                execution_id=execution.id,
                input_dict=input_artifacts,
                output_dict=output_artifacts,
                exec_properties=exec_properties),
            execution_metadata=execution,
            contexts=contexts,
            is_execution_needed=False)

    # 9. Going to trigger executor.
    logging.info('Going to run a new execution %d', execution.id)
    return _ExecutionPreparationResult(
        execution_info=self._build_execution_info(
            execution_id=execution.id,
            input_dict=input_artifacts,
            output_dict=output_artifacts,
            exec_properties=exec_properties,
            execution_output_uri=(
                self._output_resolver.get_executor_output_uri(execution.id)),
            stateful_working_dir=(
                self._output_resolver.get_stateful_working_directory(
                    execution)),
            tmp_dir=self._output_resolver.make_tmp_dir(execution.id)),
        execution_metadata=execution,
        contexts=contexts,
        is_execution_needed=True)