in eland/field_mappings.py [0:0]
def _create_capability_matrix(all_fields, source_fields, all_fields_caps):
    """
    Build a DataFrame describing the capabilities of each mapped field.

    Parameters
    ----------
    all_fields : mapping
        Keyed by field name. Only fields present here are considered, and
        ``all_fields[field][1]`` is used as the field's date format.
    source_fields : container
        Field names to flag as ``is_source`` (tested with ``in``).
    all_fields_caps : dict
        Field-capabilities response. Its ``"fields"`` entry maps each field
        name to a dict of per-type capabilities, e.g.::

            {
              "fields": {
                "rating": {
                  "long": {
                    "searchable": true,
                    "aggregatable": false,
                    "indices": ["index1", "index2"],
                    "non_aggregatable_indices": ["index1"]
                  },
                  "keyword": {
                    "searchable": false,
                    "aggregatable": true,
                    "indices": ["index3", "index4"],
                    "non_searchable_indices": ["index4"]
                  }
                },
                "title": {
                  "text": {
                    "searchable": true,
                    "aggregatable": false
                  }
                }
              }
            }

        Each per-type entry is also read for a ``"type"`` key.

    Returns
    -------
    pandas.DataFrame
        Indexed by field name and sorted by index, with columns
        ``FieldMappings.column_labels``. Only rows whose ``is_source`` is
        true are returned. ``aggregatable_es_field_name`` is the field
        itself when it is aggregatable, otherwise ``"<field>.keyword"`` when
        that field is in the matrix and aggregatable, otherwise ``None``.

    Warns
    -----
    UserWarning
        When a per-type entry contains ``non_aggregatable_indices`` or
        ``non_searchable_indices``.

    Notes
    -----
    When a field reports several types, each type overwrites the previous
    row for that field, so the last type iterated is the one kept.
    """
    capability_matrix = {}

    for field, field_caps in all_fields_caps["fields"].items():
        if field not in all_fields:
            continue

        # type_caps = {'type': 'long', 'searchable': True, 'aggregatable': True}
        for type_caps in field_caps.values():
            # Positional order is assumed to match FieldMappings.column_labels
            # (defined outside this block) - confirm before reordering.
            capability_matrix[field] = [
                field,  # es_field_name
                field in source_fields,  # is_source
                type_caps["type"],  # es_dtype
                all_fields[field][1],  # es_date_format
                FieldMappings._es_dtype_to_pd_dtype(type_caps["type"]),  # pd_dtype
                type_caps["searchable"],  # is_searchable
                type_caps["aggregatable"],  # is_aggregatable
                False,  # scripted
                None,  # aggregatable_es_field_name - this is populated later
            ]

            if "non_aggregatable_indices" in type_caps:
                warnings.warn(
                    f"Field {field} has conflicting aggregatable fields across indexes "
                    f"{type_caps['non_aggregatable_indices']!s}",
                    UserWarning,
                )

            if "non_searchable_indices" in type_caps:
                warnings.warn(
                    f"Field {field} has conflicting searchable fields across indexes "
                    f"{type_caps['non_searchable_indices']!s}",
                    UserWarning,
                )

    capability_matrix_df = pd.DataFrame.from_dict(
        capability_matrix, orient="index", columns=FieldMappings.column_labels
    )

    # Collect the aggregatable field names once so the per-row lookup of
    # "<field>.keyword" is a set membership test, not a full DataFrame scan.
    # This covers every field in the matrix, not only the source fields.
    aggregatable_fields = {
        name
        for name, aggregatable in zip(
            capability_matrix_df.es_field_name, capability_matrix_df.is_aggregatable
        )
        if aggregatable
    }

    def find_aggregatable(row):
        # convert series to dict so we can set 'aggregatable_es_field_name'
        row_as_dict = row.to_dict()
        es_field_name = row_as_dict["es_field_name"]

        if row_as_dict["is_aggregatable"]:
            aggregatable_name = es_field_name
        else:
            # if not aggregatable, then try field.keyword
            es_field_name_keyword = es_field_name + ".keyword"
            if es_field_name_keyword in aggregatable_fields:
                aggregatable_name = es_field_name_keyword
            else:
                aggregatable_name = None

        row_as_dict["aggregatable_es_field_name"] = aggregatable_name
        return pd.Series(data=row_as_dict)

    # add aggregatable_es_field_name column by applying action to each row
    capability_matrix_df = capability_matrix_df.apply(
        find_aggregatable, axis="columns"
    )

    # return just source fields (as these are the only ones we display)
    return capability_matrix_df[capability_matrix_df.is_source].sort_index()