in source/lambda/es_loader/index.py [0:0]
def bulkloads_into_opensearch(es_entries, collected_metrics):
    """Bulk-load generated documents into OpenSearch in size-bounded batches.

    Accumulates entries from *es_entries* until the serialized payload
    exceeds ~6 MB, then issues one bulk request; a final request flushes
    any remainder after the loop. OpenSearch's http.max_content_length is
    10 MB on t2 instances, so batches are kept well under that limit.

    Args:
        es_entries (iterable): bulk-API payload entries; each entry's size
            is measured via ``len(str(entry))``.
        collected_metrics (dict): mutated in place with load statistics
            (``total_output_size``, ``total_log_load_count``,
            ``success_count``, ``error_count``, ``es_response_time``).

    Returns:
        tuple: ``(collected_metrics, error_reason_list)`` where
        ``error_reason_list`` collects the non-empty per-batch error
        reasons reported by ``check_es_results``.
    """
    filter_path = ['took', 'errors', 'items.index.status',
                   'items.index.error.reason', 'items.index.error.type']
    output_size, total_output_size = 0, 0
    total_count, success_count, error_count, es_response_time = 0, 0, 0, 0
    putdata_list = []
    error_reason_list = []

    def _flush_batch():
        """Send the current batch to OpenSearch and fold the results
        into the running counters, then reset the batch buffers."""
        nonlocal output_size, total_output_size, total_count
        nonlocal success_count, error_count, es_response_time, putdata_list
        total_output_size += output_size
        results = es_conn.bulk(putdata_list, filter_path=filter_path)
        # check_es_results receives the count *before* this batch is added.
        es_took, success, error, error_reasons = check_es_results(
            results, total_count)
        success_count += success
        error_count += error
        es_response_time += es_took
        total_count += len(putdata_list)
        output_size = 0
        putdata_list = []
        if error_reasons:
            error_reason_list.append(error_reasons)

    for data in es_entries:
        putdata_list.append(data)
        output_size += len(str(data))
        # OpenSearch http.max_content_length is 10 MB on t2 instances,
        # so flush once the accumulated payload approaches the limit.
        if isinstance(data, str) and output_size > 6000000:
            _flush_batch()
    if output_size > 0:
        # Flush whatever remains after the loop.
        _flush_batch()

    collected_metrics['total_output_size'] = total_output_size
    collected_metrics['total_log_load_count'] = total_count
    collected_metrics['success_count'] = success_count
    collected_metrics['error_count'] = error_count
    collected_metrics['es_response_time'] = es_response_time
    return collected_metrics, error_reason_list