in composer_migration/src/composer_migration/__main__.py [0:0]
def run_checks(gcp_project: str, composer_environment: str, location: str) -> None:
    """Download a Composer environment's DAG files and run the migration checks.

    The environment is looked up with the Composer client library to find the
    GCS prefix where its DAGs are stored. The ``*.py`` files under that prefix
    are copied with ``gsutil`` into a temporary directory, checked with
    ``DAGsComparator`` using the ``CheckKubernetesPodOperator`` strategy, and
    the results are presented on the CLI. The temporary directory is removed
    before the function returns, whether or not the checks succeed.

    Args:
        gcp_project: Project ID used to build the environment resource name.
        composer_environment: Name of the Composer environment.
        location: Location segment of the environment resource name.

    Raises:
        subprocess.CalledProcessError: If the ``gsutil`` copy exits non-zero.

    Any exception raised by the Composer client call is propagated unchanged.
    """
    # Get the bucket where DAGs are stored with the Composer client library.
    composer_env_info = service_v1.EnvironmentsClient().get_environment(
        request=service_v1.GetEnvironmentRequest(
            name=f"projects/{gcp_project}/locations/{location}/environments/{composer_environment}"
        )
    )
    logging.debug(composer_env_info)
    dags_bucket = composer_env_info.config.dag_gcs_prefix
    logging.debug(dags_bucket)

    # Store DAGs locally in a temp directory. The context manager deletes the
    # directory on exit so downloaded DAG files do not accumulate across runs.
    with tempfile.TemporaryDirectory() as temp_dag_dir:
        # Use the gsutil -m flag to parallelize the cp process.
        subprocess.check_call(
            ["gsutil", "-m", "cp", "-r", f"{dags_bucket}/*.py", temp_dag_dir]
        )

        # Remove the Airflow monitoring DAG - this is not user maintained.
        # It may not be among the copied files, which is not an error.
        try:
            os.remove(f"{temp_dag_dir}/airflow_monitoring.py")
        except FileNotFoundError:
            logging.debug("airflow_monitoring.py not found in %s", temp_dag_dir)

        # Run the checks while the downloaded files still exist on disk.
        comparator: DAGsComparator = DAGsComparator(temp_dag_dir)
        comparator.check_dag_files(strategy=CheckKubernetesPodOperator)
        comparator.present_to_cli()