parquet_flask/cdms_lambda_func/audit_tool/execute_lambda.py (44 lines of code) (raw):
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from parquet_cli.audit_tool import audit
import os
import json
import logging
import traceback
from io import BytesIO
from datetime import datetime, timezone
import boto3
# logging.basicConfig(
# level=logging.INFO,
# format='%(asctime)s [%(levelname)s] [%(name)s::%(lineno)d] %(message)s'
# )
# Build process:
# 1: cd to incubator-sdap-in-situ-data-services/parquet_flask/cdms_lambda_func/audit_tool
# 2: mkdir package
# 3: rename incubator-sdap-in-situ-data-services/setup_lambda.py to incubator-sdap-in-situ-data-services/setup.py
# 4: pip install --target ./package ../../..
# 5: cd package
# 6: zip -r ../audit_lambda.zip .
# 7: Upload zip to lambda
# 8: Clean up
def execute_code(event, context):
state = None
s3 = boto3.client('s3')
if event:
if isinstance(event, str):
event = json.loads(event)
if 'State' in event:
buf = BytesIO()
s3.download_fileobj(event['State']['Bucket'], event['State']['Key'], buf)
buf.seek(0)
state = json.load(buf)
print('Loaded persisted state from S3', flush=True)
if state is None:
try:
buf = BytesIO()
s3.download_fileobj(os.getenv('OPENSEARCH_BUCKET'), 'AUDIT_STATE.json', buf)
buf.seek(0)
state = json.load(buf)
print('Loaded persisted state from S3', flush=True)
except:
print('Could not load state, starting from scratch')
print(traceback.print_exc())
state = {}
if 'lastListTime' not in state:
state['lastListTime'] = datetime(1, 1, 1, tzinfo=timezone.utc)
else:
state['lastListTime'] = datetime.strptime(state['lastListTime'], "%Y-%m-%dT%H:%M:%S%z")
print('INVOKING AUDIT TOOL', flush=True)
print('', flush=True)
audit(
'mock-s3',
os.getenv('SQS_URL'),
os.getenv('SNS_ARN'),
state=state,
lambda_ctx=context
)