in src/es_pii_tool/helpers/elastic_api.py [0:0]
def redact_from_index(client: 'Elasticsearch', index_name: str, config: t.Dict) -> None:
    """Redact data from an index using a painless script.

    Launch an asynchronous ``update_by_query`` job, collect its task id, and
    wait for the task to complete before returning.

    :param client: A client connection object
    :param index_name: The index to act on
    :param config: The config block being iterated. Contains ``query``,
        ``message``, and ``fields``

    :type client: :py:class:`~.elasticsearch.Elasticsearch`
    :type index_name: str
    :type config: dict

    :raises FatalError: If the ``update_by_query`` call fails, if its response
        carries no task id, or if waiting on the task reports a bad result.
    """
    logger.debug('Begin redaction...')
    logger.info('Before update by query, %s', report_segment_count(client, index_name))
    logger.debug('Updating and redacting data...')
    script = build_script(config['message'], config['fields'])
    try:
        # wait_for_completion=False makes the call return immediately with a
        # task id, which is then polled below via es_waiter.
        response = dict(
            client.update_by_query(
                index=index_name,
                script=script,
                query=config['query'],
                wait_for_completion=False,
                expand_wildcards=['open', 'hidden'],
            )
        )
    except (ApiError, NotFoundError, TransportError, BadRequestError) as err:
        logger.critical('update_by_query yielded an error: %s', err)
        raise FatalError('update_by_query API call failed', err) from err
    logger.debug('Checking update by query status...')
    logger.debug('response = %s', response)
    try:
        task_id = response['task']
    except KeyError as err:
        # Without a task id there is nothing to wait on; surface this as the
        # same FatalError type used for the other failure paths.
        logger.critical('update_by_query response has no task id: %s', response)
        raise FatalError('update_by_query did not return a task id', err) from err
    pause, timeout = timing('task')
    logger.debug('ENV pause = %s, timeout = %s', pause, timeout)
    try:
        es_waiter(
            client,
            Task,
            action='update_by_query',
            task_id=task_id,
            pause=pause,
            timeout=timeout,
        )
    except BadClientResult as exc:
        logger.error('Exception: %s', exc)
        raise FatalError('Failed to complete update by query', exc) from exc
    logger.info('After update by query, %s', report_segment_count(client, index_name))
    logger.debug('Update by query completed.')