usecases/image-processing/dags/2.0/image_processing.py [34:163]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
args = {
    'owner': 'airflow',
}
def face_detection(ds, **kwargs): 
    client = boto3.client('rekognition')
    arg = kwargs['dag_run'].conf
    print(arg)
    print(kwargs['dag_run'])
    try:
        response = client.detect_faces(
            Image={
                'S3Object': {
                    'Bucket': arg['s3Bucket'],
                    'Name': arg['s3Key'],
                }
            },
            Attributes=['ALL']
        )
        print(response)
        if len(response['FaceDetails']) != 1:
            return "photo_not_meet_requirement"
        if response['FaceDetails'][0]['Sunglasses']['Value']:
            return "photo_not_meet_requirement"

        kwargs['ti'].xcom_push(key="FaceDetails", value=response['FaceDetails'][0])
        return "check_duplicate"
    except Exception as e:
        print(e)
        return "failure"

def check_duplicate(ds, **kwargs): 

    client = boto3.client('rekognition')
    arg = kwargs['dag_run'].conf
    try:
        response = client.search_faces_by_image(
        
            CollectionId=arg['RekognitionCollectionId'],
            Image={
                "S3Object": {
                    "Bucket": arg['s3Bucket'],
                    "Name": arg['s3Key']
                }
            },
            FaceMatchThreshold=70.0,
            MaxFaces=3
            
        )
        print(response)
        if len(response['FaceMatches']) > 0: #Face already exist
            return "duplicate_face"
        return "parallel_processing"
        # kwargs['ti'].xcom_push(key="FaceDetails", value=response.FaceDetails[0])
    except  Exception as e: 
        print(e)
        return "failure"

def create_thumbnail(ds, **kwargs): 

    hook = AwsLambdaHook('LAMBDA_FN_NAME', #LAMBDA_FN_NAME
                            log_type='None',qualifier='$LATEST',
                            invocation_type='RequestResponse',
                            config=None,aws_conn_id='aws_default')

    response_1 = hook.invoke_lambda(payload=json.dumps(kwargs['dag_run'].conf))
    payload = json.loads(response_1['Payload'].read().decode())
    kwargs['ti'].xcom_push(key="ThumbnailDetails", value=payload)

def add_face_index(ds, **kwargs): 
    client = boto3.client('rekognition')
    arg = kwargs['dag_run'].conf

    
    response = client.index_faces(
        CollectionId=arg['RekognitionCollectionId'],
        DetectionAttributes=['ALL'],
        ExternalImageId=arg['userId'],
        Image={
            "S3Object": {
                "Bucket": arg['s3Bucket'],
                "Name": arg['s3Key']
            }
        }

    )
    print(response['FaceRecords'][0])
    kwargs['ti'].xcom_push(key="FaceIndexDetails", value=response['FaceRecords'][0]['Face'])


def persist_data( **kwargs): 
    hook = AwsDynamoDBHook(table_name="TABLE_NAME", #TABLE_NAME
                            aws_conn_id='aws_default')
    faceIndexDetails = kwargs['ti'].xcom_pull(key='FaceIndexDetails')
    thumbnailDetails = kwargs['ti'].xcom_pull(key='ThumbnailDetails')
    conf = kwargs['dag_run'].conf
    dynamoItem = {
        "UserId" : conf["userId"],
        "s3Bucket" : conf["s3Bucket"],
        "s3Key": conf["s3Key"],
        "faceId" :faceIndexDetails['FaceId'],
        "thumbnail": thumbnailDetails['thumbnail']    
    }
    items = [dynamoItem]
    hook.write_batch_data(items)


dag_args = {
    'owner': 'simple airflow',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1)}
dag =  DAG(
    dag_id='image_processing',
    start_date=days_ago(2),
    default_args=dag_args,
    end_date=None,
    schedule_interval=None,
    # schedule_interval='0 9 * * *',
    tags=['lambda','imageprocessing'])

