def lambda_handler()

in workshops/RI2021/ml_ops/lambdas/anomaly-alert-function/anomaly-alert-function.py [0:0]


def lambda_handler(event, context):
    
    #Function to format the date given by the event
    def datetime_from_string(s):
        try:
            dt = datetime.datetime.fromisoformat(s.split("[")[0])
        except ValueError:
            dt = datetime.datetime.strptime(s.split("[")[0], "%Y-%m-%dT%H:%MZ")
        return dt
    #Function to update the metricValue_AnomalyScore csv in the case that one already exists
    def update_Anomaly_CSV(event,key,bucket,obj,response):
        print('object exist')
        #Reading the existing file
        original_df = pd.read_csv(obj.get("Body"), index_col=False)
        file2 = original_df.to_dict('list')

        #getting the needed data
        metricList = response['MetricList']
        dimensionList = response['DimensionList']
        metricName = event['impactedMetric']['metricName']
 
        #Column names generator
        data2={}
        data2['key']=[]
        data2['Timestamp'] =[]
        for i in dimensionList:
            data2[i]=[]
        #    data2[i]=[]
        for i in metricList:
            data2[i['MetricName']+'AnomalyMetricValue']=[]
            data2[i['MetricName']+'GroupScore']=[]
    
        #Data collection from the event for the CSV
        for i in event['impactedMetric']['relevantTimeSeries']:
            for a in i['dimensions']:
                data2[a['dimensionName']].append(a['dimensionValue'])
            data2[metricName+'AnomalyMetricValue'].append(i['metricValue'])
            data2[metricName+'GroupScore'].append(event['anomalyScore'])
            data2['Timestamp'].append(start_time)
            
        nRow=len(data2['Timestamp'])
        nDimension = len(dimensionList)
        
        #key generator
        i=0
        while i<nRow:
            value=''
            for a in dimensionList:
                value+=str(data2[a][i])
            value= str(data2['Timestamp'][i])+value
            data2['key'].append(value)
            i=i+1
        c=0
        #Checking if the data is  already in the original file and ammend the empty spaces and add the data  
        for n in data2['key']:
            
            if n in file2['key']:
                where=file2['key'].index(n)
                file2[metricName+'AnomalyMetricValue'][where] = data2[metricName+'AnomalyMetricValue'][c]
                file2[metricName+'GroupScore'][where] =data2[metricName+'GroupScore'][c]
            else:
                file2['key'].append(data2['key'][c])
                for i in dimensionList:
                    file2[i].append(data2[i][c])
                
                file2[metricName+'AnomalyMetricValue'].append(data2[metricName+'AnomalyMetricValue'][c])
                file2[metricName+'GroupScore'].append(data2[metricName+'GroupScore'][c])
                file2['Timestamp'].append(dateTime)
            c+=1
            
        df = pd.DataFrame.from_dict(data=file2, orient='index')
        df2 = df.transpose()
        with io.StringIO() as filename:
            df2.to_csv(filename, index=False, encoding='utf-8', date_format='%Y-%m-%d %H:%M:%S')
            response = s3.put_object(
                Bucket=bucket, Key=key, Body=filename.getvalue()
            )
        print('updated Anomaly csv saved')
    #If the metricValue_AnomalyScore file does not exist it will create one
    def generate_Anomaly_CSV(event,key,bucket,response):
        #getting the needed data
        metricList = response['MetricList']
        dimensionList = response['DimensionList']
        metricName = event['impactedMetric']['metricName']
        pd.options.mode.use_inf_as_na = True
        
        #Column names generator
        data2={}
        data2['key']=[]
        data2['Timestamp'] =[]
        for i in dimensionList:
            data2[i]=[]
            data2[i]=[]
        for i in metricList:
            data2[i['MetricName']+'AnomalyMetricValue']=[]
            data2[i['MetricName']+'GroupScore']=[]
    
        #Data collection for the CSV
        for i in event['impactedMetric']['relevantTimeSeries']:
            for a in i['dimensions']:
                data2[a['dimensionName']].append(a['dimensionValue'])
            data2[metricName+'AnomalyMetricValue'].append(i['metricValue'])
            data2[metricName+'GroupScore'].append(event['anomalyScore'])
            data2['Timestamp'].append(start_time)
        nRow=len(data2['Timestamp'])
        #key generator
        i=0
        while i<nRow:
            value=''
            for a in dimensionList:
                value+=str(data2[a][i])
            value= str(data2['Timestamp'][i])+value
            data2['key'].append(value)
            i+=1

        df = pd.DataFrame.from_dict(data=data2, orient='index')
        df2 = df.transpose()

        with io.StringIO() as filename:
            df2.to_csv(filename, index=False, encoding='utf-8', date_format='%Y-%m-%d %H:%M:%S')
            response = s3.put_object(
                Bucket=bucket, Key=key, Body=filename.getvalue()
            )
        print('Anomaly csv saved in', key)
    #Checks if the metricValue_AnomalyScore file already exists
    def Anomaly_CSV_Check(event,key,bucket,response):
        try:
            obj = s3.get_object(Bucket=bucket,Key=key) 
        except botocore.exceptions.ClientError as e:
            if e.response['Error']['Code']=='404' or e.response['Error']['Code']=='NoSuchKey':
                print('the Anomaly csv file does not exist and we will generate the very first file now')
                generate_Anomaly_CSV(event,key,bucket,response)
            else:
                print('something else happened')
                print('error is', e.response)
                raise
        else:
            update_Anomaly_CSV(event,key,bucket,obj,response)
    #Updates the dimensionContributions csv file if it exists
    def update_Dimension_CSV(event,key,obj,bucket):
        print('object exist')
        original_df = pd.read_csv(obj.get("Body"), index_col=False)
        file = original_df.to_dict('list')
        #Column Titles generation
        data = {}
        data ['Timestamp'] =[]
        data['metricName'] =[]
        data['dimensionName'] =[]
        data['dimensionValue'] =[]
        data['valueContribution'] =[]
        
        #Data collection for the CSV
        for i in event['impactedMetric']['dimensionContribution']:
            for a in i['dimensionValueContributions']:
                data['Timestamp'].append(start_time)
                data['dimensionName'].append(i['dimensionName'])
                data['dimensionValue'].append(a['dimensionValue'])
                data['valueContribution'].append(a['valueContribution'])
                data['metricName'].append(event['impactedMetric']['metricName'])
          
        df=pd.DataFrame(data=data)
        df2 = pd.DataFrame(data=file)
        result = pd.concat([df2, df])

        with io.StringIO() as filename:
            result.to_csv(filename, index=False, encoding='utf-8', date_format='%Y-%m-%d %H:%M:%S')
            response = s3.put_object(
                Bucket=bucket, Key=key, Body=filename.getvalue()
            )
        print('updated Dimension csv saved')
    
    #Generates the dimensionContributions csv file     
    def generate_Dimension_CSV(event,key,bucket):
        #Column Titles generator
        data = {}
        data ['Timestamp'] =[]
        data['metricName'] =[]
        data['dimensionName'] =[]
        data['dimensionValue'] =[]
        data['valueContribution'] =[]
        
        #Data collection for the CSV
        for i in event['impactedMetric']['dimensionContribution']:
            for a in i['dimensionValueContributions']:
                data['Timestamp'].append(start_time)
                data['dimensionName'].append(i['dimensionName'])
                data['dimensionValue'].append(a['dimensionValue'])
                data['valueContribution'].append(a['valueContribution'])
                data['metricName'].append(event['impactedMetric']['metricName'])
          
        df=pd.DataFrame(data=data)
        print('the dimension first csv file is', )
        #CSV generation and upload to S3
        with io.StringIO() as filename:
            df.to_csv(filename, index=False, date_format='%Y-%m-%d %H:%M:%S')
            response = s3.put_object(
                Bucket=bucket, Key=key, Body=filename.getvalue()
            )
        print('the Dimension CSV has been saved in', key)
    #Checks if the dimensionContributions csv file already exists
    def Dimension_CSV_Check(event,key,bucket):
        try:
           obj = s3.get_object(Bucket=bucket,Key=key) 
        except botocore.exceptions.ClientError as e:
            if e.response['Error']['Code']=='404' or e.response['Error']['Code']=='NoSuchKey':
                print('the Dimension csv file does not exist and we will generate the very first file now')
                generate_Dimension_CSV(event,key,bucket)
            else:
                print('something else happened')
                print('error is', e.response)
                raise
        else:
            update_Dimension_CSV(event,key,obj,bucket)
    
    start_time = datetime_from_string(event["timestamp"] )
    dateTime = str(start_time)
    splitdate = dateTime.split()
    
    #Initial parameters. Here you write the bucket and the ARN from the dataset of the detector (do not change the key1 and key2)
    #Write the bucket where you want the results to be located. THE BUCKET HAS TO ALREADY EXIST
    bucket = os.getenv('S3_BUCKET')
    DataSet_ARN = os.getenv('METRIC_SET_ARN')
    
    key1 = 'anomalyResults/metricValue_AnomalyScore/'+splitdate[0]+'_'+splitdate[1]+'_metricValue_AnomalyScore.csv'
    key2 = 'anomalyResults/dimensionContributions/'+splitdate[0]+'_'+splitdate[1]+'_dimensionContributions.csv'
    
    response = lookoutmetrics_client.describe_metric_set(MetricSetArn=DataSet_ARN)
    Anomaly_CSV_Check(event,key1,bucket,response)
    Dimension_CSV_Check(event,key2,bucket)