def upload_to_gcs()

in source/deploy_dag.py [0:0]


    def upload_to_gcs(self, gcs_file_name: str, source_file_path: str) -> None:
        """Upload a local file to the Composer environment's GCS bucket.

        Fetches the environment description from the Composer v1beta1 REST
        API, reads ``config.dagGcsPrefix`` from it, derives the bucket name
        from that prefix and uploads ``<source_file_path>/<gcs_file_name>``
        to the object ``<last component of source_file_path>/<gcs_file_name>``
        in that bucket.

        This is best-effort: every exception raised along the way is logged
        and swallowed, so the caller is never notified of a failure.

        :param gcs_file_name: name of the file inside ``source_file_path``;
            also used as the destination file name in the bucket
        :param source_file_path: local directory holding the file; its last
            path component becomes the folder name inside the bucket
        :return: None
        """
        try:
            logging.info("Getting Google Cloud Composer Google Cloud Storage DAGS bucket ")
            auth_headers = self.google_api_headers()
            session = self.createRequestSession()
            environment_url = (
                'https://composer.googleapis.com/v1beta1/projects/{}/locations/{}'
                '/environments/{}'
            ).format(self.project_id, self.location, self.gcp_composer_env_name)
            response = session.get(environment_url, headers=auth_headers, timeout=30)
            environment = response.json()

            # An API error payload has no "config" section; fail with a
            # readable message instead of an opaque KeyError.
            dag_gcs_prefix = (environment.get('config') or {}).get('dagGcsPrefix')
            if not dag_gcs_prefix:
                raise ValueError(
                    "Composer environment response has no config.dagGcsPrefix "
                    "(API error: {})".format(environment.get('error'))
                )

            # The Composer API documents dagGcsPrefix as "gs://<bucket>/<folder>".
            # Take the first component after the scheme so a trailing slash or
            # a nested folder cannot shift which component is read as bucket.
            bucket_path = dag_gcs_prefix
            if bucket_path.startswith("gs://"):
                bucket_path = bucket_path[len("gs://"):]
            bucket_name = bucket_path.split("/", 1)[0]

            client = storage.Client(project=self.project_id)
            bucket = client.get_bucket(bucket_name)

            # Strip trailing slashes first: "dags/" must map to folder "dags",
            # not to an empty folder name (which would yield "/<file>").
            folder_name = source_file_path.rstrip("/").split("/")[-1]
            blob_name = folder_name + "/" + gcs_file_name
            dag_blob = bucket.blob(blob_name)
            dag_blob.upload_from_filename(os.path.join(source_file_path, gcs_file_name))
            logging.info("File %s uploaded to %s/%s.", gcs_file_name, bucket_name, blob_name)
        except Exception as error:
            # Deliberately broad: the upload is best-effort and failures are
            # only reported through the log.
            logging.error(
                " Got error while uploading file to GCS DAG folder : %s --> %s",
                type(error).__name__,
                error,
            )