in stacks/metrics_streamer.py [0:0]
def provision_business_metrics_producing_resources(self):
    """Provision the buckets, artifacts and Glue jobs for business metrics.

    Steps, in order:

    1. Create the versioned artifact bucket and Glue temp bucket.
    2. Stage build artifacts under ``cdk.out/glue``: download the JSON SerDe
       jar and zip the local ``dataquality``, ``definitions`` and
       ``accounts`` directories (paths are relative to the current working
       directory).
    3. Deploy the contents of ``glue/`` and ``cdk.out/glue/`` to the artifact
       bucket under the ``glue/`` key prefix.
    4. Create one ``GlueJobConstruct`` for every metric set in
       ``definition.metric_sets`` that contains at least one
       ``BusinessMetric``.

    Side effects: performs an HTTP download and writes files to the local
    filesystem every time it is called. Errors from the download or the
    filesystem are not caught here and propagate to the caller.
    """

    def _zip_tree(source_dir, archive_path):
        """Zip every file under ``source_dir`` into ``archive_path``, skipping ``__pycache__``."""
        # The context manager guarantees the archive is finalised and the
        # handle released even if walking or writing fails part-way through.
        with zipfile.ZipFile(archive_path, 'w') as archive:
            for root, _dirs, files in os.walk(source_dir):
                # The filter only depends on the directory, so test it once
                # per directory rather than once per file.
                if '__pycache__' in root:
                    continue
                for filename in files:
                    path = os.path.join(root, filename)
                    # Archive name equals the relative path, so the zip keeps
                    # the top-level package directory.
                    archive.write(path, path)

    # Artifact bucket
    # NOTE(review): 'atrifact' looks like a typo of 'artifact'. It is left
    # unchanged because it is the physical bucket name; confirm before renaming.
    artifact_bucket = aws_s3.Bucket(
        self,
        'artifact',
        bucket_name=f'data-governance-atrifact-{core.Aws.REGION}-{ACCOUNT_NUMBER}',
        versioned=True,
        removal_policy=core.RemovalPolicy.DESTROY,
        auto_delete_objects=True
    )
    # Glue temp bucket
    glue_temp_bucket = aws_s3.Bucket(
        self,
        'glue-temp',
        bucket_name=f'data-governance-glue-{core.Aws.REGION}-{ACCOUNT_NUMBER}',
        versioned=True,
        removal_policy=core.RemovalPolicy.DESTROY,
        auto_delete_objects=True
    )

    # Local staging directory for generated artifacts; exist_ok avoids the
    # check-then-create race of a separate os.path.exists() test.
    os.makedirs('cdk.out/glue', exist_ok=True)

    # JSON SerDe jar, passed to the Glue jobs through --extra-jars below.
    with urllib.request.urlopen('https://s3.us-west-2.amazonaws.com/crawler-public/json/serde/json-serde.jar') as f:
        jsonserde = f.read()
    with open('cdk.out/glue/json-serde.jar', 'wb') as output:
        output.write(jsonserde)

    # Python packages shipped to the Glue jobs through --extra-py-files below.
    _zip_tree('dataquality', 'cdk.out/glue/dataquality.zip')
    _zip_tree('definitions', 'cdk.out/glue/definitions.zip')
    _zip_tree('accounts', 'cdk.out/glue/accounts.zip')

    # Artifacts Deployment
    aws_s3_deployment.BucketDeployment(
        self, 's3DeployExample',
        sources=[
            aws_s3_deployment.Source.asset('glue/'),
            aws_s3_deployment.Source.asset('cdk.out/glue/')
        ],
        destination_bucket=artifact_bucket,
        destination_key_prefix='glue/'
    )

    artifact_uri = f's3://{artifact_bucket.bucket_name}/glue'
    for metric_set in definition.metric_sets:
        # One Glue job per metric set: a set qualifies as soon as it holds a
        # single BusinessMetric, and further metrics in the same set must not
        # create additional jobs (the construct id is derived from the set name).
        if not any(isinstance(metric, BusinessMetric) for metric in metric_set.metrics):
            continue
        GlueJobConstruct(
            self,
            f'data-gov-{metric_set.name}',
            artifact_bucket_name=artifact_bucket.bucket_name,
            glue_temp_bucket_name=glue_temp_bucket.bucket_name,
            script_key='glue/business_metrics.py',
            max_concurrent_runs=1,
            schedule=metric_set.schedule,
            arguments={
                "--extra-py-files": f'{artifact_uri}/definitions.zip,{artifact_uri}/dataquality.zip,{artifact_uri}/accounts.zip',
                "--extra-jars": f'{artifact_uri}/json-serde.jar',
                "--TempDir": f's3://{glue_temp_bucket.bucket_name}',
                "--account_number": ACCOUNT_NUMBER,
                "--metric_set_name": metric_set.name,
                "--enable-glue-datacatalog": ""
            }
        )