in providers/amazon/src/airflow/providers/amazon/aws/operators/redshift_cluster.py [0:0]
def execute(self, context: Context):
redshift_hook = RedshiftHook(aws_conn_id=self.aws_conn_id)
self.log.info("Creating Redshift cluster %s", self.cluster_identifier)
params: dict[str, Any] = {}
if self.db_name:
params["DBName"] = self.db_name
if self.cluster_type:
params["ClusterType"] = self.cluster_type
if self.cluster_type == "multi-node":
params["NumberOfNodes"] = self.number_of_nodes
if self.cluster_security_groups:
params["ClusterSecurityGroups"] = self.cluster_security_groups
if self.vpc_security_group_ids:
params["VpcSecurityGroupIds"] = self.vpc_security_group_ids
if self.cluster_subnet_group_name:
params["ClusterSubnetGroupName"] = self.cluster_subnet_group_name
if self.availability_zone:
params["AvailabilityZone"] = self.availability_zone
if self.preferred_maintenance_window:
params["PreferredMaintenanceWindow"] = self.preferred_maintenance_window
if self.cluster_parameter_group_name:
params["ClusterParameterGroupName"] = self.cluster_parameter_group_name
if self.automated_snapshot_retention_period:
params["AutomatedSnapshotRetentionPeriod"] = self.automated_snapshot_retention_period
if self.manual_snapshot_retention_period:
params["ManualSnapshotRetentionPeriod"] = self.manual_snapshot_retention_period
if self.port:
params["Port"] = self.port
if self.cluster_version:
params["ClusterVersion"] = self.cluster_version
if self.allow_version_upgrade:
params["AllowVersionUpgrade"] = self.allow_version_upgrade
if self.encrypted:
params["Encrypted"] = self.encrypted
if self.hsm_client_certificate_identifier:
params["HsmClientCertificateIdentifier"] = self.hsm_client_certificate_identifier
if self.hsm_configuration_identifier:
params["HsmConfigurationIdentifier"] = self.hsm_configuration_identifier
if self.elastic_ip:
params["ElasticIp"] = self.elastic_ip
if self.tags:
params["Tags"] = self.tags
if self.kms_key_id:
params["KmsKeyId"] = self.kms_key_id
if self.enhanced_vpc_routing:
params["EnhancedVpcRouting"] = self.enhanced_vpc_routing
if self.additional_info:
params["AdditionalInfo"] = self.additional_info
if self.iam_roles:
params["IamRoles"] = self.iam_roles
if self.maintenance_track_name:
params["MaintenanceTrackName"] = self.maintenance_track_name
if self.snapshot_schedule_identifier:
params["SnapshotScheduleIdentifier"] = self.snapshot_schedule_identifier
if self.availability_zone_relocation:
params["AvailabilityZoneRelocation"] = self.availability_zone_relocation
if self.aqua_configuration_status:
params["AquaConfigurationStatus"] = self.aqua_configuration_status
if self.default_iam_role_arn:
params["DefaultIamRoleArn"] = self.default_iam_role_arn
# PubliclyAccessible is True by default on Redshift side, hence, we should always set it regardless
# of its value
params["PubliclyAccessible"] = self.publicly_accessible
cluster = redshift_hook.create_cluster(
self.cluster_identifier,
self.node_type,
self.master_username,
self.master_user_password,
params,
)
if self.deferrable:
self.defer(
trigger=RedshiftCreateClusterTrigger(
cluster_identifier=self.cluster_identifier,
waiter_delay=self.poll_interval,
waiter_max_attempts=self.max_attempt,
aws_conn_id=self.aws_conn_id,
),
method_name="execute_complete",
)
if self.wait_for_completion:
redshift_hook.get_conn().get_waiter("cluster_available").wait(
ClusterIdentifier=self.cluster_identifier,
WaiterConfig={
"Delay": self.poll_interval,
"MaxAttempts": self.max_attempt,
},
)
self.log.info("Created Redshift cluster %s", self.cluster_identifier)
self.log.info(cluster)