elasticsearch/dsl/analysis.py (222 lines of code) (raw):
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import Any, ClassVar, Dict, List, Optional, Union, cast
from . import async_connections, connections
from .utils import AsyncUsingType, AttrDict, DslBase, UsingType, merge
__all__ = ["tokenizer", "analyzer", "char_filter", "token_filter", "normalizer"]
class AnalysisBase:
@classmethod
def _type_shortcut(
cls,
name_or_instance: Union[str, "AnalysisBase"],
type: Optional[str] = None,
**kwargs: Any,
) -> DslBase:
if isinstance(name_or_instance, cls):
if type or kwargs:
raise ValueError(f"{cls.__name__}() cannot accept parameters.")
return name_or_instance # type: ignore[return-value]
if not (type or kwargs):
return cls.get_dsl_class("builtin")(name_or_instance) # type: ignore[no-any-return, attr-defined]
return cls.get_dsl_class(type, "custom")( # type: ignore[no-any-return, attr-defined]
name_or_instance, type or "custom", **kwargs
)
class CustomAnalysis:
name = "custom"
def __init__(self, filter_name: str, builtin_type: str = "custom", **kwargs: Any):
self._builtin_type = builtin_type
self._name = filter_name
super().__init__(**kwargs)
def to_dict(self) -> Dict[str, Any]:
# only name to present in lists
return self._name # type: ignore[return-value]
def get_definition(self) -> Dict[str, Any]:
d = super().to_dict() # type: ignore[misc]
d = d.pop(self.name)
d["type"] = self._builtin_type
return d # type: ignore[no-any-return]
class CustomAnalysisDefinition(CustomAnalysis):
_type_name: str
_param_defs: ClassVar[Dict[str, Any]]
filter: List[Any]
char_filter: List[Any]
def get_analysis_definition(self) -> Dict[str, Any]:
out = {self._type_name: {self._name: self.get_definition()}}
t = cast("Tokenizer", getattr(self, "tokenizer", None))
if "tokenizer" in self._param_defs and hasattr(t, "get_definition"):
out["tokenizer"] = {t._name: t.get_definition()}
filters = {
f._name: f.get_definition()
for f in self.filter
if hasattr(f, "get_definition")
}
if filters:
out["filter"] = filters
# any sub filter definitions like multiplexers etc?
for f in self.filter:
if hasattr(f, "get_analysis_definition"):
d = f.get_analysis_definition()
if d:
merge(out, d, True)
char_filters = {
f._name: f.get_definition()
for f in self.char_filter
if hasattr(f, "get_definition")
}
if char_filters:
out["char_filter"] = char_filters
return out
class BuiltinAnalysis:
name = "builtin"
def __init__(self, name: str):
self._name = name
super().__init__()
def to_dict(self) -> Dict[str, Any]:
# only name to present in lists
return self._name # type: ignore[return-value]
class Analyzer(AnalysisBase, DslBase):
_type_name = "analyzer"
name = ""
class BuiltinAnalyzer(BuiltinAnalysis, Analyzer):
def get_analysis_definition(self) -> Dict[str, Any]:
return {}
class CustomAnalyzer(CustomAnalysisDefinition, Analyzer):
_param_defs = {
"filter": {"type": "token_filter", "multi": True},
"char_filter": {"type": "char_filter", "multi": True},
"tokenizer": {"type": "tokenizer"},
}
def _get_body(
self, text: str, explain: bool, attributes: Optional[Dict[str, Any]]
) -> Dict[str, Any]:
body = {"text": text, "explain": explain}
if attributes:
body["attributes"] = attributes
definition = self.get_analysis_definition()
analyzer_def = self.get_definition()
for section in ("tokenizer", "char_filter", "filter"):
if section not in analyzer_def:
continue
sec_def = definition.get(section, {})
sec_names = analyzer_def[section]
if isinstance(sec_names, str):
body[section] = sec_def.get(sec_names, sec_names)
else:
body[section] = [
sec_def.get(sec_name, sec_name) for sec_name in sec_names
]
if self._builtin_type != "custom":
body["analyzer"] = self._builtin_type
return body
def simulate(
self,
text: str,
using: UsingType = "default",
explain: bool = False,
attributes: Optional[Dict[str, Any]] = None,
) -> AttrDict[Any]:
"""
Use the Analyze API of elasticsearch to test the outcome of this analyzer.
:arg text: Text to be analyzed
:arg using: connection alias to use, defaults to ``'default'``
:arg explain: will output all token attributes for each token. You can
filter token attributes you want to output by setting ``attributes``
option.
:arg attributes: if ``explain`` is specified, filter the token
attributes to return.
"""
es = connections.get_connection(using)
return AttrDict(
cast(
Dict[str, Any],
es.indices.analyze(body=self._get_body(text, explain, attributes)),
)
)
async def async_simulate(
self,
text: str,
using: AsyncUsingType = "default",
explain: bool = False,
attributes: Optional[Dict[str, Any]] = None,
) -> AttrDict[Any]:
"""
Use the Analyze API of elasticsearch to test the outcome of this analyzer.
:arg text: Text to be analyzed
:arg using: connection alias to use, defaults to ``'default'``
:arg explain: will output all token attributes for each token. You can
filter token attributes you want to output by setting ``attributes``
option.
:arg attributes: if ``explain`` is specified, filter the token
attributes to return.
"""
es = async_connections.get_connection(using)
return AttrDict(
cast(
Dict[str, Any],
await es.indices.analyze(
body=self._get_body(text, explain, attributes)
),
)
)
class Normalizer(AnalysisBase, DslBase):
_type_name = "normalizer"
name = ""
class BuiltinNormalizer(BuiltinAnalysis, Normalizer):
def get_analysis_definition(self) -> Dict[str, Any]:
return {}
class CustomNormalizer(CustomAnalysisDefinition, Normalizer):
_param_defs = {
"filter": {"type": "token_filter", "multi": True},
"char_filter": {"type": "char_filter", "multi": True},
}
class Tokenizer(AnalysisBase, DslBase):
_type_name = "tokenizer"
name = ""
class BuiltinTokenizer(BuiltinAnalysis, Tokenizer):
pass
class CustomTokenizer(CustomAnalysis, Tokenizer):
pass
class TokenFilter(AnalysisBase, DslBase):
_type_name = "token_filter"
name = ""
class BuiltinTokenFilter(BuiltinAnalysis, TokenFilter):
pass
class CustomTokenFilter(CustomAnalysis, TokenFilter):
pass
class MultiplexerTokenFilter(CustomTokenFilter):
name = "multiplexer"
def get_definition(self) -> Dict[str, Any]:
d = super(CustomTokenFilter, self).get_definition()
if "filters" in d:
d["filters"] = [
# comma delimited string given by user
(
fs
if isinstance(fs, str)
else
# list of strings or TokenFilter objects
", ".join(f.to_dict() if hasattr(f, "to_dict") else f for f in fs)
)
for fs in self.filters
]
return d
def get_analysis_definition(self) -> Dict[str, Any]:
if not hasattr(self, "filters"):
return {}
fs: Dict[str, Any] = {}
d = {"filter": fs}
for filters in self.filters:
if isinstance(filters, str):
continue
fs.update(
{
f._name: f.get_definition()
for f in filters
if hasattr(f, "get_definition")
}
)
return d
class ConditionalTokenFilter(CustomTokenFilter):
name = "condition"
def get_definition(self) -> Dict[str, Any]:
d = super(CustomTokenFilter, self).get_definition()
if "filter" in d:
d["filter"] = [
f.to_dict() if hasattr(f, "to_dict") else f for f in self.filter
]
return d
def get_analysis_definition(self) -> Dict[str, Any]:
if not hasattr(self, "filter"):
return {}
return {
"filter": {
f._name: f.get_definition()
for f in self.filter
if hasattr(f, "get_definition")
}
}
class CharFilter(AnalysisBase, DslBase):
_type_name = "char_filter"
name = ""
class BuiltinCharFilter(BuiltinAnalysis, CharFilter):
pass
class CustomCharFilter(CustomAnalysis, CharFilter):
pass
# shortcuts for direct use
analyzer = Analyzer._type_shortcut
tokenizer = Tokenizer._type_shortcut
token_filter = TokenFilter._type_shortcut
char_filter = CharFilter._type_shortcut
normalizer = Normalizer._type_shortcut