tfx/dsl/input_resolution/__init__.py (1 lines of code) (raw):
# Copyright 2021 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# TODO(b/189917071): Move to docs/guide.
"""Module for input resolution.
When running a node (e.g. Pusher, Resolver, ...) in a TFX pipeline, its input
is first fetched according to the input channel definition. This initial input
dict (type: Dict[str, List[Artifact]]) then goes through the *input resolution*
process, which transforms the initial input dict into a list of input dicts.
The node then runs Executor.Do() once for each of these *resolved inputs*.
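The flow described above can be sketched in plain Python. This is only an
illustration, not TFX code: `run_node`, `passthrough`, and the string stand-in
for `Artifact` are hypothetical names chosen for the sketch.

```python
from typing import Callable, Dict, List

# Stand-in for the Artifact type; plain strings keep the sketch self-contained.
Artifact = str
InputDict = Dict[str, List[Artifact]]


def run_node(
    initial_inputs: InputDict,
    resolve: Callable[[InputDict], List[InputDict]],
    execute: Callable[[InputDict], None],
) -> int:
  # Hypothetical driver: resolve once, then execute once per resolved dict.
  resolved_inputs = resolve(initial_inputs)
  for input_dict in resolved_inputs:
    execute(input_dict)
  return len(resolved_inputs)


def passthrough(input_dict: InputDict) -> List[InputDict]:
  # Hypothetical default resolution: the initial dict is the only result.
  return [input_dict]


executed: List[InputDict] = []
run_count = run_node({'examples': ['e1', 'e2']}, passthrough, executed.append)
```

With the pass-through resolution the executor stand-in runs exactly once. A
resolution step that returns an empty list would skip execution entirely.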
Typically a `Resolver` node specifies the input resolution logic (other types
of nodes simply return `[input_dict]`, a single-element list holding the
unmodified initial input dict, during input resolution). The input resolution
logic of a node can be specified in two ways:
## 1. Using a single `ResolverStrategy`.
This is the classical way of specifying input resolution logic, and it is
available only for the `Resolver` node. A subclass of `ResolverStrategy`
implements the `resolve_artifacts()` method, which takes an input dict and
produces an optional input dict. If the result of `resolve_artifacts()` is not
None, the input resolution result is a list containing that single input dict
(`[result]`). If the result is None, the input resolution result is an empty
list (`[]`).
A `Resolver` defines its input resolution logic by specifying a
`ResolverStrategy` class when the node is created.
```python
my_resolver = dsl.Resolver(
    strategy_class=dsl.experimental.LatestArtifactStrategy,
    examples=example_gen.outputs['examples']
).with_id('my_resolver')
```
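The None-versus-dict contract described above can be illustrated with a small
standalone sketch. It does not import TFX: `KeepLatestSketch` and
`resolve_with_strategy` are hypothetical names, artifacts are modeled as plain
strings, and the method signature is simplified to the "input dict in, optional
input dict out" shape (the real method may take additional arguments).

```python
from typing import Dict, List, Optional

Artifact = str  # Stand-in for the Artifact type.
InputDict = Dict[str, List[Artifact]]


class KeepLatestSketch:
  # Hypothetical strategy-like class; it does not subclass ResolverStrategy.

  def resolve_artifacts(self, input_dict: InputDict) -> Optional[InputDict]:
    # Give up (return None) if any channel has no artifacts.
    if any(not artifacts for artifacts in input_dict.values()):
      return None
    # Otherwise keep only the last artifact of every channel.
    return {key: artifacts[-1:] for key, artifacts in input_dict.items()}


def resolve_with_strategy(strategy, input_dict: InputDict) -> List[InputDict]:
  # Wrap the optional result into the list-of-dicts shape described above.
  result = strategy.resolve_artifacts(input_dict)
  return [] if result is None else [result]


resolved = resolve_with_strategy(KeepLatestSketch(), {'examples': ['e1', 'e2']})
unresolved = resolve_with_strategy(KeepLatestSketch(), {'examples': []})
```

Here `resolved` holds one input dict and `unresolved` is empty, matching the
`[result]` and `[]` cases.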
## 2. Using a `@resolver_function`.
A decorated function (a *resolver function*) can express complex input
resolution logic beyond a single `ResolverStrategy` by combining multiple
`ResolverOp`s. Each `ResolverOp` represents a single function, and its output
can be fed into the inputs of other `ResolverOp`s. The final return value of
the function should be a dict or a list of dicts. The signature of a
`ResolverStrategy` is always dict -> dict, but the signature of a `ResolverOp`
is more flexible.
For convenience, a `ResolverStrategy` can be used anywhere a `ResolverOp` can
be used. Think of a `ResolverStrategy` as a special kind of `ResolverOp` with a
dict -> dict signature (though there is no inheritance relationship). The
reverse is not supported: a `ResolverOp` cannot be passed as the
`strategy_class=` argument of a `Resolver` node.
```python
@dsl.resolver_function
def my_resolver_fn(root):
  # ResolverOp is a building block of input resolution logic.
  result = MyCustomResolverOp(root, flag=False)
  # You can even use ResolverStrategy as a building block just like ResolverOp.
  result = LatestArtifactStrategy(result)
  return result
```
Note that invoking `ResolverOp(input_node)` does not create a `ResolverOp`
instance; it returns a placeholder object (`OpNode`) used for function tracing.
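To make the tracing idea concrete, here is one possible way to get "calling the
class returns a placeholder" behavior in plain Python. It is a simplified
illustration only and is not claimed to match how TFX implements `ResolverOp`
or `OpNode`; every name in it (`NodeSketch`, `TracedOpMeta`, `FilterOpSketch`,
`LatestOpSketch`, `trace`) is hypothetical.

```python
from typing import Any


class NodeSketch:
  # Hypothetical placeholder that records a call instead of running it.

  def __init__(self, op_name, args, kwargs):
    self.op_name = op_name
    self.args = args
    self.kwargs = kwargs


class TracedOpMeta(type):
  # Calling a class with this metaclass returns a NodeSketch, not an instance.

  def __call__(cls, *args: Any, **kwargs: Any) -> NodeSketch:
    return NodeSketch(cls.__name__, args, kwargs)


class FilterOpSketch(metaclass=TracedOpMeta):
  # Hypothetical op; no instance is created while tracing.
  pass


class LatestOpSketch(metaclass=TracedOpMeta):
  # Second hypothetical op, used to show chaining.
  pass


def trace(fn):
  # Call fn with a root placeholder and return the recorded call graph.
  return fn(NodeSketch('Input', (), {}))


def my_fn(root):
  result = FilterOpSketch(root, flag=False)
  return LatestOpSketch(result)


graph = trace(my_fn)
```

After tracing, `graph` is a chain of placeholders (latest, then filter, then
input) that records which ops were called and with which arguments. No op
instance was created along the way.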
The `@resolver_function` decorator converts this *resolver function* into a
`Resolver` node factory, so that calling the function creates a `Resolver`
node.
```python
my_resolver = dsl.Resolver(
    function=my_resolver_fn,
    examples=example_gen.outputs['examples'],
).with_id('my_resolver')
```
"""