def build_task_kwargs()

in providers/amazon/src/airflow/providers/amazon/aws/executors/ecs/ecs_executor_config.py [0:0]


def build_task_kwargs() -> dict:
    """
    Assemble the task kwargs dictionary from the ECS executor configuration.

    Values are read via ``_fetch_config_values`` and then overlaid with the result of
    ``_fetch_templated_kwargs``. Several flat config keys are subsequently moved into the nested
    structure (``overrides.containerOverrides`` and ``networkConfiguration``) before all keys are
    passed through ``camelize_dict_keys``.

    :return: The task kwargs with camelized keys.
    :raises ValueError: If both ``capacity_provider_strategy`` and ``launch_type`` are provided, if
        any network setting is provided without at least one subnet, or if the resulting kwargs
        are not JSON serializable.
    :raises KeyError: If the container name is missing from the collected configuration.
    """
    all_config_keys = AllEcsConfigKeys()
    # This will put some kwargs at the root of the dictionary that do NOT belong there. However,
    # the code below expects them to be there and will rearrange them as necessary.
    task_kwargs = _fetch_config_values()
    task_kwargs.update(_fetch_templated_kwargs())

    has_launch_type: bool = all_config_keys.LAUNCH_TYPE in task_kwargs
    has_capacity_provider: bool = all_config_keys.CAPACITY_PROVIDER_STRATEGY in task_kwargs
    # Evaluated before the fallback below; that fallback only ever selects FARGATE, so this
    # flag stays accurate.
    is_launch_type_ec2: bool = task_kwargs.get(all_config_keys.LAUNCH_TYPE, None) == ECS_LAUNCH_TYPE_EC2

    if has_capacity_provider and has_launch_type:
        raise ValueError(
            "capacity_provider_strategy and launch_type are mutually exclusive, you can not provide both."
        )
    if "cluster" in task_kwargs and not (has_capacity_provider or has_launch_type):
        # Default API behavior if neither is provided is to fall back on the default capacity
        # provider if it exists. Since it is not a required value, check if there is one
        # before using it, and if there is not then use the FARGATE launch_type as
        # the final fallback.
        cluster = EcsHook().conn.describe_clusters(clusters=[task_kwargs["cluster"]])["clusters"][0]
        if not cluster.get("defaultCapacityProviderStrategy"):
            task_kwargs[all_config_keys.LAUNCH_TYPE] = ECS_LAUNCH_TYPE_FARGATE

    # If you're using the EC2 launch type, you should not/can not provide the platform_version. In this
    # case we'll drop it on the floor on behalf of the user, instead of throwing an exception.
    if is_launch_type_ec2:
        task_kwargs.pop(all_config_keys.PLATFORM_VERSION, None)

    # There can only be 1 count of these containers
    task_kwargs["count"] = 1  # type: ignore
    # There could be a generic approach to the below, but likely more convoluted than just manually ensuring
    # the one nested config we need to update is present. If we need to override more options in the future we
    # should revisit this.
    if "overrides" not in task_kwargs:
        task_kwargs["overrides"] = {}  # type: ignore
    if "containerOverrides" not in task_kwargs["overrides"]:
        task_kwargs["overrides"]["containerOverrides"] = [{}]  # type: ignore
    task_kwargs["overrides"]["containerOverrides"][0]["name"] = task_kwargs.pop(  # type: ignore
        AllEcsConfigKeys.CONTAINER_NAME
    )
    # The executor will overwrite the 'command' property during execution. Must always be the first container!
    task_kwargs["overrides"]["containerOverrides"][0]["command"] = []  # type: ignore

    # NOTE: this must stay a list (not a generator). A generator would let any() short-circuit and
    # leave the later walrus targets unbound, and their keys un-popped from task_kwargs.
    if any(
        [
            subnets := task_kwargs.pop(AllEcsConfigKeys.SUBNETS, None),
            security_groups := task_kwargs.pop(AllEcsConfigKeys.SECURITY_GROUPS, None),
            # Surrounding parens are for the walrus operator to function correctly along with the None check
            (assign_public_ip := task_kwargs.pop(AllEcsConfigKeys.ASSIGN_PUBLIC_IP, None)) is not None,
        ]
    ):
        network_config = prune_dict(
            {
                "awsvpcConfiguration": {
                    "subnets": str(subnets).split(",") if subnets else None,
                    "securityGroups": str(security_groups).split(",") if security_groups else None,
                    "assignPublicIp": parse_assign_public_ip(assign_public_ip, is_launch_type_ec2),
                }
            }
        )

        if "subnets" not in network_config["awsvpcConfiguration"]:
            raise ValueError("At least one subnet is required to run a task.")

        task_kwargs["networkConfiguration"] = network_config

    task_kwargs = camelize_dict_keys(task_kwargs)

    # json.dumps signals an unserializable value with TypeError (and a circular reference with
    # ValueError); it never raises JSONDecodeError, which is a decoding-only error. Catch what
    # serialization actually raises so the user gets the intended message.
    try:
        json.dumps(task_kwargs)
    except (TypeError, ValueError) as err:
        raise ValueError("AWS ECS Executor config values must be JSON serializable.") from err

    return task_kwargs