in stacks/metrics_streamer.py [0:0]
def provision_metrics_streaming_resources(self):
    """Provision the metrics streaming AWS resources.

    For every entry in ``self.metric_frequencies`` a ``KinesisConstruct`` is
    created and its stream name and ARN are recorded under
    ``KINESIS_<FREQUENCY>_STREAM_NAME`` / ``KINESIS_<FREQUENCY>_STREAM_ARN``.
    The collected mapping is handed to a single ``MetricStreamerConstruct``,
    whose ``function`` attribute becomes the target of three scheduled event
    rules (day, hour, minute). Each rule passes ``{'frequency': <label>}`` as
    the event payload.
    """
    # The `!{...}` placeholders must reach the construct verbatim, so this is
    # a plain (non-f) string and is only ever concatenated, never formatted.
    hourly_partition = '/!{timestamp:yyyy}/!{timestamp:MM}/!{timestamp:dd}/!{timestamp:HH}/'

    stream_settings = {}
    for freq in self.metric_frequencies:
        streaming = KinesisConstruct(
            self,
            f'kinesis-streaming-resources-{freq}',
            bucket_arn=self.bucket_arn,
            prefix=f'metrics/{freq}/' + core.Aws.REGION + hourly_partition,
            errorOutputPrefix=(
                f'metric_errors/{freq}'
                + hourly_partition
                + '!{firehose:error-output-type}'
            ),
            database='data_governance',
            table=f'metrics_{freq}',
        )
        stream = streaming.kinesis_stream
        key_stem = freq.upper()
        stream_settings[f"KINESIS_{key_stem}_STREAM_NAME"] = stream.stream_name
        stream_settings[f"KINESIS_{key_stem}_STREAM_ARN"] = stream.stream_arn

    # One publisher is shared by all schedules; it receives every stream's
    # name and ARN through the mapping built above.
    publisher = MetricStreamerConstruct(
        self,
        'metrics_publishing_lambda',
        stream_settings,
    )

    # (rule id, schedule expression, frequency label sent in the event payload)
    schedules = (
        ('event_rule-day', 'cron(0 0 * * ? *)', 'day'),          # top of every day (midnight)
        ('event_rule-hour', 'cron(0 * * * ? *)', 'hour'),        # top of every hour
        ('event_rule-minute', 'cron(0/1 * * * ? *)', 'minute'),  # every minute
    )
    for rule_id, cron_expression, label in schedules:
        schedule = aws_events.Schedule.expression(cron_expression)
        target = aws_events_targets.LambdaFunction(
            handler=publisher.function,
            event=aws_events.RuleTargetInput.from_object({'frequency': label}),
        )
        aws_events.Rule(
            self,
            id=rule_id,
            schedule=schedule,
            targets=[target],
        )