composer/tools/composer_migrate.py
#!/usr/bin/env python
# Copyright 2025 Google LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Standalone script for migrating environments from Composer 2 to Composer 3."""
import argparse
import json
import math
import pprint
import subprocess
import sys
from typing import Any, Dict, List, Optional
import logging
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(message)s")
logger = logging.getLogger(__name__)
class ComposerClient:
    """Client for interacting with the Composer API.

    The client uses gcloud under the hood.
    """
def __init__(self, project: str, location: str, sdk_endpoint: str) -> None:
self.project = project
self.location = location
self.sdk_endpoint = sdk_endpoint
def get_environment(self, environment_name: str) -> Any:
"""Returns an environment json for a given Composer environment."""
command = (
f"CLOUDSDK_API_ENDPOINT_OVERRIDES_COMPOSER={self.sdk_endpoint} gcloud"
" composer environments describe"
f" {environment_name} --project={self.project} --location={self.location} --format"
" json"
)
output = run_shell_command(command)
return json.loads(output)
def create_environment_from_config(self, config: Any) -> Any:
"""Creates a Composer environment based on the given json config."""
# Obtain access token through gcloud
access_token = run_shell_command("gcloud auth print-access-token")
# gcloud does not support creating composer environments from json, so we
# need to use the API directly.
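        # The request body is passed through the shell inside single quotes, so
        # this assumes json.dumps(config) contains no single quotes (which holds
        # for the configs this script produces).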
create_environment_command = (
f"curl -s -X POST -H 'Authorization: Bearer {access_token}'"
" -H 'Content-Type: application/json'"
f" -d '{json.dumps(config)}'"
f" {self.sdk_endpoint}/v1/projects/{self.project}/locations/{self.location}/environments"
)
output = run_shell_command(create_environment_command)
        logger.info("Create environment operation: %s", output)
# Poll create operation using gcloud.
operation_id = json.loads(output)["name"].split("/")[-1]
poll_operation_command = (
f"CLOUDSDK_API_ENDPOINT_OVERRIDES_COMPOSER={self.sdk_endpoint} gcloud"
" composer operations wait"
f" {operation_id} --project={self.project} --location={self.location}"
)
run_shell_command(poll_operation_command)
def list_dags(self, environment_name: str) -> List[str]:
"""Returns a list of DAGs in a given Composer environment."""
command = (
f"CLOUDSDK_API_ENDPOINT_OVERRIDES_COMPOSER={self.sdk_endpoint} gcloud"
" composer environments run"
f" {environment_name} --project={self.project} --location={self.location} dags"
" list -- -o json"
)
output = run_shell_command(command)
# Output may contain text from top level print statements.
# The last line of the output is always a json array of DAGs.
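        # Each entry is a dict as emitted by `dags list -o json`; this script
        # relies on the "dag_id" and "is_paused" keys, where "is_paused" is the
        # string "True" or "False" rather than a boolean.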
return json.loads(output.splitlines()[-1])
def pause_dag(
self,
dag_id: str,
environment_name: str,
) -> Any:
"""Pauses a DAG in a Composer environment."""
command = (
f"CLOUDSDK_API_ENDPOINT_OVERRIDES_COMPOSER={self.sdk_endpoint} gcloud"
" composer environments run"
f" {environment_name} --project={self.project} --location={self.location} dags"
f" pause -- {dag_id}"
)
run_shell_command(command)
def unpause_dag(
self,
dag_id: str,
environment_name: str,
) -> Any:
"""Unpauses all DAGs in a Composer environment."""
command = (
f"CLOUDSDK_API_ENDPOINT_OVERRIDES_COMPOSER={self.sdk_endpoint} gcloud"
" composer environments run"
f" {environment_name} --project={self.project} --location={self.location} dags"
f" unpause -- {dag_id}"
)
run_shell_command(command)
def save_snapshot(self, environment_name: str) -> str:
"""Saves a snapshot of a Composer environment."""
command = (
f"CLOUDSDK_API_ENDPOINT_OVERRIDES_COMPOSER={self.sdk_endpoint} gcloud"
" composer"
" environments snapshots save"
f" {environment_name} --project={self.project}"
f" --location={self.location} --format=json"
)
output = run_shell_command(command)
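        # The returned path points at the snapshot in Cloud Storage, e.g.
        # (illustrative) "gs://<environment-bucket>/snapshots/<timestamp>".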
return json.loads(output)["snapshotPath"]
def load_snapshot(
self,
environment_name: str,
snapshot_path: str,
) -> Any:
"""Loads a snapshot to a Composer environment."""
command = (
f"CLOUDSDK_API_ENDPOINT_OVERRIDES_COMPOSER={self.sdk_endpoint} gcloud"
" composer"
f" environments snapshots load {environment_name}"
f" --snapshot-path={snapshot_path} --project={self.project}"
f" --location={self.location} --format=json"
)
run_shell_command(command)
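# Illustrative usage of ComposerClient (the project, location and environment
# names below are placeholders):
#
#   client = ComposerClient(
#       project="my-project",
#       location="us-central1",
#       sdk_endpoint="https://composer.googleapis.com/",
#   )
#   env = client.get_environment("my-env")
#   print(env["config"]["softwareConfig"]["imageVersion"])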
def run_shell_command(command: str, command_input: Optional[str] = None) -> str:
    """Executes a shell command and returns its stripped stdout."""
    # Open stdin only when there is input to send; communicate() expects bytes
    # because the process runs in binary mode.
    stdin = subprocess.PIPE if command_input is not None else None
    p = subprocess.Popen(command, stdin=stdin, stdout=subprocess.PIPE, shell=True)
    (res, _) = p.communicate(
        input=command_input.encode() if command_input is not None else None
    )
    output = res.decode().strip("\n")
    if p.returncode:
        raise RuntimeError(f"Failed to run shell command: {command}, details: {output}")
    return output
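# Example (illustrative): run_shell_command("echo hello") returns "hello";
# a non-zero exit code raises RuntimeError with the captured stdout attached.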
def get_target_cpu(source_cpu: float, max_cpu: float) -> float:
"""Returns a target CPU value for a Composer 3 workload."""
# Allowed values for Composer 3 workloads are 0.5, 1.0 and multiples of 2.0 up
# to max_cpu.
if source_cpu < 1.0:
return 0.5
if source_cpu == 1.0:
return source_cpu
return min(math.ceil(source_cpu / 2.0) * 2, max_cpu)
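# Worked examples (illustrative): get_target_cpu(0.25, 32.0) == 0.5,
# get_target_cpu(1.0, 32.0) == 1.0, get_target_cpu(3.0, 32.0) == 4.0 and
# get_target_cpu(7.0, 4.0) == 4.0 (rounded up to an even value, then capped).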
def get_target_memory_gb(source_memory_gb: float, target_cpu: float) -> float:
"""Returns a target memory in GB for a Composer 3 workload."""
# Allowed values for Composer 3 workloads are multiples of 0.25
# starting from 1 * cpu up to 8 * cpu, with minimum of 1 GB.
target_memory_gb = math.ceil(source_memory_gb * 4.0) / 4.0
return max(1.0, target_cpu, min(target_memory_gb, target_cpu * 8))
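# Worked examples (illustrative): get_target_memory_gb(1.875, 0.5) == 2.0,
# get_target_memory_gb(0.5, 0.5) == 1.0 (the 1 GB floor) and
# get_target_memory_gb(100.0, 8.0) == 64.0 (capped at 8 GB per CPU).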
def get_target_storage_gb(source_storage_gb: float) -> float:
"""Returns a target storage in GB for a Composer 3 workload."""
# Composer 3 allows only whole numbers of GB for storage, up to 100 GB.
return min(math.ceil(source_storage_gb), 100.0)
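# Worked examples (illustrative): get_target_storage_gb(1.3) == 2.0 and
# get_target_storage_gb(250.0) == 100.0 (capped at 100 GB).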
def get_target_workloads_config(
source_workloads_config: Any,
) -> Dict[str, Any]:
"""Returns a Composer 3 workloads config based on the source environment."""
workloads_config = {}
if source_workloads_config.get("scheduler"):
scheduler_cpu = get_target_cpu(source_workloads_config["scheduler"]["cpu"], 1.0)
workloads_config["scheduler"] = {
"cpu": scheduler_cpu,
"memoryGb": get_target_memory_gb(
source_workloads_config["scheduler"]["memoryGb"], scheduler_cpu
),
"storageGb": get_target_storage_gb(
source_workloads_config["scheduler"]["storageGb"]
),
"count": min(source_workloads_config["scheduler"]["count"], 3),
}
# Use configuration from the Composer 2 scheduler for Composer 3
# dagProcessor.
dag_processor_cpu = get_target_cpu(
source_workloads_config["scheduler"]["cpu"], 32.0
)
workloads_config["dagProcessor"] = {
"cpu": dag_processor_cpu,
"memoryGb": get_target_memory_gb(
source_workloads_config["scheduler"]["memoryGb"], dag_processor_cpu
),
"storageGb": get_target_storage_gb(
source_workloads_config["scheduler"]["storageGb"]
),
"count": min(source_workloads_config["scheduler"]["count"], 3),
}
if source_workloads_config.get("webServer"):
web_server_cpu = get_target_cpu(
source_workloads_config["webServer"]["cpu"], 4.0
)
workloads_config["webServer"] = {
"cpu": web_server_cpu,
"memoryGb": get_target_memory_gb(
source_workloads_config["webServer"]["memoryGb"], web_server_cpu
),
"storageGb": get_target_storage_gb(
source_workloads_config["webServer"]["storageGb"]
),
}
if source_workloads_config.get("worker"):
worker_cpu = get_target_cpu(source_workloads_config["worker"]["cpu"], 32.0)
workloads_config["worker"] = {
"cpu": worker_cpu,
"memoryGb": get_target_memory_gb(
source_workloads_config["worker"]["memoryGb"], worker_cpu
),
"storageGb": get_target_storage_gb(
source_workloads_config["worker"]["storageGb"]
),
"minCount": source_workloads_config["worker"]["minCount"],
"maxCount": source_workloads_config["worker"]["maxCount"],
}
if source_workloads_config.get("triggerer"):
triggerer_cpu = get_target_cpu(source_workloads_config["triggerer"]["cpu"], 1.0)
workloads_config["triggerer"] = {
"cpu": triggerer_cpu,
"memoryGb": get_target_memory_gb(
source_workloads_config["triggerer"]["memoryGb"], triggerer_cpu
),
"count": source_workloads_config["triggerer"]["count"],
}
else:
workloads_config["triggerer"] = {
"count": 0,
}
return workloads_config
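# Illustrative example: a Composer 2 workloads config of
#   {"scheduler": {"cpu": 0.5, "memoryGb": 1.875, "storageGb": 1, "count": 1}}
# maps to a Composer 3 config with
#   scheduler:    cpu 0.5, memoryGb 2.0, storageGb 1.0, count 1
#   dagProcessor: cpu 0.5, memoryGb 2.0, storageGb 1.0, count 1
#   triggerer:    count 0 (no triggerer in the source environment)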
def get_target_environment_config(
target_environment_name: str,
target_airflow_version: str,
source_environment: Any,
) -> Dict[str, Any]:
"""Returns a Composer 3 environment config based on the source environment."""
# Use the same project and location as the source environment.
target_environment_name = "/".join(
source_environment["name"].split("/")[:-1] + [target_environment_name]
)
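    # For example (illustrative), source name
    # "projects/my-project/locations/us-central1/environments/src-env" with
    # target_environment_name "dst-env" yields
    # "projects/my-project/locations/us-central1/environments/dst-env".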
target_workloads_config = get_target_workloads_config(
source_environment["config"].get("workloadsConfig", {})
)
target_node_config = {
"network": source_environment["config"]["nodeConfig"].get("network"),
"serviceAccount": source_environment["config"]["nodeConfig"]["serviceAccount"],
"tags": source_environment["config"]["nodeConfig"].get("tags", []),
}
if "subnetwork" in source_environment["config"]["nodeConfig"]:
target_node_config["subnetwork"] = source_environment["config"]["nodeConfig"][
"subnetwork"
]
target_environment = {
"name": target_environment_name,
"labels": source_environment.get("labels", {}),
"config": {
"softwareConfig": {
"imageVersion": f"composer-3-airflow-{target_airflow_version}",
"cloudDataLineageIntegration": (
source_environment["config"]["softwareConfig"].get(
"cloudDataLineageIntegration", {}
)
),
},
"nodeConfig": target_node_config,
"privateEnvironmentConfig": {
"enablePrivateEnvironment": (
source_environment["config"]
.get("privateEnvironmentConfig", {})
.get("enablePrivateEnvironment", False)
)
},
"webServerNetworkAccessControl": source_environment["config"][
"webServerNetworkAccessControl"
],
"environmentSize": source_environment["config"]["environmentSize"],
"databaseConfig": source_environment["config"]["databaseConfig"],
"encryptionConfig": source_environment["config"]["encryptionConfig"],
"maintenanceWindow": source_environment["config"]["maintenanceWindow"],
"dataRetentionConfig": {
"airflowMetadataRetentionConfig": source_environment["config"][
"dataRetentionConfig"
]["airflowMetadataRetentionConfig"]
},
"workloadsConfig": target_workloads_config,
},
}
return target_environment
def main(
project_name: str,
location: str,
source_environment_name: str,
target_environment_name: str,
target_airflow_version: str,
sdk_endpoint: str,
dry_run: bool,
) -> int:
client = ComposerClient(
project=project_name, location=location, sdk_endpoint=sdk_endpoint
)
# 1. Get the source environment, validate whether it is eligible
# for migration and produce a Composer 3 environment config.
logger.info("STEP 1: Getting and validating the source environment...")
source_environment = client.get_environment(source_environment_name)
logger.info("Source environment:\n%s", pprint.pformat(source_environment))
image_version = source_environment["config"]["softwareConfig"]["imageVersion"]
if not image_version.startswith("composer-2"):
raise ValueError(
f"Source environment {source_environment['name']} is not a Composer 2"
f" environment. Current image version: {image_version}"
)
# 2. Create a Composer 3 environment based on the source environment
# configuration.
target_environment = get_target_environment_config(
target_environment_name, target_airflow_version, source_environment
)
logger.info(
"Composer 3 environment will be created with the following config:\n%s",
pprint.pformat(target_environment),
)
    logger.warning(
        "The Composer 3 environment workloads config may differ from the"
        " source environment."
    )
    logger.warning(
        "The newly created Composer 3 environment will not have"
        " 'airflowConfigOverrides', 'pypiPackages' or 'envVariables' set."
        " Those fields are populated when the snapshot is loaded."
    )
if dry_run:
logger.info("Dry run enabled, exiting.")
return 0
logger.info("STEP 2: Creating a Composer 3 environment...")
client.create_environment_from_config(target_environment)
target_environment = client.get_environment(target_environment_name)
    logger.info(
        "Composer 3 environment successfully created:\n%s",
        pprint.pformat(target_environment),
    )
# 3. Pause all DAGs in the source environment
logger.info("STEP 3: Pausing all DAGs in the source environment...")
source_env_dags = client.list_dags(source_environment_name)
source_env_dag_ids = [dag["dag_id"] for dag in source_env_dags]
logger.info(
"Found %d DAGs in the source environment: %s",
len(source_env_dags),
source_env_dag_ids,
)
for dag in source_env_dags:
if dag["dag_id"] == "airflow_monitoring":
continue
if dag["is_paused"] == "True":
logger.info("DAG %s is already paused.", dag["dag_id"])
continue
logger.info("Pausing DAG %s in the source environment.", dag["dag_id"])
client.pause_dag(dag["dag_id"], source_environment_name)
logger.info("DAG %s paused.", dag["dag_id"])
logger.info("All DAGs in the source environment paused.")
# 4. Save snapshot of the source environment
logger.info("STEP 4: Saving snapshot of the source environment...")
snapshot_path = client.save_snapshot(source_environment_name)
logger.info("Snapshot saved: %s", snapshot_path)
# 5. Load the snapshot into the target environment
logger.info("STEP 5: Loading snapshot into the new environment...")
client.load_snapshot(target_environment_name, snapshot_path)
logger.info("Snapshot loaded.")
    # 6. Unpause DAGs in the new environment
logger.info("STEP 6: Unpausing DAGs in the new environment...")
all_dags_present = False
# Wait until all DAGs from source environment are visible.
while not all_dags_present:
target_env_dags = client.list_dags(target_environment_name)
target_env_dag_ids = [dag["dag_id"] for dag in target_env_dags]
all_dags_present = set(source_env_dag_ids) == set(target_env_dag_ids)
logger.info("List of DAGs in the target environment: %s", target_env_dag_ids)
# Unpause only DAGs that were not paused in the source environment.
for dag in source_env_dags:
if dag["dag_id"] == "airflow_monitoring":
continue
if dag["is_paused"] == "True":
logger.info("DAG %s was paused in the source environment.", dag["dag_id"])
continue
logger.info("Unpausing DAG %s in the target environment.", dag["dag_id"])
client.unpause_dag(dag["dag_id"], target_environment_name)
logger.info("DAG %s unpaused.", dag["dag_id"])
logger.info("DAGs in the target environment unpaused.")
logger.info("Migration complete.")
return 0
def parse_arguments() -> argparse.Namespace:
"""Parses command line arguments."""
    argument_parser = argparse.ArgumentParser(
        description="Script for migrating environments from Composer 2 to Composer 3."
    )
argument_parser.add_argument(
"--project",
type=str,
required=True,
help="Project name of the Composer environment to migrate.",
)
argument_parser.add_argument(
"--location",
type=str,
required=True,
help="Location of the Composer environment to migrate.",
)
argument_parser.add_argument(
"--source_environment",
type=str,
required=True,
help="Name of the Composer 2 environment to migrate.",
)
argument_parser.add_argument(
"--target_environment",
type=str,
required=True,
help="Name of the Composer 3 environment to create.",
)
argument_parser.add_argument(
"--target_airflow_version",
type=str,
default="2",
help="Airflow version for the Composer 3 environment.",
)
argument_parser.add_argument(
"--dry_run",
action="store_true",
default=False,
        help=(
            "If set, the script only prints the config for the Composer 3"
            " environment and exits without creating it."
        ),
)
argument_parser.add_argument(
"--sdk_endpoint",
type=str,
default="https://composer.googleapis.com/",
required=False,
)
return argument_parser.parse_args()
if __name__ == "__main__":
args = parse_arguments()
    sys.exit(
main(
project_name=args.project,
location=args.location,
source_environment_name=args.source_environment,
target_environment_name=args.target_environment,
target_airflow_version=args.target_airflow_version,
sdk_endpoint=args.sdk_endpoint,
dry_run=args.dry_run,
)
)
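# Example invocation (illustrative; the project, location and environment
# names are placeholders, and gcloud must be authenticated):
#
#   python composer_migrate.py \
#       --project=my-project \
#       --location=us-central1 \
#       --source_environment=my-composer2-env \
#       --target_environment=my-composer3-env \
#       --dry_run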