in source/deploy_dag.py [0:0]
def deploy_dag(self):
    """Validate the DAG and tasks-variables files, then upload them.

    Calls ``self.validate_file_path()``, which is expected to return six
    values in this order: DAG filename, DAG directory path, DAG validation
    status, tasks-variables filename, tasks-variables directory path and
    tasks-variables validation status.

    Upload rules, based on the two status strings:

    * DAG ``"success"`` and tasks-variables ``"success"``: both files are
      handed to ``self.upload_to_gcs`` (DAG first).
    * DAG ``"success"`` and tasks-variables ``"pass"``: only the DAG file
      is uploaded.
    * Anything else: nothing is uploaded and an error is logged.

    NOTE(review): ``"pass"`` presumably means that no tasks-variables file
    was supplied -- confirm against ``validate_file_path``.

    Returns:
        None in every case; a validation failure is only reported through
        the log, not raised.
    """
    (
        dag_filename,
        dag_directory_path,
        dag_file_validation,
        tasks_variables_filename,
        tasks_variables_directory_path,
        tasks_variables_file_validation,
    ) = self.validate_file_path()

    # Guard clause: the DAG must have validated, and the tasks-variables
    # status must be one of the two accepted values.
    if dag_file_validation != "success" or tasks_variables_file_validation not in (
        "success",
        "pass",
    ):
        logging.error("Check file validation as dag_file_validation or tasks_variables_file_validation failed ")
        return

    self.upload_to_gcs(dag_filename, dag_directory_path)

    # The tasks-variables file is only uploaded when it validated itself;
    # a "pass" status skips it.
    if tasks_variables_file_validation == "success":
        self.upload_to_gcs(tasks_variables_filename, tasks_variables_directory_path)