def get_callback_source_for_statement_name()

in lambda/python/rs_integration_function/ddb/ddb_state_table.py [0:0]


    def get_callback_source_for_statement_name(cls, statement_name: StatementName) -> CallbackInterface:
        """
        Look up the callback source recorded for a statement in the DDB state table.

        Issues a single strongly consistent ``get_item`` keyed on the statement's
        execution ARN and invocation id, fetching only the callback details
        attribute. Per DynamoDB's capacity rules a strongly consistent read costs
        1 RCU for an item of up to 4 KB; the stored item size is not visible from
        this method.

        Args:
            statement_name: Identifies the statement; its ``execution_arn`` and
                ``invocation_id`` form the lookup key.

        Returns:
            The callback object that ``CallbackSourceBuilder`` builds from the
            stored, JSON-encoded callback details.

        Raises:
            NoTrackedState: If the table holds no item, or no callback details
                attribute, for ``statement_name``.
        """
        response = ddb_state_table.get_item(
            Key={
                DDB_ID: statement_name.execution_arn,
                DDB_INVOCATION_ID: statement_name.invocation_id,
            },
            # NOTE: DynamoDB documents AttributesToGet as a legacy parameter
            # (ProjectionExpression is its replacement); kept unchanged so the
            # request shape stays exactly as before.
            AttributesToGet=[
                DDB_CALLBACK_DETAILS,
            ],
            ConsistentRead=True,
            ReturnConsumedCapacity='NONE',
        )
        logger.debug({
            l_statement_name: statement_name,
            l_response: response
        })

        # Only the lookup itself signals "no tracked state". Keeping the try
        # body this narrow stops a KeyError raised while building the callback
        # object from being misreported as missing state.
        try:
            raw_callback_details = response['Item'][DDB_CALLBACK_DETAILS]
        except KeyError as ke:
            raise NoTrackedState(f"No state for {statement_name}") from ke

        return CallbackSourceBuilder.get_callback_object_for_event(
            json.loads(raw_callback_details)
        )