in source/idea/idea-cluster-manager/src/ideaclustermanager/app/snapshots/apply_snapshot_merge_table/filesystems_cluster_settings_table_merger.py [0:0]
def merge(self, context: SocaContext, table_data_to_merge: List[Dict], dedup_id: str,
          _merged_record_deltas: Dict[TableName, List[MergedRecordDelta]], logger: ApplySnapshotObservabilityHelper) -> Tuple[List[MergedRecordDelta], bool]:
    """Apply snapshotted shared-filesystem cluster settings to this environment.

    Each filesystem record in the snapshot is onboarded unless skipped:
    missing provider, scope other than 'project', already onboarded, or not
    accessible from the env's VPC (the VPC check does not apply to S3
    buckets). A name collision with an existing filesystem is resolved by
    onboarding under a dedup-suffixed name instead of overwriting. Project
    associations are applied best-effort and never trigger a rollback.

    Args:
        context: environment context used for lookups and onboarding calls.
        table_data_to_merge: raw cluster-settings records from the snapshot.
        dedup_id: suffix used to generate a unique name on collisions.
        _merged_record_deltas: deltas from previously merged tables (unused).
        logger: per-resource apply-status observability helper.

    Returns:
        Tuple of (record deltas applied so far, success flag). The flag is
        False when an unexpected error aborted the merge mid-way; skipped
        filesystems do not count as failures.

    Raises:
        SocaException: re-raised for any error code other than the
            gracefully handled already-onboarded / not-in-VPC / not-found
            cases.
    """
    record_deltas: List[MergedRecordDelta] = []
    onboarded_filesystem_ids = self.get_list_of_onboarded_filesystem_ids(context)
    accessible_filesystem_ids = self.get_list_of_accessible_filesystem_ids(context)
    env_project_names = set(self.get_names_of_projects_in_env(context))
    details = self.extract_filesystem_details_to_dict(table_data_to_merge)
    for filesystem_name in details:
        if not isinstance(details[filesystem_name], dict):
            continue

        # If 'provider' empty, skip applying filesystem
        provider = details[filesystem_name].get("provider")
        if not provider:
            logger.warning(TABLE_NAME, filesystem_name, ApplyResourceStatus.SKIPPED, f"filesystem provider not mentioned for filesystem {filesystem_name}")
            continue

        # If filesystem of a scope other than 'project' skip applying filesystem
        scope = details[filesystem_name].get('scope')
        if not scope or 'project' not in scope:
            logger.warning(TABLE_NAME, filesystem_name, ApplyResourceStatus.SKIPPED, f"filesystem '{filesystem_name}' not of 'project' scope")
            continue

        # If filesystem does not belong to the same VPC or is already onboarded, skip applying filesystem
        try:
            # S3 buckets are identified by bucket ARN; all other providers by file-system id.
            if provider == STORAGE_PROVIDER_S3_BUCKET:
                filesystem_id = details[filesystem_name][provider]["bucket_arn"]
            else:
                filesystem_id = details[filesystem_name][provider]["file_system_id"]
            if filesystem_id in onboarded_filesystem_ids:
                raise exceptions.soca_exception(
                    error_code=errorcodes.FILESYSTEM_ALREADY_ONBOARDED,
                    message=f"{filesystem_id} has already been onboarded"
                )
            # VPC-accessibility check does not apply to S3 buckets
            if filesystem_id not in accessible_filesystem_ids and provider != STORAGE_PROVIDER_S3_BUCKET:
                raise exceptions.soca_exception(
                    error_code=errorcodes.FILESYSTEM_NOT_IN_VPC,
                    message=f"{filesystem_id} not part of the env's VPC thus not accessible"
                )
        except exceptions.SocaException as e:
            # Gracefully handling cases when filesystem is already onboarded or not accessible
            if e.error_code in (errorcodes.FILESYSTEM_ALREADY_ONBOARDED, errorcodes.FILESYSTEM_NOT_IN_VPC):
                logger.debug(TABLE_NAME, filesystem_name, ApplyResourceStatus.SKIPPED, f"{e.message}")
                continue
            raise e
        except Exception as e:
            logger.error(TABLE_NAME, filesystem_name, ApplyResourceStatus.FAILED_APPLY, str(e))
            return record_deltas, False

        # Check to see if the env already has a filesystem with the same name. If so apply the filesystem with the dedup_id attached.
        filesystem_with_name_already_present = False
        try:
            context.shared_filesystem.get_filesystem(filesystem_name)
            filesystem_with_name_already_present = True
        except exceptions.SocaException as e:
            # "Not found" just means no name collision; anything else is fatal.
            if e.error_code not in (errorcodes.NO_SHARED_FILESYSTEM_FOUND, errorcodes.FILESYSTEM_NOT_FOUND):
                raise e
        except Exception as e:
            # BUG FIX: previously logged `{e}` — a one-element set containing the
            # exception — instead of the exception text. Now consistent with the
            # other FAILED_APPLY paths.
            logger.error(
                TABLE_NAME, filesystem_name, ApplyResourceStatus.FAILED_APPLY, str(e)
            )
            return record_deltas, False

        # FileSystem with same name exists.
        # This merge will add a new filesystem by appending the RES version and dedup ID to its name instead of overriding the existing ones,
        filesystem_name_to_use = (
            MergeTable.unique_resource_id_generator(filesystem_name, dedup_id)
            if filesystem_with_name_already_present
            else filesystem_name
        )
        try:
            if provider == STORAGE_PROVIDER_EFS:
                self._onboard_efs(filesystem_name_to_use, details[filesystem_name], context)
            elif provider == STORAGE_PROVIDER_FSX_NETAPP_ONTAP:
                self._onboard_ontap(filesystem_name_to_use, details[filesystem_name], context)
            elif provider == STORAGE_PROVIDER_FSX_LUSTRE:
                self._onboard_lustre(filesystem_name_to_use, details[filesystem_name], context)
            elif provider == STORAGE_PROVIDER_S3_BUCKET:
                # Onboard s3 bucket generates the filesystem name
                filesystem_name_to_use = self._onboard_s3_bucket(details[filesystem_name], context)

            # Wait for some time for the filesystem changes to be picked up by the config listener and added to the local config tree.
            # BUG FIX: wait on the name that was actually onboarded. Waiting on the
            # original snapshot name returned immediately whenever a same-named
            # filesystem already existed (dedup rename) or the S3 path generated a
            # new name, defeating the sync wait before the project association below.
            self._wait_for_onboarded_filesystem_to_sync_to_config_tree(filesystem_name_to_use, context)

            # Does not apply to s3 bucket
            if provider != STORAGE_PROVIDER_S3_BUCKET:
                accessible_filesystem_ids.remove(filesystem_id)

            # Add the onboarded filesystem to corresponding projects. This is a soft dependency. If a project does not exist, it will be ignored. A rollback will not be triggered in this case.
            self._add_filesystem_to_projects(filesystem_name_to_use, details[filesystem_name], env_project_names, context, dedup_id, logger)
        except exceptions.SocaException as e:
            # Gracefully handling cases when filesystem is already onboarded or not accessible
            if e.error_code in (errorcodes.FILESYSTEM_ALREADY_ONBOARDED, errorcodes.FILESYSTEM_NOT_IN_VPC):
                logger.debug(TABLE_NAME, filesystem_name, ApplyResourceStatus.SKIPPED, f"{e.message}")
                continue
            raise e
        except Exception as e:
            logger.error(TABLE_NAME, filesystem_name_to_use, ApplyResourceStatus.FAILED_APPLY, str(e))
            return record_deltas, False

        if filesystem_with_name_already_present:
            logger.debug(TABLE_NAME, filesystem_name_to_use, ApplyResourceStatus.APPLIED, f"fileSystem with same name already exists. Onboarded the filesystem successfully with name {filesystem_name_to_use}")
        else:
            # No placeholders needed here, so plain string (was a redundant f-string).
            logger.debug(TABLE_NAME, filesystem_name_to_use, ApplyResourceStatus.APPLIED, "onboarded the filesystem successfully")
        record_deltas.append(
            MergedRecordDelta(
                original_record={},
                snapshot_record={filesystem_name: details[filesystem_name]},
                resolved_record={filesystem_name_to_use: details[filesystem_name]},
                action_performed=MergedRecordActionType.CREATE
            )
        )
    return record_deltas, True