def lambda_handler()

in hasher-matcher-actioner/hmalib/lambdas/hashing.py


def lambda_handler(event, context):
    """
    SQS Events generated by the submissions API or by files being added to S3.
    Downloads files to temp-storage, identifies content_type and generates
    allowed signal_types from it.

    Saves hash output to DynamoDB, sends a message on an output queue.

    Note that this brings the contents of a file into memory. This is subject to
    the resource limitation on the lambda. Potentially extendable until 10GB, but
    that would be super-expensive. [1]

    [1]: https://docs.aws.amazon.com/lambda/latest/dg/configuration-console.html
    """
    records_table = get_dynamodb().Table(DYNAMODB_TABLE)
    banks_table = BanksTable(get_dynamodb().Table(BANKS_TABLE))
    sqs_client = get_sqs_client()

    for sqs_record in event["Records"]:
        message = json.loads(sqs_record["body"])

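        # S3 publishes a one-time s3:TestEvent when bucket notifications are
        # first configured; it carries no media, so skip it.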
        if message.get("Event") == "s3:TestEvent":
            continue

        media_to_process: t.List[
            t.Union[S3ImageSubmission, URLSubmissionMessage, BankSubmissionMessage]
        ] = []

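        # Dispatch on the message shape: a single URL submission, a batch of
        # S3 image submissions, or a bank submission.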
        if URLSubmissionMessage.could_be(message):
            media_to_process.append(URLSubmissionMessage.from_sqs_message(message))
        elif S3ImageSubmissionBatchMessage.could_be(message):
            # S3 submissions can only be images for now.
            media_to_process.extend(
                S3ImageSubmissionBatchMessage.from_sqs_message(
                    message, image_prefix=IMAGE_PREFIX
                ).image_submissions
            )
        elif BankSubmissionMessage.could_be(message):
            media_to_process.append(BankSubmissionMessage.from_sqs_message(message))
        else:
            logger.warning(f"Unprocessable message: {message}")

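        # Download and hash every media item extracted from this record.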
        for media in media_to_process:
            if not hasher.supports(media.content_type):
                if isinstance(media, BankSubmissionMessage):
                    object_id = media.bank_id
                else:
                    object_id = media.content_id
                logger.warning(
                    f"Unprocessable content type: {media.content_type}, id: {object_id}"
                )
                continue

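            # Fetch the raw bytes, timing the download for metrics.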
            with metrics.timer(metrics.names.hasher.download_file):
                try:
                    if hasattr(media, "key") and hasattr(media, "bucket"):
                        # Classic duck-typing. If it has key and bucket, must be an
                        # S3 submission.
                        media = t.cast(S3ImageSubmission, media)
                        bytes_: bytes = S3BucketContentSource(
                            media.bucket, IMAGE_PREFIX
                        ).get_bytes(media.content_id)
                    else:
                        media = t.cast(URLSubmissionMessage, media)
                        bytes_ = URLContentSource().get_bytes(media.url)
                except Exception:
                    if isinstance(media, BankSubmissionMessage):
                        object_id = media.bank_id
                    else:
                        object_id = media.content_id
                    logger.exception(
                        f"Encountered exception while trying to get_bytes for id: {object_id}. Unable to hash content."
                    )
                    continue

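            # A single media item can yield several signals, one per signal
            # type that supports this content type.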
            for signal in hasher.get_hashes(media.content_type, bytes_):
                if isinstance(media, BankSubmissionMessage):
                    # route signals to bank datastore only.
                    bank_operations.add_bank_member_signal(
                        banks_table=banks_table,
                        bank_id=media.bank_id,
                        bank_member_id=media.bank_member_id,
                        signal_type=signal.signal_type,
                        signal_value=signal.signal_value,
                    )
                    # don't write hash records etc.
                    continue

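                # Regular submissions get a durable hash record plus a message
                # on the output queue for downstream matching.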
                hash_record = PipelineHashRecord(
                    content_id=media.content_id,
                    signal_type=signal.signal_type,
                    content_hash=signal.signal_value,
                    updated_at=datetime.datetime.now(),
                )

                hasher.write_hash_record(records_table, hash_record)
                hasher.publish_hash_message(sqs_client, hash_record)

    metrics.flush()
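
For local experimentation, the handler can be invoked directly with a
hand-built SQS event. A minimal sketch follows; the message body fields are
hypothetical placeholders and must match whatever
URLSubmissionMessage.from_sqs_message actually expects, and the module's
environment (table names, queue URL, IMAGE_PREFIX) must be configured before
import.

import json

from hmalib.lambdas.hashing import lambda_handler

# Hypothetical URL-submission body. Real field names are defined by
# URLSubmissionMessage and may differ from what is shown here.
body = {
    "content_id": "example-content-1",
    "content_type": "photo",
    "url": "https://example.com/image.jpg",
}

# An SQS-triggered lambda receives records with the JSON message under "body".
event = {"Records": [{"body": json.dumps(body)}]}

# The context object is not used by this handler; None suffices locally.
lambda_handler(event, None)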