in hasher-matcher-actioner/hmalib/lambdas/hashing.py
def lambda_handler(event, context):
    """
    SQS Events generated by the submissions API or by files being added to S3.
    Downloads files to temp-storage, identifies content_type and generates
    allowed signal_types from it.

    Saves hash output to DynamoDB, sends a message on an output queue.

    Note that this brings the contents of a file into memory. This is subject
    to the resource limitation on the lambda. Potentially extendable to 10 GB,
    but that would be super-expensive. [1]

    [1]: https://docs.aws.amazon.com/lambda/latest/dg/configuration-console.html
    """
    records_table = get_dynamodb().Table(DYNAMODB_TABLE)
    banks_table = BanksTable(get_dynamodb().Table(BANKS_TABLE))
    sqs_client = get_sqs_client()
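
    # An SQS trigger can deliver a batch of records in a single invocation;
    # each record is handled independently.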
    for sqs_record in event["Records"]:
        message = json.loads(sqs_record["body"])

        if message.get("Event") == "s3:TestEvent":
            continue
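
        # Normalize the incoming message into a flat list of submissions to
        # hash; a single S3 batch message can expand into multiple images.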
        media_to_process: t.List[
            t.Union[S3ImageSubmission, URLSubmissionMessage, BankSubmissionMessage]
        ] = []

        if URLSubmissionMessage.could_be(message):
            media_to_process.append(URLSubmissionMessage.from_sqs_message(message))
        elif S3ImageSubmissionBatchMessage.could_be(message):
            # S3 submissions can only be images for now.
            media_to_process.extend(
                S3ImageSubmissionBatchMessage.from_sqs_message(
                    message, image_prefix=IMAGE_PREFIX
                ).image_submissions
            )
        elif BankSubmissionMessage.could_be(message):
            media_to_process.append(BankSubmissionMessage.from_sqs_message(message))
        else:
            logger.warning(f"Unprocessable Message: {message}")
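
        # Hash every submission that could be parsed from this record.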
        for media in media_to_process:
            if not hasher.supports(media.content_type):
                if isinstance(media, BankSubmissionMessage):
                    object_id = media.bank_id
                else:
                    object_id = media.content_id

                logger.warning(
                    f"Unprocessable content type: {media.content_type}, id: {object_id}"
                )
                continue
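
            # Download the content into memory; only the fetch is timed. See
            # the docstring for the memory implications.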
            with metrics.timer(metrics.names.hasher.download_file):
                try:
                    if hasattr(media, "key") and hasattr(media, "bucket"):
                        # Classic duck-typing. If it has key and bucket, must be an
                        # S3 submission.
                        media = t.cast(S3ImageSubmission, media)
                        bytes_: bytes = S3BucketContentSource(
                            media.bucket, IMAGE_PREFIX
                        ).get_bytes(media.content_id)
                    else:
                        media = t.cast(URLSubmissionMessage, media)
                        bytes_: bytes = URLContentSource().get_bytes(media.url)
                except Exception:
                    if isinstance(media, BankSubmissionMessage):
                        object_id = media.bank_id
                    else:
                        object_id = media.content_id

                    logger.exception(
                        f"Encountered exception while trying to get_bytes for id: {object_id}. Unable to hash content."
                    )
                    continue
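
            # A single piece of content may produce multiple signals, one per
            # allowed signal_type for its content_type.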
            for signal in hasher.get_hashes(media.content_type, bytes_):
                if isinstance(media, BankSubmissionMessage):
                    # route signals to bank datastore only.
                    bank_operations.add_bank_member_signal(
                        banks_table=banks_table,
                        bank_id=media.bank_id,
                        bank_member_id=media.bank_member_id,
                        signal_type=signal.signal_type,
                        signal_value=signal.signal_value,
                    )
                    # don't write hash records etc.
                    continue
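
                # Regular submissions: persist the hash to DynamoDB and notify
                # downstream consumers via the output queue.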
                hash_record = PipelineHashRecord(
                    content_id=media.content_id,
                    signal_type=signal.signal_type,
                    content_hash=signal.signal_value,
                    updated_at=datetime.datetime.now(),
                )

                hasher.write_hash_record(records_table, hash_record)
                hasher.publish_hash_message(sqs_client, hash_record)
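
    # Flush buffered metrics once, after all records in the batch have been
    # processed.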
    metrics.flush()