# in redshift_benchmark/redshiftBenchmarkStack.py [0:0]
def __init__(self, scope: core.Construct, id: str
,dbname:str
,password:str
,username:str
,host:str
,port:int
#,cluster_id:str
,s3_bucket:str
#,tpcds_root_path:str # root path for tpcds for example s3://redshift-downloads/TPC-DS/2.13/3TB/
,rs_role_arn:str
#,num_runs:int
#,num_files:int
#,parallel_level:int
,**kwargs) -> None:
"""Provision a Glue-driven TPC-DS benchmark pipeline against an existing Redshift cluster.

Builds four Glue jobs (create tables, load tables, sequential query run,
concurrent query run) via helper methods defined elsewhere on this class,
then chains them into a single Glue workflow using conditional triggers.

Args:
    scope: Parent CDK construct.
    id: Construct id for this stack.
    dbname: Redshift database name the benchmark runs against.
    password: Redshift password (plaintext here; NOTE(review): consider
        Secrets Manager for production).
    username: Redshift user name.
    host: Redshift cluster endpoint host.
    port: Redshift port.
    s3_bucket: S3 bucket used by the benchmark jobs (scripts/results —
        exact use is in the helper methods; verify there).
    rs_role_arn: ARN of the IAM role Redshift assumes for COPY from S3.
    **kwargs: Forwarded to the base Stack constructor.
"""
super().__init__(scope, id, **kwargs)
# Deploy-time CloudFormation parameters so a single synthesized template
# can be reused with different dataset paths / concurrency / run counts.
tpcds_data_path = core.CfnParameter(self, "tpcdsDataPath", type="String",default="s3://redshift-downloads/TPC-DS/2.13/3TB/",description="S3 path root of TPC-DS dataset")
parallel_level = core.CfnParameter(self, "parallel", type="Number",default=10,description="Number of concurrent queries submitted at the same time")
num_runs = core.CfnParameter(self, "numRuns", type="Number",default=2,description="Total runs of the test")
num_files = core.CfnParameter(self, "numFiles", type="Number",default=99,description="Total number of files under tpcds_queries directory")
# Create a role for the Glue jobs. In production this should be passed in
# (or created at initiation) rather than built inline.
# NOTE(review): this statement grants admin-level wildcard actions
# ("iam:*", "s3:*", "ec2:*", ...) on all resources — acceptable for a
# throwaway benchmark demo, but it must be scoped to least privilege
# before any production use.
policy_statement = iam.PolicyStatement(
actions=["logs:*","s3:*","ec2:*","iam:*","cloudwatch:*","dynamodb:*","glue:*","redshift:*","redshift-data:*"]
)
policy_statement.add_all_resources()
self.glue_job_role = iam.Role(
self,
"Glue-Job-Role-Demo",
assumed_by=iam.ServicePrincipal("glue.amazonaws.com")
)
self.glue_job_role.add_to_policy(policy_statement)
# Stash connection/config values on self — presumably consumed by the
# rs_sql_task / run_benchmark_query helpers defined elsewhere in this
# class (verify against those methods).
self.dbname=dbname
self.username=username
#self.cluster_id=cluster_id
self.s3_bucket=s3_bucket
self.password=password
self.host=host
self.port=port
# CfnParameter values are deploy-time tokens, not concrete numbers at synth time.
self.num_runs=num_runs.value_as_number
self.parallel_level=parallel_level.value_as_number
self.num_files=num_files.value_as_number
############################### Create Glue jobs#############################################
# Helpers return Cfn-level Glue job resources (they expose .name and are
# accepted by add_depends_on below); definitions live elsewhere in this class.
ddl_task = self.rs_sql_task("tpcds-benchmark-create-tables", "01-create-tables.sql")
load_task = self.rs_sql_task("tpcds-benchmark-load-tables","02-load-tables.sql"
,parameters={'tpcds_root_path':tpcds_data_path.value_as_string,'role_arn':rs_role_arn})
sequential_query_task = self.run_benchmark_query("tpcds-benchmark-sequential-report","sequential")
concurrent_query_task = self.run_benchmark_query("tpcds-benchmark-concurrent-report","concurrent")
################################## Create workflow #########################################
redshift_benchmark=glue.CfnWorkflow(self, "redshift-benchmark-pipeline"
, description="Use TPCDS benchmark Redshift"
, name="redshift-benchmark"
, tags={"project":"redshift-benchmark"})
############### Define pipeline dag by creating trigger and add to workflow#################
# Entry point: manually fired (ON_DEMAND) trigger that kicks off the DDL job.
start=glue.CfnTrigger(self, "start-trigger"
, actions=[glue.CfnTrigger.ActionProperty(job_name=ddl_task.name)]
, type="ON_DEMAND" # should be scheduled
, description="Start the benchmark process"
, name="start-redshift-benchmark"
, tags={"project":"redshift-benchmark"}
#, schedule=cron(15 12 * * ? *)
, workflow_name=redshift_benchmark.name)
# Fire the data-load job only after the DDL job succeeds. With a single
# condition the "ANY" logical operator is effectively a no-op.
# start_on_creation=True arms the conditional trigger immediately.
load_data=glue.CfnTrigger(self, "load-trigger"
, actions=[glue.CfnTrigger.ActionProperty(job_name=load_task.name)]
, type="CONDITIONAL"
, description="Copy data to Redshift table from S3"
, name="load-trigger"
, start_on_creation=True
, predicate=
{
"conditions": [
{
"logicalOperator": "EQUALS",
"jobName": ddl_task.name,
"state": "SUCCEEDED",
}
],
"logical": "ANY",
}
, tags={"project":"redshift-benchmark"}
, workflow_name=redshift_benchmark.name)
# Sequential query run starts once the load job succeeds.
sequential = glue.CfnTrigger(self, "sequential-trigger"
, actions=[glue.CfnTrigger.ActionProperty(job_name=sequential_query_task.name)]
, type="CONDITIONAL"
, description="Perform sequantial run of queries"
, name="sequential-trigger"
, start_on_creation=True
, predicate=
{
"conditions": [
{
"logicalOperator": "EQUALS",
"jobName": load_task.name,
"state": "SUCCEEDED",
}
],
"logical": "ANY",
}
, tags={"project":"redshift-benchmark"}
, workflow_name=redshift_benchmark.name)
# Concurrent query run starts once the sequential run succeeds.
concurrent=glue.CfnTrigger(self, "concurrent-trigger"
, actions=[glue.CfnTrigger.ActionProperty(job_name=concurrent_query_task.name)]
, type="CONDITIONAL"
, description="Perform concurrent run of queries"
, name="concurrent-trigger"
, start_on_creation=True
, predicate=
{
"conditions": [
{
"logicalOperator": "EQUALS",
"jobName": sequential_query_task.name,
"state": "SUCCEEDED",
}
],
"logical": "ANY",
}
, tags={"project":"redshift-benchmark"}
, workflow_name=redshift_benchmark.name)
# Explicit CloudFormation dependencies: CloudFormation creates resources in
# parallel ("simultaneously"), but each trigger references Glue job names,
# so the referenced job resources must exist before the trigger is created.
start.add_depends_on(ddl_task)
load_data.add_depends_on(load_task)
load_data.add_depends_on(ddl_task)
sequential.add_depends_on(load_task)
sequential.add_depends_on(sequential_query_task)
concurrent.add_depends_on(sequential_query_task)
concurrent.add_depends_on(concurrent_query_task)