def lambda_handler()

in DataConnectors/AWS-S3/CloudWatchLambdaFunction_V2.py [0:0]


def lambda_handler(event, context):
    """
    The function gets data from cloud watch and put it in the desired bucket in the required format for Sentinel.
    :param event: object that contains information about the current state of the execution environment.
    :param context: object that contains information about the current execution context.
    """
    unix_start_time = int(time.mktime(START_TIME_UTC.timetuple()))*1000
    unix_end_time = int(time.mktime(END_TIME_UTC.timetuple()))*1000
    log_groups = None
    log_streams = None
    log_Group_Name = ""
    log_Stream_Name = ""
    log_groups_streams_arry = []
    try:
        # List log groups
        try:
            log_groups = logs.describe_log_groups()                               
        except Exception as e:            
            print(f"An error occurred in list log groups: {e}")

        if log_groups and 'logGroups' in log_groups:
            for log_group in log_groups['logGroups']:
                log_Group_Name = log_group['logGroupName']            
                try:
                    # List log streams for each log group
                    log_streams = logs.describe_log_streams(logGroupName=log_Group_Name)                
                except Exception as e:                
                    print(f"An error occurred at logs.describe_log_streams: {e}")

                for log_stream in log_streams['logStreams']:
                    log_Stream_Name = log_stream['logStreamName']
                    logGroup_logStream_entry = {
                        "logGroupName": log_Group_Name,
                        "logStreamName": log_Stream_Name
                    }
                    log_groups_streams_arry.append(logGroup_logStream_entry) 
                    
            # Iterate through log_groups_dict
            for key in log_groups_streams_arry:                            
                try:
                    response = logs.get_log_events(
                        logGroupName = key["logGroupName"],
                        logStreamName = key["logStreamName"],
                        startTime=unix_start_time,
                        endTime=unix_end_time,
                    )
                except Exception as e:                
                    print(f"An error occurred at get_log_events: {e}")
                
                # Check if the response contains log events
                if 'events' in response:
                    log_events = response['events']
                    if log_events:
                        # Convert events to json object
                        json_string = json.dumps(log_events)
                        json_object = json.loads(json_string)

                        df = pd.DataFrame(json_object)
                        if not df.empty:
                            # Convert unix time to zulu time for example from 1671086934783 to 2022-12-15T06:48:54.783Z
                            df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
                            df['timestamp'] = df['timestamp'].dt.strftime('%Y-%m-%dT%H:%M:%S.%f').str[:-3]+'Z'

                            fileName = key["logStreamName"]
                            # Find the index of the closing square bracket
                            closing_bracket_index = fileName.find(']')
                            # Check if the closing square bracket is present
                            if closing_bracket_index != -1:
                                # Extract the substring after the closing square bracket
                                output_File_Name = fileName[closing_bracket_index + 1:]
                            else:
                                # Generate a random UUID
                                random_uuid = uuid.uuid4()
                                # Convert UUID to a string and remove dashes
                                output_File_Name = str(random_uuid).replace('-', '')                        
                            
                            # Check if "ingestionTime" column exists in the DataFrame
                            if "ingestionTime" in df.columns:
                                # If the column exists, drop it
                                df.drop(columns=["ingestionTime"], inplace=True)
                                fileToS3 = df
                                try:                
                                    # Export data to temporary file in the right format, which will be deleted as soon as the session ends
                                    fileToS3.to_csv( f'/tmp/{output_File_Name}.gz', index=False, header=False, compression='gzip', sep = ' ', escapechar=' ',  doublequote=False, quoting=csv.QUOTE_NONE)
                                
                                    # Upload data to desired folder in bucket
                                    s3.Bucket(BUCKET_NAME).upload_file(f'/tmp/{output_File_Name}.gz', f'{BUCKET_PREFIX}{output_File_Name}.gz')
                                except Exception as e:                
                                    print("Error exporting to S3 %s %s: %s" % (key["logGroupName"], key["logStreamName"], getattr(e, 'message', repr(e))))
                            else:
                                print(f"ingestionTime column missing in the DataFrame")        
                        else:
                            print(f"No events for specified time - startTime:%s, endTime:%s, logGroupName:%s, logStreamName:%s" % (unix_start_time, unix_end_time, key["logGroupName"], key["logStreamName"])) 
                    else:
                        print(f"No events for specified time - startTime:%s, endTime:%s, logGroupName:%s, logStreamName:%s" % (unix_start_time, unix_end_time, key["logGroupName"], key["logStreamName"]))                     
                else:
                    print(f"No log events for specified time - startTime:%s, endTime:%s, logGroupName:%s, logStreamName:%s" % (unix_start_time, unix_end_time, key["logGroupName"], key["logStreamName"])) 
    except Exception as e:
        print("Error exporting %s: %s" % (log_Group_Name, getattr(e, 'message', repr(e))))