def automation_loop()

in source/idea/idea-cluster-manager/src/ideaclustermanager/app/accounts/ad_automation_agent.py [0:0]


    def automation_loop(self):
        """
        Poll the AD automation SQS queue and process requests until ``self._stop_event`` is set.

        Each iteration of the loop:

        1. receives a batch of messages from ``self.ad_automation_sqs_queue_url``, using the
           visibility timeout configured under
           ``directoryservice.ad_automation.sqs_visibility_timeout_seconds`` (default: 30).
        2. parses each message body as JSON and dispatches on ``header.namespace``:

           * ``ADAutomation.PresetComputer`` is handled by ``PresetComputeHelper(...).invoke()``
           * ``ADAutomation.DeleteComputer`` and ``ADAutomation.UpdateComputerDescription`` are
             only logged at debug level
           * any other namespace is logged as an error and the message is discarded

        3. batch-deletes every message that was handled or discarded.

        Messages that are not deleted (retryable failures, unexpected exceptions) are left on the
        queue and are picked up again after the visibility timeout interval.

        Exceptions derived from ``Exception`` never escape the loop: per-message failures are
        logged and the next message is processed; failures of the iteration itself (receiving or
        deleting messages) are logged and the loop waits ``DEFAULT_WAIT_INTERVAL_SECONDS`` on the
        stop event before polling again.
        """

        # todo - constants for the namespaces supported
        # built once, the set of supported namespaces does not change between messages
        ad_automation_namespaces = {'ADAutomation.PresetComputer', 'ADAutomation.UpdateComputerDescription', 'ADAutomation.DeleteComputer'}

        while not self._stop_event.is_set():

            try:

                visibility_timeout = self.context.config().get_int('directoryservice.ad_automation.sqs_visibility_timeout_seconds', default=30)
                result = self.context.aws().sqs().receive_message(
                    QueueUrl=self.ad_automation_sqs_queue_url,
                    MaxNumberOfMessages=DEFAULT_MAX_MESSAGES,
                    WaitTimeSeconds=DEFAULT_WAIT_INTERVAL_SECONDS,
                    AttributeNames=['SenderId'],
                    VisibilityTimeout=visibility_timeout
                )

                sqs_messages = Utils.get_value_as_list('Messages', result, [])

                # entries for the delete_message_batch call issued once the batch has been processed
                delete_messages = []

                def add_to_delete(sqs_message_: Dict):
                    delete_messages.append({
                        'Id': sqs_message_['MessageId'],
                        'ReceiptHandle': sqs_message_['ReceiptHandle']
                    })

                for sqs_message in sqs_messages:
                    try:

                        message_body = Utils.get_value_as_string('Body', sqs_message)

                        request = Utils.from_json(message_body)
                        header = Utils.get_value_as_dict('header', request)
                        namespace = Utils.get_value_as_string('namespace', header)

                        if namespace not in ad_automation_namespaces:
                            # unsupported requests can never succeed. drop them instead of retrying.
                            self.logger.error(f'Invalid request: namespace {namespace} not supported. Supported namespaces: {ad_automation_namespaces}')
                            add_to_delete(sqs_message)
                            continue

                        attributes = Utils.get_value_as_dict('Attributes', sqs_message, {})
                        sender_id = Utils.get_value_as_string('SenderId', attributes)

                        self.logger.info(f'Processing AD automation event: {namespace}')

                        try:

                            if namespace == 'ADAutomation.PresetComputer':
                                PresetComputeHelper(
                                    context=self.context,
                                    ad_automation_dao=self.ad_automation_dao,
                                    sender_id=sender_id,
                                    request=request
                                ).invoke()
                            elif namespace == 'ADAutomation.DeleteComputer':
                                self.logger.debug('Processing AD automation event: DeleteComputer')
                            elif namespace == 'ADAutomation.UpdateComputerDescription':
                                self.logger.debug('Processing AD automation event: UpdateComputerDescription')

                            # no exception, AD automation succeeded. delete from queue.
                            add_to_delete(sqs_message)

                        except exceptions.SocaException as e:
                            if e.error_code == errorcodes.AD_AUTOMATION_PRESET_COMPUTER_FAILED:
                                # permanent failure: log and delete so the request is not retried.
                                self.logger.error(f'{e}')
                                add_to_delete(sqs_message)
                            elif e.error_code == errorcodes.AD_AUTOMATION_PRESET_COMPUTER_RETRY:
                                # do nothing. request will be retried after visibility timeout interval.
                                self.logger.warning(f'{e} - request will be retried in {visibility_timeout} seconds')
                            else:
                                # retry on any unhandled exception.
                                # bare raise keeps the original traceback for the handler below.
                                raise

                    except Exception as e:
                        # message is not added to delete_messages, so it stays on the queue for a retry.
                        self.logger.exception(f'failed to process sqs message: {e}. payload: {Utils.to_json(sqs_message)}. processing will be retried in {visibility_timeout} seconds ...')

                if delete_messages:
                    delete_message_result = self.context.aws().sqs().delete_message_batch(
                        QueueUrl=self.ad_automation_sqs_queue_url,
                        Entries=delete_messages
                    )
                    failed = Utils.get_value_as_list('Failed', delete_message_result, [])
                    if failed:
                        self.logger.error(f'Failed to delete AD automation entries. This could result in an infinite loop. Consider increasing the directoryservice.ad_automation.sqs_visibility_timeout_seconds. failed messages: {failed}')

            except KeyboardInterrupt:
                # ignored here: the loop only terminates when the stop event is set.
                pass
            except Exception as e:
                self.logger.exception(f'ad automation failure: {e}')
                # back off before the next attempt. on the success path the loop is paced by the
                # sqs receive message long polling, but when the iteration fails (eg. the receive
                # call itself raises) nothing else slows the loop down and it would spin and
                # flood the log. waiting on the stop event returns early if a stop is requested.
                self._stop_event.wait(DEFAULT_WAIT_INTERVAL_SECONDS)