in curator/actions/shrink.py [0:0]
def do_action(self):
    """
    :py:meth:`~.elasticsearch.client.IndicesClient.shrink` the indices in
    :py:attr:`index_list`

    The index list is first narrowed with ``filter_closed()`` and
    ``filter_by_shards()`` and then verified with ``empty_list_check()``.
    Every remaining index is processed one at a time:

    1. compute the target name and run the pre-shrink check;
    2. route the index to the shrink node and check its shards;
    3. block writes and require a ``green`` cluster health;
    4. call the shrink API and, if ``wfc`` is set, wait for completion;
    5. unblock writes, apply the optional post-allocation rule, copy
       aliases and delete the source index (or clear the routing that was
       applied in step 2).

    If the shrink call or the wait fails, an existing target index is
    deleted and an ``ActionError`` is raised. Any exception raised while
    processing is passed to ``report_failure`` after a best-effort attempt
    to unblock writes on the index that was being processed.
    """
    self.index_list.filter_closed()
    self.index_list.filter_by_shards(number_of_shards=self.number_of_shards)
    self.index_list.empty_list_check()
    msg = (
        f'Shrinking {len(self.index_list.indices)} selected indices: '
        f'{self.index_list.indices}'
    )
    self.loggit.info(msg)
    # Pre-bind the loop variable so the failure handler below can tell
    # whether an index was actually being processed when the error occurred.
    idx = None
    try:
        index_lists = chunk_index_list(self.index_list.indices)
        for lst in index_lists:
            for idx in lst:  # Shrink can only be done one at a time...
                target = self._shrink_target(idx)
                self.loggit.info(
                    'Source index: %s -- Target index: %s', idx, target
                )
                # Pre-check ensures disk space available for each pass of the loop
                self.pre_shrink_check(idx)
                # Route the index to the shrink node
                self.loggit.info(
                    'Moving shards to shrink node: "%s"', self.shrink_node_name
                )
                self.route_index(idx, 'require', '_name', self.shrink_node_name)
                # Ensure a copy of each shard is present
                self._check_all_shards(idx)
                # Block writes on index
                self._block_writes(idx)
                # Do final health check
                if not health_check(self.client, status='green'):
                    msg = (
                        'Unable to proceed with shrink action. '
                        'Cluster health is not "green"'
                    )
                    raise ActionError(msg)
                # Do the shrink
                msg = (
                    f'Shrinking index "{idx}" to "{target}" with settings: '
                    f'{self.settings}, wait_for_active_shards='
                    f'{self.wait_for_active_shards}'
                )
                self.loggit.info(msg)
                try:
                    self.client.indices.shrink(
                        index=idx,
                        target=target,
                        settings=self.settings,
                        wait_for_active_shards=self.wait_for_active_shards,
                    )
                    # Wait for it to complete
                    if self.wfc:
                        self.loggit.debug(
                            'Wait for shards to complete allocation for index: %s',
                            target,
                        )
                        if self.wait_for_rebalance:
                            wait_for_it(
                                self.client,
                                'shrink',
                                wait_interval=self.wait_interval,
                                max_wait=self.max_wait,
                            )
                        else:
                            wait_for_it(
                                self.client,
                                'relocate',
                                index=target,
                                wait_interval=self.wait_interval,
                                max_wait=self.max_wait,
                            )
                except Exception as exc:
                    # Do not leave a partially created target index behind.
                    if self.client.indices.exists(index=target):
                        msg = (
                            f'Deleting target index "{target}" due to failure '
                            f'to complete shrink'
                        )
                        self.loggit.error(msg)
                        self.client.indices.delete(index=target)
                    raise ActionError(
                        f'Unable to shrink index "{idx}" -- Error: {exc}'
                    ) from exc
                self.loggit.info(
                    'Index "%s" successfully shrunk to "%s"', idx, target
                )
                # Do post-shrink steps
                # Unblock writes on index (just in case)
                self._unblock_writes(idx)
                # Post-allocation, if enabled
                if self.post_allocation:
                    submsg = (
                        f"index.routing.allocation."
                        f"{self.post_allocation['allocation_type']}."
                        f"{self.post_allocation['key']}:"
                        f"{self.post_allocation['value']}"
                    )
                    msg = (
                        f'Applying post-shrink allocation rule "{submsg}" '
                        f'to index "{target}"'
                    )
                    self.loggit.info(msg)
                    self.route_index(
                        target,
                        self.post_allocation['allocation_type'],
                        self.post_allocation['key'],
                        self.post_allocation['value'],
                    )
                # Copy aliases, if flagged
                if self.copy_aliases:
                    self.loggit.info('Copy source index aliases "%s"', idx)
                    self.do_copy_aliases(idx, target)
                # Delete, if flagged
                if self.delete_after:
                    self.loggit.info('Deleting source index "%s"', idx)
                    self.client.indices.delete(index=idx)
                else:  # Let's unset the routing we applied here.
                    self.loggit.info(
                        'Unassigning routing for source index: "%s"', idx
                    )
                    self.route_index(idx, 'require', '_name', '')
    except Exception as err:
        # Just in case it fails after attempting to meet this condition.
        # ``idx`` is still None when the failure happened before the first
        # index was selected (for example inside ``chunk_index_list``); there
        # is nothing to unblock then, and touching an unbound loop variable
        # here would replace the real error with an UnboundLocalError.
        if idx is not None:
            try:
                self._unblock_writes(idx)
            except Exception as cleanup_err:
                # The unblock is best-effort only: log it and make sure the
                # original failure is still the one that gets reported.
                self.loggit.error(
                    'Unable to unblock writes on index "%s": %s', idx, cleanup_err
                )
        report_failure(err)