in src/es_pii_tool/helpers/steps.py [0:0]
def confirm_ilm_phase(task: 'Task', stepname, var: DotMap, **kwargs) -> None:
"""
Confirm the mounted index is in the expected ILM phase
This is done by using move_to_step. If it's already in the step, no problem.
If it's in step ``new``, this will advance the index to the expected step.
"""
missing_data(stepname, kwargs)
step = Step(task=task, stepname=stepname)
if step.finished():
logger.info('%s: already completed', step.stub)
return
step.begin()
if task.job.dry_run:
msg = f'Dry-Run: {var.mount_name} moved to ILM phase {var.phase}'
logger.debug(msg)
step.add_log(msg)
step.end(completed=True, errors=False, logmsg=f'{stepname} completed')
return
# Wait for phase to be "new"
pause, timeout = timing('ilm')
logger.debug(f'ENV pause = {pause}, timeout = {timeout}')
try:
# Update in es_wait 0.9.2:
# - If you send phase='new', it will wait for the phase to be 'new' or higher
# - This is where a user was getting stuck. They were waiting for 'new' but
# - the phase was already 'frozen', so it was endlessly checking for 'new'.
es_waiter(
var.client,
IlmPhase,
name=var.mount_name,
phase='new',
pause=pause,
timeout=timeout,
)
# Wait for step to be "complete"
es_waiter(
var.client, IlmStep, name=var.mount_name, pause=pause, timeout=timeout
)
except BadClientResult as bad:
_ = f'ILM step confirmation problem -- ERROR: {bad}'
logger.error(_)
step.add_log(_)
failed_step(task, step, bad)
def get_currstep():
try:
_ = api.generic_get(var.client.ilm.explain_lifecycle, index=var.mount_name)
except MissingError as exc:
_ = f'Unable to get ILM phase of {var.mount_name}'
logger.error(_)
step.add_log(_)
failed_step(task, step, exc)
try:
expl = _['indices'][var.mount_name]
except KeyError as err:
msg = f'{var.mount_name} not found in ILM explain data: {err}'
logger.error(msg)
step.add_log(msg)
failed_step(task, step, err)
if 'managed' not in expl:
msg = f'Index {var.mount_name} is not managed by ILM'
step.add_log(msg)
failed_step(
task, step, ValueMismatch(msg, expl['managed'], '{"managed": True}')
)
return {'phase': expl['phase'], 'action': expl['action'], 'name': expl['step']}
nextstep = {'phase': var.phase, 'action': 'complete', 'name': 'complete'}
if task.job.dry_run: # Don't actually move_to_step if dry_run
msg = (
f'{stepname}: Dry-Run: {var.mount_name} not moved/confirmed to ILM '
f'phase {var.phase}'
)
logger.debug(msg)
step.end(completed=True, errors=False, logmsg=f'{stepname} completed')
return
# We will try to move the index to the expected phase up to 3 times
# before failing the step.
attempts = 0
success = False
while attempts < 3 and not success:
# Since we are now testing for 'new' or higher, we may not need to advance
# ILM phases. If the current step is already where we expect to be, log
# confirmation and move on.
logger.debug('Attempt number: %s', attempts)
currstep = get_currstep()
if currstep == nextstep:
msg = (
f'{stepname}: {var.mount_name} is confirmed to be in ILM phase '
f'{var.phase}'
)
logger.debug(msg)
step.add_log(msg)
# Set both while loop critera to values that will end the loop
success = True
attempts = 3
else:
# If we are not yet in the expected target phase, then proceed with the
# ILM phase change.
logger.debug('Current ILM Phase: %s', currstep)
logger.debug('Target ILM Phase: %s', nextstep)
logger.debug('PHASE: %s', var.phase)
try:
api.ilm_move(var.client, var.mount_name, currstep, nextstep)
success = True
except BadClientResult as bad:
logger.debug('Attempt failed. Incrementing attempts.')
attempts += 1
if attempts == 3:
_ = 'Attempt limit reached. Failing step.'
logger.error(_)
step.add_log(_)
failed_step(task, step, bad)
logger.debug('Waiting %s seconds before retrying...', PAUSE_DEFAULT)
time.sleep(float(PAUSE_DEFAULT))
logger.warning('ILM move failed: %s -- Retrying...', bad.message)
continue
pause, timeout = timing('ilm')
logger.debug(f'ENV pause = {pause}, timeout = {timeout}')
try:
es_waiter(
var.client,
IlmPhase,
name=var.mount_name,
phase=var.phase,
pause=pause,
timeout=timeout,
)
es_waiter(
var.client,
IlmStep,
name=var.mount_name,
pause=pause,
timeout=timeout,
)
except BadClientResult as phase_err:
msg = f'Unable to wait for ILM step to complete -- ERROR: {phase_err}'
logger.error(msg)
step.add_log(msg)
failed_step(task, step, phase_err)
# If we make it here, we have successfully moved the index to the expected phase
step.end(completed=True, errors=False, logmsg=f'{stepname} completed')