providers/amazon/src/airflow/providers/amazon/aws/executors/ecs/ecs_executor_config.py
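# Imports reconstructed for this excerpt (the module header is elided above). The paths follow
# the amazon provider's layout; _fetch_config_values and _fetch_templated_kwargs are helpers
# defined earlier in this same module, so they need no import here.
import json
from json import JSONDecodeError

from airflow.providers.amazon.aws.executors.ecs.utils import (
    AllEcsConfigKeys,
    ECS_LAUNCH_TYPE_EC2,
    ECS_LAUNCH_TYPE_FARGATE,
    camelize_dict_keys,
    parse_assign_public_ip,
)
from airflow.providers.amazon.aws.hooks.ecs import EcsHook
from airflow.utils.helpers import prune_dict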
def build_task_kwargs() -> dict:
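    """Build the kwargs dict for the ECS run_task API call from the executor's config values."""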
    all_config_keys = AllEcsConfigKeys()

    # This will put some kwargs at the root of the dictionary that do NOT belong there. However,
    # the code below expects them to be there and will rearrange them as necessary.
    task_kwargs = _fetch_config_values()
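    # Merge in the templated run_task kwargs supplied by the user; dict.update means these take
    # precedence over the plain config values fetched above.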
    task_kwargs.update(_fetch_templated_kwargs())

    has_launch_type: bool = all_config_keys.LAUNCH_TYPE in task_kwargs
    has_capacity_provider: bool = all_config_keys.CAPACITY_PROVIDER_STRATEGY in task_kwargs
    is_launch_type_ec2: bool = task_kwargs.get(all_config_keys.LAUNCH_TYPE, None) == ECS_LAUNCH_TYPE_EC2

    if has_capacity_provider and has_launch_type:
        raise ValueError(
            "capacity_provider_strategy and launch_type are mutually exclusive; you cannot provide both."
        )
if "cluster" in task_kwargs and not (has_capacity_provider or has_launch_type):
# Default API behavior if neither is provided is to fall back on the default capacity
# provider if it exists. Since it is not a required value, check if there is one
# before using it, and if there is not then use the FARGATE launch_type as
# the final fallback.
cluster = EcsHook().conn.describe_clusters(clusters=[task_kwargs["cluster"]])["clusters"][0]
if not cluster.get("defaultCapacityProviderStrategy"):
task_kwargs[all_config_keys.LAUNCH_TYPE] = ECS_LAUNCH_TYPE_FARGATE
    # When using the EC2 launch type, the platform_version must not (and cannot) be provided.
    # In this case we drop it on the floor on behalf of the user instead of raising an exception.
    if is_launch_type_ec2:
        task_kwargs.pop(all_config_keys.PLATFORM_VERSION, None)
    # Exactly one task is launched per call, so the count is always 1.
    task_kwargs["count"] = 1  # type: ignore
    # There could be a generic approach to the below, but it would likely be more convoluted than
    # just manually ensuring that the one nested config we need to update is present. If we need
    # to override more options in the future we should revisit this.
    if "overrides" not in task_kwargs:
        task_kwargs["overrides"] = {}  # type: ignore
    if "containerOverrides" not in task_kwargs["overrides"]:
        task_kwargs["overrides"]["containerOverrides"] = [{}]  # type: ignore
    task_kwargs["overrides"]["containerOverrides"][0]["name"] = task_kwargs.pop(  # type: ignore
        all_config_keys.CONTAINER_NAME
    )
    # The executor will overwrite the 'command' property during execution. Must always be the first container!
    task_kwargs["overrides"]["containerOverrides"][0]["command"] = []  # type: ignore
    if any(
        [
            subnets := task_kwargs.pop(all_config_keys.SUBNETS, None),
            security_groups := task_kwargs.pop(all_config_keys.SECURITY_GROUPS, None),
            # Surrounding parens are for the walrus operator to function correctly along with the None check
            (assign_public_ip := task_kwargs.pop(all_config_keys.ASSIGN_PUBLIC_IP, None)) is not None,
        ]
    ):
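        # prune_dict strips entries whose values are None, so options the user did not set are
        # omitted from the request entirely rather than being sent as nulls.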
        network_config = prune_dict(
            {
                "awsvpcConfiguration": {
                    "subnets": str(subnets).split(",") if subnets else None,
                    "securityGroups": str(security_groups).split(",") if security_groups else None,
                    "assignPublicIp": parse_assign_public_ip(assign_public_ip, is_launch_type_ec2),
                }
            }
        )
        if "subnets" not in network_config["awsvpcConfiguration"]:
            raise ValueError("At least one subnet is required to run a task.")

        task_kwargs["networkConfiguration"] = network_config
    task_kwargs = camelize_dict_keys(task_kwargs)
    try:
        json.loads(json.dumps(task_kwargs))
    except (TypeError, JSONDecodeError) as err:
        raise ValueError("AWS ECS Executor config values must be JSON serializable.") from err

    return task_kwargs
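

# A minimal usage sketch (an illustrative assumption, not code from this module): the ECS
# executor builds these kwargs once, then injects each Airflow task's command into the first
# containerOverrides entry before calling boto3's ECS run_task. `build_task_kwargs` and
# `EcsHook` are real; `_example_submit` and its wiring are hypothetical.
def _example_submit(command: list[str]) -> dict:
    run_task_kwargs = build_task_kwargs()
    # This slot exists because build_task_kwargs() pre-creates it with an empty "command".
    run_task_kwargs["overrides"]["containerOverrides"][0]["command"] = command
    # ECS.Client.run_task returns a dict with "tasks" and "failures" keys.
    return EcsHook().conn.run_task(**run_task_kwargs)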