#  Licensed to Elasticsearch B.V. under one or more contributor
#  license agreements. See the NOTICE file distributed with
#  this work for additional information regarding copyright
#  ownership. Elasticsearch B.V. licenses this file to you under
#  the Apache License, Version 2.0 (the "License"); you may
#  not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
# 	http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing,
#  software distributed under the License is distributed on an
#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
#  KIND, either express or implied.  See the License for the
#  specific language governing permissions and limitations
#  under the License.

import typing as t

from elastic_transport import ObjectApiResponse

from ._base import NamespacedClient
from .utils import SKIP_IN_PATH, _quote, _rewrite_parameters


class LogstashClient(NamespacedClient):

    @_rewrite_parameters()
    async def delete_pipeline(
        self,
        *,
        id: str,
        error_trace: t.Optional[bool] = None,
        filter_path: t.Optional[t.Union[str, t.Sequence[str]]] = None,
        human: t.Optional[bool] = None,
        pretty: t.Optional[bool] = None,
    ) -> ObjectApiResponse[t.Any]:
        """
        .. raw:: html

          <p>Delete a Logstash pipeline.
          Delete a pipeline that is used for Logstash Central Management.
          If the request succeeds, you receive an empty response with an appropriate status code.</p>


        `<https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-logstash-delete-pipeline>`_

        :param id: An identifier for the pipeline.
        """
        if id in SKIP_IN_PATH:
            raise ValueError("Empty value passed for parameter 'id'")
        __path_parts: t.Dict[str, str] = {"id": _quote(id)}
        __path = f'/_logstash/pipeline/{__path_parts["id"]}'
        __query: t.Dict[str, t.Any] = {}
        if error_trace is not None:
            __query["error_trace"] = error_trace
        if filter_path is not None:
            __query["filter_path"] = filter_path
        if human is not None:
            __query["human"] = human
        if pretty is not None:
            __query["pretty"] = pretty
        __headers = {"accept": "application/json"}
        return await self.perform_request(  # type: ignore[return-value]
            "DELETE",
            __path,
            params=__query,
            headers=__headers,
            endpoint_id="logstash.delete_pipeline",
            path_parts=__path_parts,
        )

    @_rewrite_parameters()
    async def get_pipeline(
        self,
        *,
        id: t.Optional[t.Union[str, t.Sequence[str]]] = None,
        error_trace: t.Optional[bool] = None,
        filter_path: t.Optional[t.Union[str, t.Sequence[str]]] = None,
        human: t.Optional[bool] = None,
        pretty: t.Optional[bool] = None,
    ) -> ObjectApiResponse[t.Any]:
        """
        .. raw:: html

          <p>Get Logstash pipelines.
          Get pipelines that are used for Logstash Central Management.</p>


        `<https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-logstash-get-pipeline>`_

        :param id: A comma-separated list of pipeline identifiers.
        """
        __path_parts: t.Dict[str, str]
        if id not in SKIP_IN_PATH:
            __path_parts = {"id": _quote(id)}
            __path = f'/_logstash/pipeline/{__path_parts["id"]}'
        else:
            __path_parts = {}
            __path = "/_logstash/pipeline"
        __query: t.Dict[str, t.Any] = {}
        if error_trace is not None:
            __query["error_trace"] = error_trace
        if filter_path is not None:
            __query["filter_path"] = filter_path
        if human is not None:
            __query["human"] = human
        if pretty is not None:
            __query["pretty"] = pretty
        __headers = {"accept": "application/json"}
        return await self.perform_request(  # type: ignore[return-value]
            "GET",
            __path,
            params=__query,
            headers=__headers,
            endpoint_id="logstash.get_pipeline",
            path_parts=__path_parts,
        )

    @_rewrite_parameters(
        body_name="pipeline",
    )
    async def put_pipeline(
        self,
        *,
        id: str,
        pipeline: t.Optional[t.Mapping[str, t.Any]] = None,
        body: t.Optional[t.Mapping[str, t.Any]] = None,
        error_trace: t.Optional[bool] = None,
        filter_path: t.Optional[t.Union[str, t.Sequence[str]]] = None,
        human: t.Optional[bool] = None,
        pretty: t.Optional[bool] = None,
    ) -> ObjectApiResponse[t.Any]:
        """
        .. raw:: html

          <p>Create or update a Logstash pipeline.</p>
          <p>Create a pipeline that is used for Logstash Central Management.
          If the specified pipeline exists, it is replaced.</p>


        `<https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-logstash-put-pipeline>`_

        :param id: An identifier for the pipeline.
        :param pipeline:
        """
        if id in SKIP_IN_PATH:
            raise ValueError("Empty value passed for parameter 'id'")
        if pipeline is None and body is None:
            raise ValueError(
                "Empty value passed for parameters 'pipeline' and 'body', one of them should be set."
            )
        elif pipeline is not None and body is not None:
            raise ValueError("Cannot set both 'pipeline' and 'body'")
        __path_parts: t.Dict[str, str] = {"id": _quote(id)}
        __path = f'/_logstash/pipeline/{__path_parts["id"]}'
        __query: t.Dict[str, t.Any] = {}
        if error_trace is not None:
            __query["error_trace"] = error_trace
        if filter_path is not None:
            __query["filter_path"] = filter_path
        if human is not None:
            __query["human"] = human
        if pretty is not None:
            __query["pretty"] = pretty
        __body = pipeline if pipeline is not None else body
        __headers = {"accept": "application/json", "content-type": "application/json"}
        return await self.perform_request(  # type: ignore[return-value]
            "PUT",
            __path,
            params=__query,
            headers=__headers,
            body=__body,
            endpoint_id="logstash.put_pipeline",
            path_parts=__path_parts,
        )
