in source/idea/idea-cluster-manager/src/ideaclustermanager/app/accounts/ad_automation_agent.py [0:0]
def automation_loop(self):
    """
    Poll the AD automation SQS queue until the stop event is set.

    Each iteration long-polls the queue, dispatches every received message
    based on the ``namespace`` found in its request header, and then
    batch-deletes the messages that are finished (processed successfully,
    rejected as unsupported, or failed with a non-retryable error).
    Messages that are not deleted are left on the queue and become visible
    again once the configured visibility timeout elapses.

    Errors never propagate out of the loop: they are logged and the loop
    continues. If an iteration fails before the SQS receive call has
    completed, the loop waits on the stop event for
    ``DEFAULT_WAIT_INTERVAL_SECONDS`` before retrying so that a persistent
    failure does not turn into a busy loop.
    """
    # todo - constants for the namespaces supported
    # loop-invariant, so built once instead of once per message.
    ad_automation_namespaces = {'ADAutomation.PresetComputer', 'ADAutomation.UpdateComputerDescription', 'ADAutomation.DeleteComputer'}

    while not self._stop_event.is_set():
        # becomes True only once receive_message has returned. Until then
        # nothing has paced this iteration, so the finally block must wait.
        receive_ok = False
        try:
            visibility_timeout = self.context.config().get_int('directoryservice.ad_automation.sqs_visibility_timeout_seconds', default=30)
            result = self.context.aws().sqs().receive_message(
                QueueUrl=self.ad_automation_sqs_queue_url,
                MaxNumberOfMessages=DEFAULT_MAX_MESSAGES,
                WaitTimeSeconds=DEFAULT_WAIT_INTERVAL_SECONDS,
                AttributeNames=['SenderId'],
                VisibilityTimeout=visibility_timeout
            )
            receive_ok = True

            sqs_messages = Utils.get_value_as_list('Messages', result, [])

            # entries for the delete_message_batch call issued after the loop
            delete_messages = []

            def add_to_delete(sqs_message_: Dict):
                delete_messages.append({
                    'Id': sqs_message_['MessageId'],
                    'ReceiptHandle': sqs_message_['ReceiptHandle']
                })

            for sqs_message in sqs_messages:
                try:
                    message_body = Utils.get_value_as_string('Body', sqs_message)
                    request = Utils.from_json(message_body)
                    header = Utils.get_value_as_dict('header', request)
                    namespace = Utils.get_value_as_string('namespace', header)

                    if namespace not in ad_automation_namespaces:
                        # unsupported requests are dropped rather than retried.
                        self.logger.error(f'Invalid request: namespace {namespace} not supported. Supported namespaces: {ad_automation_namespaces}')
                        add_to_delete(sqs_message)
                        continue

                    attributes = Utils.get_value_as_dict('Attributes', sqs_message, {})
                    sender_id = Utils.get_value_as_string('SenderId', attributes)

                    self.logger.info(f'Processing AD automation event: {namespace}')

                    try:
                        if namespace == 'ADAutomation.PresetComputer':
                            PresetComputeHelper(
                                context=self.context,
                                ad_automation_dao=self.ad_automation_dao,
                                sender_id=sender_id,
                                request=request
                            ).invoke()
                        elif namespace == 'ADAutomation.DeleteComputer':
                            # only logged here; no further action is taken in this loop.
                            self.logger.debug('Processing AD automation event: DeleteComputer')
                        elif namespace == 'ADAutomation.UpdateComputerDescription':
                            # only logged here; no further action is taken in this loop.
                            self.logger.debug('Processing AD automation event: UpdateComputerDescription')

                        # no exception, AD automation succeeded. delete from queue.
                        add_to_delete(sqs_message)

                    except exceptions.SocaException as e:
                        if e.error_code == errorcodes.AD_AUTOMATION_PRESET_COMPUTER_FAILED:
                            # non-retryable failure: log and drop the message.
                            self.logger.error(f'{e}')
                            add_to_delete(sqs_message)
                        elif e.error_code == errorcodes.AD_AUTOMATION_PRESET_COMPUTER_RETRY:
                            # do nothing. request will be retried after visibility timeout interval.
                            self.logger.warning(f'{e} - request will be retried in {visibility_timeout} seconds')
                        else:
                            # retry on any unhandled exception.
                            raise

                except Exception as e:
                    # message is not added to delete_messages, so it stays on the queue.
                    self.logger.exception(f'failed to process sqs message: {e}. payload: {Utils.to_json(sqs_message)}. processing will be retried in {visibility_timeout} seconds ...')

            if delete_messages:
                delete_message_result = self.context.aws().sqs().delete_message_batch(
                    QueueUrl=self.ad_automation_sqs_queue_url,
                    Entries=delete_messages
                )
                failed = Utils.get_value_as_list('Failed', delete_message_result, [])
                if failed:
                    self.logger.error(f'Failed to delete AD automation entries. This could result in an infinite loop. Consider increasing the directoryservice.ad_automation.sqs_visibility_timeout_seconds. failed messages: {failed}')

        except KeyboardInterrupt:
            # NOTE(review): the interrupt is swallowed and the loop keeps running
            # until the stop event is set - confirm this is intentional.
            pass
        except Exception as e:
            self.logger.exception(f'ad automation failure: {e}')
        finally:
            # when receive_message succeeded, pacing is handled by sqs long polling.
            # otherwise (config, credentials or sqs failure before/at the receive call)
            # wait here so a persistent failure does not spin the loop.
            if not receive_ok:
                self._stop_event.wait(DEFAULT_WAIT_INTERVAL_SECONDS)