in source/deploy_dag.py [0:0]
def upload_to_gcs(self,gcs_file_name,source_file_path):
"""This function get Composer DAG GCS folder path and \
uploads file to GCS bucket and file path"""
"""
:param gcs_file_name: destination file name
:param source_file_path: local source file path to upload files
:return str config_dag_validation: Config DAG File validation flag Success or Fail
"""
try:
logging.info("Getting Google Cloud Composer Google Cloud Storage DAGS bucket ")
authHeaders = self.google_api_headers()
reqSession = self.createRequestSession()
environment_url = (
'https://composer.googleapis.com/v1beta1/projects/{}/locations/{}'
'/environments/{}').format(self.project_id, self.location, self.gcp_composer_env_name)
response = reqSession.get(environment_url, headers=authHeaders,timeout=30)
environmentVars = response.json()
dagGcsPrefix = environmentVars['config']['dagGcsPrefix']
GcsBucket = dagGcsPrefix.split("/")[-2]
client = storage.Client(project=self.project_id)
bucket = client.get_bucket(GcsBucket)
gcsFolderPath = str(source_file_path.split("/")[-1])+"/"+gcs_file_name
dag_blob = bucket.blob(gcsFolderPath)
dag_blob.upload_from_filename(os.path.join(source_file_path,gcs_file_name))
logging.info(f"File {gcs_file_name} uploaded to {GcsBucket}/{gcsFolderPath}.")
except Exception as error:
logging.error(" Got error while uploading file to GCS DAG folder : {}".format(str(type(error).__name__)+" --> "+ str(error)))