in static/code/worker-safety/deeplens-lambda/greengrasssdk/Lambda.py [0:0]
def invoke(self, **kwargs):
    """Validate the keyword arguments and hand a local Lambda invoke to IPC.

    Recognised keyword arguments:
        FunctionName: Required. Parsed by ``FunctionArnFields`` to obtain the
            region, account id, name and (possibly empty) qualifier.
        Qualifier: Optional. Used only when ``FunctionName`` yields no
            qualifier; if both are non-empty they must be equal.
        ClientContext: Optional, defaults to ``b''``. Must expose a
            ``decode()`` method; the decoded text, when non-empty, must
            match ``valid_base64_regex``.
        Payload: Optional, defaults to ``b''``. Forwarded unchanged.
        InvocationType: Optional, defaults to ``'RequestResponse'``.
            Forwarded unchanged.

    Returns:
        Whatever ``self._invoke_internal`` returns for this request.

    Raises:
        ValueError: If ``FunctionName`` is missing, if the two qualifiers
            are both set but differ, if ``ClientContext`` has no
            ``decode`` method, or if the decoded client context does not
            match ``valid_base64_regex``.
    """
    # Guard clause: nothing else can be resolved without a function name.
    if 'FunctionName' not in kwargs:
        raise ValueError(
            '"FunctionName" argument of Lambda.Client.invoke is a required argument but was not provided.'
        )

    parsed_arn = FunctionArnFields(kwargs['FunctionName'])
    qualifier_from_arn = parsed_arn.qualifier
    qualifier_from_kwargs = kwargs.get('Qualifier', '')

    # The qualifier may arrive inside FunctionName, as a separate argument,
    # or both. Mirroring the cloud, two different non-empty values are an
    # error rather than one silently winning.
    if qualifier_from_kwargs and qualifier_from_arn:
        if qualifier_from_arn != qualifier_from_kwargs:
            raise ValueError('The derived qualifier from the function name does not match the specified qualifier.')

    # The qualifier embedded in FunctionName takes precedence when present.
    function_arn = FunctionArnFields.build_arn_string(
        parsed_arn.region,
        parsed_arn.account_id,
        parsed_arn.name,
        qualifier_from_arn or qualifier_from_kwargs,
    )

    # ClientContext is optional; when supplied it has to be decodable and
    # the decoded text has to look like base64.
    raw_client_context = kwargs.get('ClientContext', b'')
    try:
        client_context = raw_client_context.decode()
    except AttributeError as e:
        # Record the original failure before replacing it with the
        # argument-level error callers see.
        customer_logger.exception(e)
        raise ValueError(
            '"ClientContext" argument must be a byte string or support a decode method which returns a string'
        )

    # An empty client context skips the base64 check entirely.
    if client_context and not re.match(valid_base64_regex, client_context):
        raise ValueError('"ClientContext" argument of Lambda.Client.invoke must be base64 encoded.')

    # Both remaining arguments are optional and passed through as given.
    invocation_type = kwargs.get('InvocationType', 'RequestResponse')
    payload = kwargs.get('Payload', b'')

    log_message = 'Invoking local lambda "{}" with payload "{}" and client context "{}"'.format(
        function_arn, payload, client_context)
    customer_logger.info(log_message)

    # Post the work to IPC and return the result of that work.
    return self._invoke_internal(function_arn, payload, client_context, invocation_type)