# arg = json.dumps(kwargs['dag_run'].conf
# print(arg)
face_detection = BranchPythonOperator(
    depends_on_past=False,
    task_id='face_detection',
    python_callable=face_detection,
    provide_context=True,
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



usecases/image-processing/dags/image_processing.py [34:163]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
args = {
    'owner': 'airflow',
}
def face_detection(ds, **kwargs): 
    client = boto3.client('rekognition')
    arg = kwargs['dag_run'].conf
    print(arg)
    print(kwargs['dag_run'])
    try:
        response = client.detect_faces(
            Image={
                'S3Object': {
                    'Bucket': arg['s3Bucket'],
                    'Name': arg['s3Key'],
                }
            },
            Attributes=['ALL']
        )
        print(response)
        if len(response['FaceDetails']) != 1:
            return "photo_not_meet_requirement"
        if response['FaceDetails'][0]['Sunglasses']['Value']:
            return "photo_not_meet_requirement"

        kwargs['ti'].xcom_push(key="FaceDetails", value=response['FaceDetails'][0])
        return "check_duplicate"
    except Exception as e:
        print(e)
        return "failure"

def check_duplicate(ds, **kwargs): 

    client = boto3.client('rekognition')
    arg = kwargs['dag_run'].conf
    try:
        response = client.search_faces_by_image(
        
            CollectionId=arg['RekognitionCollectionId'],
            Image={
                "S3Object": {
                    "Bucket": arg['s3Bucket'],
                    "Name": arg['s3Key']
                }
            },
            FaceMatchThreshold=70.0,
            MaxFaces=3
            
        )
        print(response)
        if len(response['FaceMatches']) > 0: #Face already exist
            return "duplicate_face"
        return "parallel_processing"
        # kwargs['ti'].xcom_push(key="FaceDetails", value=response.FaceDetails[0])
    except  Exception as e: 
        print(e)
        return "failure"

def create_thumbnail(ds, **kwargs): 

    hook = AwsLambdaHook('LAMBDA_FN_NAME', #LAMBDA_FN_NAME
                            log_type='None',qualifier='$LATEST',
                            invocation_type='RequestResponse',
                            config=None,aws_conn_id='aws_default')

    response_1 = hook.invoke_lambda(payload=json.dumps(kwargs['dag_run'].conf))
    payload = json.loads(response_1['Payload'].read().decode())
    kwargs['ti'].xcom_push(key="ThumbnailDetails", value=payload)

def add_face_index(ds, **kwargs): 
    client = boto3.client('rekognition')
    arg = kwargs['dag_run'].conf

    
    response = client.index_faces(
        CollectionId=arg['RekognitionCollectionId'],
        DetectionAttributes=['ALL'],
        ExternalImageId=arg['userId'],
        Image={
            "S3Object": {
                "Bucket": arg['s3Bucket'],
                "Name": arg['s3Key']
            }
        }

    )
    print(response['FaceRecords'][0])
    kwargs['ti'].xcom_push(key="FaceIndexDetails", value=response['FaceRecords'][0]['Face'])


def persist_data( **kwargs): 
    hook = AwsDynamoDBHook(table_name="TABLE_NAME", #TABLE_NAME
                            aws_conn_id='aws_default')
    faceIndexDetails = kwargs['ti'].xcom_pull(key='FaceIndexDetails')
    thumbnailDetails = kwargs['ti'].xcom_pull(key='ThumbnailDetails')
    conf = kwargs['dag_run'].conf
    dynamoItem = {
        "UserId" : conf["userId"],
        "s3Bucket" : conf["s3Bucket"],
        "s3Key": conf["s3Key"],
        "faceId" :faceIndexDetails['FaceId'],
        "thumbnail": thumbnailDetails['thumbnail']    
    }
    items = [dynamoItem]
    hook.write_batch_data(items)


dag_args = {
    'owner': 'simple airflow',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1)}
dag =  DAG(
    dag_id='image_processing',
    start_date=days_ago(2),
    default_args=dag_args,
    end_date=None,
    schedule_interval=None,
    # schedule_interval='0 9 * * *',
    tags=['lambda','imageprocessing'])

# arg = json.dumps(kwargs['dag_run'].conf
# print(arg)
face_detection = BranchPythonOperator(
    depends_on_past=False,
    task_id='face_detection',
    python_callable=face_detection,
    provide_context=True,
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



