{{!

  Copyright (c) Facebook, Inc. and its affiliates.

  Licensed under the Apache License, Version 2.0 (the "License");
  you may not use this file except in compliance with the License.
  You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.

}}
{{> AutoGenerated}}

package {{service:javaPackage}};

import static com.facebook.swift.service.SwiftConstants.STICKY_HASH_KEY;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.thrift.protocol.*;
import org.apache.thrift.ClientPushMetadata;
import org.apache.thrift.InteractionCreate;
import org.apache.thrift.InteractionTerminate;
import com.facebook.thrift.client.ResponseWrapper;
import com.facebook.thrift.client.RpcOptions;
import com.facebook.thrift.util.Readers;

public class {{service:javaCapitalName}}ReactiveClient {{#service:extends}}extends {{service:javaPackage}}.{{service:javaCapitalName}}ReactiveClient{{/service:extends}}
  implements {{service:javaCapitalName}}.Reactive {
  private static final AtomicLong _interactionCounter = new AtomicLong(0);
  {{^service:extends}}

  protected final org.apache.thrift.ProtocolId _protocolId;
  protected final reactor.core.publisher.Mono<? extends com.facebook.thrift.client.RpcClient> _rpcClient;
  protected final Map<String, String> _headers;
  protected final Map<String, String> _persistentHeaders;
  protected final Set<Long> _activeInteractions;
  {{/service:extends}}

  {{#service:singleRequestFunctions}}
  {{#function:args}}
  private static final TField _{{function:javaName}}_{{field:javaTFieldName}} = new TField("{{field:name}}", TType.{{#field:type}}{{> TType}}{{/field:type}}, (short){{field:key}});
  {{/function:args}}
  private static final java.util.Map<Short, com.facebook.thrift.payload.Reader> _{{function:javaName}}_EXCEPTION_READERS = {{#function:exceptions?}}new HashMap<>();{{/function:exceptions?}}{{^function:exceptions?}}java.util.Collections.emptyMap();{{/function:exceptions?}}
  {{#function:exceptions}}
  private static final com.facebook.thrift.payload.Reader _{{function:javaName}}_EXCEPTION_READER{{field:index}} = Readers.wrap({{#field:type}}{{> BoxedType}}.asReader());{{/field:type}}
  {{/function:exceptions}}
  {{/service:singleRequestFunctions}}
  {{#service:streamingFunctions}}
  {{#function:args}}
  private static final TField _{{function:javaName}}_{{field:javaTFieldName}} = new TField("{{field:name}}", TType.{{#field:type}}{{> TType}}{{/field:type}}, (short){{field:key}});
  {{/function:args}}
  private static final java.util.Map<Short, com.facebook.thrift.payload.Reader> _{{function:javaName}}_EXCEPTION_READERS = {{#function:exceptions?}}new HashMap<>();{{/function:exceptions?}}{{^function:exceptions?}}java.util.Collections.emptyMap();{{/function:exceptions?}}
  private static final java.util.Map<Short, com.facebook.thrift.payload.Reader> _{{function:javaName}}_STREAM_EXCEPTION_READERS = {{#function:stream_exceptions?}}new HashMap<>();{{/function:stream_exceptions?}}{{^function:stream_exceptions?}}java.util.Collections.emptyMap();{{/function:stream_exceptions?}}
  {{#function:exceptions}}
  private static final com.facebook.thrift.payload.Reader _{{function:javaName}}_EXCEPTION_READER{{field:index}} = Readers.wrap({{#field:type}}{{> BoxedType}}.asReader());{{/field:type}}
  {{/function:exceptions}}
  {{#function:stream_exceptions}}
  private static final com.facebook.thrift.payload.Reader _{{function:javaName}}_STREAM_EXCEPTION_READER{{field:index}} = Readers.wrap({{#field:type}}{{> BoxedType}}.asReader());{{/field:type}}
  {{/function:stream_exceptions}}
  {{/service:streamingFunctions}}
  {{#service:sinkFunctions}}
  {{#function:args}}
  private static final TField _{{function:javaName}}_{{field:javaTFieldName}} = new TField("{{field:name}}", TType.{{#field:type}}{{> TType}}{{/field:type}}, (short){{field:key}});
  {{/function:args}}
  {{#function:return_type}}{{#type:sink_elem_type}}
  private static final TField _{{function:javaName}}_SINK_TFIELD = new TField("payload", TType.{{> TType}}, (short)0);
  {{/type:sink_elem_type}}{{/function:return_type}}
  private static final java.util.Map<Short, com.facebook.thrift.payload.Reader> _{{function:javaName}}_EXCEPTION_READERS = {{#function:exceptions?}}new HashMap<>();{{/function:exceptions?}}{{^function:exceptions?}}java.util.Collections.emptyMap();{{/function:exceptions?}}
  private static final java.util.Map<Short, com.facebook.thrift.payload.Reader> _{{function:javaName}}_STREAM_EXCEPTION_READERS = {{#function:sink_final_response_exceptions?}}new HashMap<>();{{/function:sink_final_response_exceptions?}}{{^function:sink_final_response_exceptions?}}java.util.Collections.emptyMap();{{/function:sink_final_response_exceptions?}}
  {{#function:exceptions}}
  private static final com.facebook.thrift.payload.Reader _{{function:javaName}}_EXCEPTION_READER{{field:index}} = Readers.wrap({{#field:type}}{{> BoxedType}}.asReader());{{/field:type}}
  {{/function:exceptions}}
  {{#function:sink_final_response_exceptions}}
  private static final com.facebook.thrift.payload.Reader _{{function:javaName}}_STREAM_EXCEPTION_READER{{field:index}} = Readers.wrap({{#field:type}}{{> BoxedType}}.asReader());{{/field:type}}
  {{/function:sink_final_response_exceptions}}
  {{/service:sinkFunctions}}
  {{#service:interactions}}
  {{#service:singleRequestFunctions}}
  {{#function:args}}
  private static final TField _{{function:javaName}}_{{field:javaTFieldName}}_INT = new TField("{{field:name}}", TType.{{#field:type}}{{> TType}}{{/field:type}}, (short){{field:key}});
  {{/function:args}}
  private static final java.util.Map<Short, com.facebook.thrift.payload.Reader> _{{function:javaName}}_EXCEPTION_READERS_INT = {{#function:exceptions?}}new HashMap<>();{{/function:exceptions?}}{{^function:exceptions?}}java.util.Collections.emptyMap();{{/function:exceptions?}}
  {{#function:exceptions}}
  private static final com.facebook.thrift.payload.Reader _{{function:javaName}}_EXCEPTION_READER_INT{{field:index}} = Readers.wrap({{#field:type}}{{> BoxedType}}.asReader());{{/field:type}}
  {{/function:exceptions}}
  {{/service:singleRequestFunctions}}
  {{#service:streamingFunctions}}
  {{#function:args}}
  private static final TField _{{function:javaName}}_{{field:javaTFieldName}}_INT = new TField("{{field:name}}", TType.{{#field:type}}{{> TType}}{{/field:type}}, (short){{field:key}});
  {{/function:args}}
  private static final java.util.Map<Short, com.facebook.thrift.payload.Reader> _{{function:javaName}}_EXCEPTION_READERS_INT = {{#function:exceptions?}}new HashMap<>();{{/function:exceptions?}}{{^function:exceptions?}}java.util.Collections.emptyMap();{{/function:exceptions?}}
  private static final java.util.Map<Short, com.facebook.thrift.payload.Reader> _{{function:javaName}}_STREAM_EXCEPTION_READERS_INT = {{#function:stream_exceptions?}}new HashMap<>();{{/function:stream_exceptions?}}{{^function:stream_exceptions?}}java.util.Collections.emptyMap();{{/function:stream_exceptions?}}
  {{#function:exceptions}}
  private static final com.facebook.thrift.payload.Reader _{{function:javaName}}_EXCEPTION_READER_INT{{field:index}} = Readers.wrap({{#field:type}}{{> BoxedType}}.asReader());{{/field:type}}
  {{/function:exceptions}}
  {{#function:stream_exceptions}}
  private static final com.facebook.thrift.payload.Reader _{{function:javaName}}_STREAM_EXCEPTION_READER_INT{{field:index}} = Readers.wrap({{#field:type}}{{> BoxedType}}.asReader());{{/field:type}}
  {{/function:stream_exceptions}}
  {{/service:streamingFunctions}}
  {{/service:interactions}}

  static {
    {{#service:singleRequestFunctions}}{{#function:exceptions}}
    _{{function:javaName}}_EXCEPTION_READERS.put((short){{field:key}}, _{{function:javaName}}_EXCEPTION_READER{{field:index}});
    {{/function:exceptions}}{{/service:singleRequestFunctions}}
    {{#service:streamingFunctions}}{{#function:exceptions}}
    _{{function:javaName}}_EXCEPTION_READERS.put((short){{field:key}}, _{{function:javaName}}_EXCEPTION_READER{{field:index}});
    {{/function:exceptions}}{{#function:stream_exceptions}}
    _{{function:javaName}}_STREAM_EXCEPTION_READERS.put((short){{field:key}}, _{{function:javaName}}_STREAM_EXCEPTION_READER{{field:index}});
    {{/function:stream_exceptions}}{{/service:streamingFunctions}}
    {{#service:sinkFunctions}}{{#function:exceptions}}
    _{{function:javaName}}_EXCEPTION_READERS.put((short){{field:key}}, _{{function:javaName}}_EXCEPTION_READER{{field:index}});
    {{/function:exceptions}}{{#function:sink_final_response_exceptions}}
    _{{function:javaName}}_STREAM_EXCEPTION_READERS.put((short){{field:key}}, _{{function:javaName}}_STREAM_EXCEPTION_READER{{field:index}});
    {{/function:sink_final_response_exceptions}}{{/service:sinkFunctions}}
  }

  public {{service:javaCapitalName}}ReactiveClient(org.apache.thrift.ProtocolId _protocolId, reactor.core.publisher.Mono<? extends com.facebook.thrift.client.RpcClient> _rpcClient) {
    {{#service:extends}}super(_protocolId, _rpcClient);{{/service:extends}}
    {{^service:extends}}
    this._protocolId = _protocolId;
    this._rpcClient = _rpcClient;
    this._headers = java.util.Collections.emptyMap();
    this._persistentHeaders = java.util.Collections.emptyMap();
    this._activeInteractions = ConcurrentHashMap.newKeySet();
    {{/service:extends}}
  }

  public {{service:javaCapitalName}}ReactiveClient(org.apache.thrift.ProtocolId _protocolId, reactor.core.publisher.Mono<? extends com.facebook.thrift.client.RpcClient> _rpcClient, Map<String, String> _headers, Map<String, String> _persistentHeaders) {
    this(_protocolId, _rpcClient, _headers, _persistentHeaders, new AtomicLong(), ConcurrentHashMap.newKeySet());
  }

  public {{service:javaCapitalName}}ReactiveClient(org.apache.thrift.ProtocolId _protocolId, reactor.core.publisher.Mono<? extends com.facebook.thrift.client.RpcClient> _rpcClient, Map<String, String> _headers, Map<String, String> _persistentHeaders, AtomicLong interactionCounter, Set<Long> activeInteractions) {
    {{#service:extends}}super(_protocolId, _rpcClient, _headers, _persistentHeaders, interactionCounter, activeInteractions);{{/service:extends}}
    {{^service:extends}}
    this._protocolId = _protocolId;
    this._rpcClient = _rpcClient;
    this._headers = _headers;
    this._persistentHeaders = _persistentHeaders;
    this._activeInteractions = activeInteractions;
    {{/service:extends}}
  }

  @java.lang.Override
  public void dispose() {}

  {{#service:singleRequestFunctions}}
  private com.facebook.thrift.payload.Writer _create{{function:javaName}}Writer({{#function:args}}final {{#field:type}}{{> Type}}{{/field:type}} {{field:javaName}}{{^last?}}, {{/last?}}{{/function:args}}) {
    return oprot -> {
      try {
        {{#function:args}}
        {{#field:type}}
        {
          oprot.writeFieldBegin(_{{function:javaName}}_{{field:javaTFieldName}});

          {{> FieldType}} _iter0 = {{field:javaName}};

          {{#field:type}}
          {{field:nestedDepth++}}{{> WriteRequestType}}{{field:nestedDepth--}}
          {{/field:type}}
          oprot.writeFieldEnd();
        }
        {{/field:type}}

        {{/function:args}}

      } catch (Throwable _e) {
        throw reactor.core.Exceptions.propagate(_e);
      }
    };
  }

  private static final com.facebook.thrift.payload.Reader _{{function:javaName}}_READER = {{!
    }}{{#function:voidType}}Readers.voidReader(){{/function:voidType}}{{!
    }}{{^function:voidType}}{{#function:return_type}}{{> TypeReader }}{{/function:return_type}}{{/function:voidType}};

  @java.lang.Override
  public reactor.core.publisher.Mono<com.facebook.thrift.client.ResponseWrapper<{{#function:return_type}}{{> BoxedType}}{{/function:return_type}}>> {{function:javaName}}Wrapper({{#function:args}}final {{#field:type}}{{> Type}}{{/field:type}} {{field:javaName}}, {{/function:args}} final com.facebook.thrift.client.RpcOptions rpcOptions) {
    return _rpcClient
      .flatMap(_rpc -> {
        org.apache.thrift.RequestRpcMetadata _metadata = new org.apache.thrift.RequestRpcMetadata.Builder()
                .setName("{{function:name}}")
                .setKind(org.apache.thrift.RpcKind.{{#function:oneway?}}SINGLE_REQUEST_NO_RESPONSE{{/function:oneway?}}{{^function:oneway?}}SINGLE_REQUEST_SINGLE_RESPONSE{{/function:oneway?}})
                .setOtherMetadata(getHeaders(rpcOptions))
                .setProtocol(_protocolId)
                .build();

            com.facebook.thrift.payload.ClientRequestPayload<{{#function:return_type}}{{> BoxedType}}{{/function:return_type}}> _crp =
                com.facebook.thrift.payload.ClientRequestPayload.create(
                    _create{{function:javaName}}Writer({{#function:args}}{{field:javaName}}{{^last?}}, {{/last?}}{{/function:args}}),
                    _{{function:javaName}}_READER,
                    _{{function:javaName}}_EXCEPTION_READERS,
                    _metadata,
                    java.util.Collections.emptyMap());

            return _rpc
                {{#function:oneway?}}.singleRequestNoResponse(_crp, rpcOptions).thenReturn(ResponseWrapper.create(null, java.util.Collections.emptyMap(), java.util.Collections.emptyMap())){{/function:oneway?}}{{!
                }}{{^function:oneway?}}.singleRequestSingleResponse(_crp, rpcOptions).doOnNext(_p -> {if(_p.getException() != null) throw com.facebook.thrift.util.ExceptionUtil.propagate(_p);}){{/function:oneway?}};
      });
  }

  @java.lang.Override
  public reactor.core.publisher.Mono<{{#function:return_type}}{{> BoxedType}}{{/function:return_type}}> {{function:javaName}}({{#function:args}}final {{#field:type}}{{> Type}}{{/field:type}} {{field:javaName}}, {{/function:args}} final com.facebook.thrift.client.RpcOptions rpcOptions) {
    return {{function:javaName}}Wrapper({{#function:args}}{{field:javaName}}, {{/function:args}} rpcOptions){{#function:oneway?}}.then(){{/function:oneway?}}{{!
        }}{{^function:oneway?}}{{^function:voidType}}.map(_p -> _p.getData()){{/function:voidType}}{{/function:oneway?}}{{!
        }}{{^function:oneway?}}{{#function:voidType}}.then(){{/function:voidType}}{{/function:oneway?}};
  }

  @java.lang.Override
  public reactor.core.publisher.Mono<{{#function:return_type}}{{> BoxedType}}{{/function:return_type}}> {{function:javaName}}({{#function:args}}final {{#field:type}}{{> Type}}{{/field:type}} {{field:javaName}}{{^last?}}, {{/last?}}{{/function:args}}) {
    return {{function:javaName}}({{#function:args}}{{field:javaName}}, {{/function:args}} com.facebook.thrift.client.RpcOptions.EMPTY);
  }

  {{/service:singleRequestFunctions}}
  {{#service:streamingFunctions}}
  private com.facebook.thrift.payload.Writer _create{{function:javaName}}Writer({{#function:args}}final {{#field:type}}{{> Type}}{{/field:type}} {{field:javaName}}{{^last?}}, {{/last?}}{{/function:args}}) {
    return oprot -> {
      try {
        {{#function:args}}
        {{#field:type}}
        {
          oprot.writeFieldBegin(_{{function:javaName}}_{{field:javaTFieldName}});

          {{> FieldType}} _iter0 = {{field:javaName}};

          {{#field:type}}
          {{field:nestedDepth++}}{{> WriteRequestType}}{{field:nestedDepth--}}
          {{/field:type}}
          oprot.writeFieldEnd();
        }
        {{/field:type}}

        {{/function:args}}

      } catch (Throwable _e) {
        throw reactor.core.Exceptions.propagate(_e);
      }
    };
  }

  private static final com.facebook.thrift.payload.Reader _{{function:javaName}}_READER = {{!
    }}{{#function:voidType}}Readers.voidReader(){{/function:voidType}}{{!
    }}{{^function:voidType}}{{#function:return_type}}{{#type:stream_elem_type}}{{> TypeReader }}{{/type:stream_elem_type}}{{/function:return_type}}{{/function:voidType}};

  {{#function:return_type}}{{#type:stream_has_first_response?}}
  private static final com.facebook.thrift.payload.Reader _{{function:javaName}}_FIRST_READER = {{!
    }}{{#function:voidType}}Readers.voidReader(){{/function:voidType}}{{!
    }}{{^function:voidType}}{{#function:return_type}}{{#type:stream_first_response_type}}{{> TypeReader }}{{/type:stream_first_response_type}}{{/function:return_type}}{{/function:voidType}};
  {{/type:stream_has_first_response?}}{{/function:return_type}}

  @java.lang.Override
  public reactor.core.publisher.Flux<com.facebook.thrift.client.ResponseWrapper<{{#function:return_type}}{{> StreamReturnType}}{{/function:return_type}}>> {{function:javaName}}Wrapper({{#function:args}}final {{#field:type}}{{> Type}}{{/field:type}} {{field:javaName}}, {{/function:args}} final com.facebook.thrift.client.RpcOptions rpcOptions) {
    return _rpcClient
      .flatMapMany(_rpc -> {
        org.apache.thrift.RequestRpcMetadata _metadata = new org.apache.thrift.RequestRpcMetadata.Builder()
                .setName("{{function:name}}")
                .setKind(org.apache.thrift.RpcKind.SINGLE_REQUEST_STREAMING_RESPONSE)
                .setOtherMetadata(getHeaders(rpcOptions))
                .setProtocol(_protocolId)
                .build();

            com.facebook.thrift.payload.ClientRequestPayload<{{#function:return_type}}{{#type:stream_elem_type}}{{> BoxedType}}{{/type:stream_elem_type}}{{/function:return_type}}> _crp =
                com.facebook.thrift.payload.ClientRequestPayload.create(
                    _create{{function:javaName}}Writer({{#function:args}}{{field:javaName}}{{^last?}}, {{/last?}}{{/function:args}}),
                    _{{function:javaName}}_READER,{{#function:return_type}}{{#type:stream_has_first_response?}}
                    _{{function:javaName}}_FIRST_READER,{{/type:stream_has_first_response?}}{{/function:return_type}}
                    _{{function:javaName}}_EXCEPTION_READERS,
                    _{{function:javaName}}_STREAM_EXCEPTION_READERS,
                    _metadata,
                    java.util.Collections.emptyMap());

            return _rpc
                .singleRequestStreamingResponse(_crp, rpcOptions)
                .doOnNext(_p -> {if(_p.getException() != null) throw reactor.core.Exceptions.propagate(_p.getException());}){{#function:return_type}}{{^type:stream_has_first_response?}}
                .filter((_p) -> ((com.facebook.thrift.model.StreamResponse)_p.getData()).isSetData()){{/type:stream_has_first_response?}}{{/function:return_type}}
                .map(_p -> ResponseWrapper.create((({{#function:return_type}}{{> StreamResponseGeneric}}{{/function:return_type}})_p.getData()){{#function:return_type}}{{^type:stream_has_first_response?}}.getData(){{/type:stream_has_first_response?}}{{/function:return_type}}, _p.getHeaders(), _p.getBinaryHeaders()));
      });
  }

  @java.lang.Override
  public reactor.core.publisher.Flux<{{#function:return_type}}{{> StreamReturnType}}{{/function:return_type}}> {{function:javaName}}({{#function:args}}final {{#field:type}}{{> Type}}{{/field:type}} {{field:javaName}}, {{/function:args}} final com.facebook.thrift.client.RpcOptions rpcOptions) {
      return {{function:javaName}}Wrapper({{#function:args}}{{field:javaName}}, {{/function:args}} rpcOptions).map(_p -> _p.getData());
  }

  @java.lang.Override
  public reactor.core.publisher.Flux<{{#function:return_type}}{{> StreamReturnType}}{{/function:return_type}}> {{function:javaName}}({{#function:args}}final {{#field:type}}{{> Type}}{{/field:type}} {{field:javaName}}{{^last?}}, {{/last?}}{{/function:args}}) {
    return {{function:javaName}}({{#function:args}}{{field:javaName}}, {{/function:args}} com.facebook.thrift.client.RpcOptions.EMPTY);
  }

  {{/service:streamingFunctions}}

  {{#service:sinkFunctions}}
  private com.facebook.thrift.payload.Writer _create{{function:javaName}}Writer({{#function:args}}final {{#field:type}}{{> Type}}{{/field:type}} {{field:javaName}}{{^last?}}, {{/last?}}{{/function:args}}) {
    return oprot -> {
      try {
        {{#function:args}}
        {{#field:type}}
        {
          oprot.writeFieldBegin(_{{function:javaName}}_{{field:javaTFieldName}});

          {{> FieldType}} _iter0 = {{field:javaName}};

          {{#field:type}}
          {{field:nestedDepth++}}{{> WriteRequestType}}{{field:nestedDepth--}}
          {{/field:type}}
          oprot.writeFieldEnd();
        }
        {{/field:type}}

        {{/function:args}}

      } catch (Throwable _e) {
        throw reactor.core.Exceptions.propagate(_e);
      }
    };
  }

  private com.facebook.thrift.payload.Writer _create{{function:javaName}}SinkWriter({{#function:return_type}}{{#type:sink_elem_type}}{{ >BoxedType}} _p{{/type:sink_elem_type}}{{/function:return_type}}) {
    return oprot -> {
      try {
        oprot.writeFieldBegin(_{{function:javaName}}_SINK_TFIELD);
        {{#function:return_type}}{{#type:sink_elem_type}}
        {{> WriteSinkPayloadType}}
        {{/type:sink_elem_type}}{{/function:return_type}}

        oprot.writeFieldEnd();

      } catch (Throwable _e) {
        throw reactor.core.Exceptions.propagate(_e);
      }
    };
  }

  private static final com.facebook.thrift.payload.Reader _{{function:javaName}}_READER = {{!
    }}{{#function:voidType}}Readers.voidReader(){{/function:voidType}}{{!
    }}{{^function:voidType}}{{#function:return_type}}{{#type:sink_final_response_type}}{{> TypeReader }}{{/type:sink_final_response_type}}{{/function:return_type}}{{/function:voidType}};

  {{#function:return_type}}{{#type:sink_has_first_response?}}
  private static final com.facebook.thrift.payload.Reader _{{function:javaName}}_FIRST_READER = {{!
    }}{{#function:voidType}}Readers.voidReader(){{/function:voidType}}{{!
    }}{{^function:voidType}}{{#function:return_type}}{{#type:sink_first_response_type}}{{> TypeReader }}{{/type:sink_first_response_type}}{{/function:return_type}}{{/function:voidType}};
  {{/type:sink_has_first_response?}}{{/function:return_type}}

  @java.lang.Override
  public {{#function:return_type}}{{> SinkWrapperReturnType}}{{/function:return_type}} {{function:javaName}}Wrapper({{#function:args}}final {{#field:type}}{{> Type}}{{/field:type}} {{field:javaName}}, {{/function:args}} org.reactivestreams.Publisher<{{#function:return_type}}{{#type:sink_elem_type}}{{ >BoxedType}}{{/type:sink_elem_type}}{{/function:return_type}}> payloads, com.facebook.thrift.client.RpcOptions rpcOptions) {
    return _rpcClient
      .flatMapMany(_rpc -> {
        org.apache.thrift.RequestRpcMetadata _metadata = new org.apache.thrift.RequestRpcMetadata.Builder()
                .setName("{{function:name}}")
                .setKind(org.apache.thrift.RpcKind.SINK)
                .setOtherMetadata(getHeaders(rpcOptions))
                .setProtocol(_protocolId)
                .build();

            com.facebook.thrift.payload.ClientRequestPayload<{{#function:return_type}}{{#type:sink_final_response_type}}{{> BoxedType}}{{/type:sink_final_response_type}}{{/function:return_type}}> _crp =
                com.facebook.thrift.payload.ClientRequestPayload.create(
                    _create{{function:javaName}}Writer({{#function:args}}{{field:javaName}}{{^last?}}, {{/last?}}{{/function:args}}),
                    _{{function:javaName}}_READER,{{#function:return_type}}{{#type:sink_has_first_response?}}
                    _{{function:javaName}}_FIRST_READER,{{/type:sink_has_first_response?}}{{/function:return_type}}
                    _{{function:javaName}}_EXCEPTION_READERS,
                    _{{function:javaName}}_STREAM_EXCEPTION_READERS,
                    _metadata,
                    java.util.Collections.emptyMap());

            reactor.core.publisher.Flux<com.facebook.thrift.payload.ClientRequestPayload<{{#function:return_type}}{{#type:sink_final_response_type}}{{> BoxedType}}{{/type:sink_final_response_type}}{{/function:return_type}}>> _sink =
              reactor.core.publisher.Mono.just(_crp).concatWith(reactor.core.publisher.Flux.from(payloads)
                .map(_p -> com.facebook.thrift.payload.ClientRequestPayload.create(
                    _create{{function:javaName}}SinkWriter(_p),
                    _{{function:javaName}}_READER,{{#function:return_type}}{{#type:sink_has_first_response?}}
                    _{{function:javaName}}_FIRST_READER,{{/type:sink_has_first_response?}}{{/function:return_type}}
                    _{{function:javaName}}_EXCEPTION_READERS,
                    _{{function:javaName}}_STREAM_EXCEPTION_READERS,
                    _metadata,
                    java.util.Collections.emptyMap())));

            return _rpc
                .streamingRequestStreamingResponse(_sink, rpcOptions)
                .doOnNext(_p -> {if(_p.getException() != null) throw reactor.core.Exceptions.propagate(_p.getException());})
                .limitRequest(2){{#function:return_type}}{{^type:sink_has_first_response?}}
                .filter((_p) -> ((com.facebook.thrift.model.StreamResponse)_p.getData()).isSetData()){{/type:sink_has_first_response?}}{{/function:return_type}}
                .map(_p -> ResponseWrapper.create((({{#function:return_type}}{{> SinkResponseGeneric}}{{/function:return_type}})_p.getData()){{#function:return_type}}{{^type:sink_has_first_response?}}.getData(){{/type:sink_has_first_response?}}{{/function:return_type}}, _p.getHeaders(), _p.getBinaryHeaders()));
      }){{#function:return_type}}{{^type:sink_has_first_response?}}.single(){{/type:sink_has_first_response?}}{{/function:return_type}};
  }

  @java.lang.Override
  public {{#function:return_type}}{{> SinkReturnType}}{{/function:return_type}} {{function:javaName}}({{#function:args}}final {{#field:type}}{{> Type}}{{/field:type}} {{field:javaName}}, {{/function:args}} org.reactivestreams.Publisher<{{#function:return_type}}{{#type:sink_elem_type}}{{ >BoxedType}}{{/type:sink_elem_type}}{{/function:return_type}}> payloads, com.facebook.thrift.client.RpcOptions rpcOptions) {
      return {{function:javaName}}Wrapper({{#function:args}}{{field:javaName}}, {{/function:args}} payloads, rpcOptions).map(_p -> _p.getData());
  }

  @java.lang.Override
  public {{#function:return_type}}{{> SinkReturnType}}{{/function:return_type}} {{function:javaName}}({{#function:args}}final {{#field:type}}{{> Type}}{{/field:type}} {{field:javaName}}, {{/function:args}} org.reactivestreams.Publisher<{{#function:return_type}}{{#type:sink_elem_type}}{{ >BoxedType}}{{/type:sink_elem_type}}{{/function:return_type}}> payloads) {
      return {{function:javaName}}({{#function:args}}{{field:javaName}}, {{/function:args}} payloads, com.facebook.thrift.client.RpcOptions.EMPTY);
  }

  {{/service:sinkFunctions}}
  {{#service:interactions}}
  public class {{service:name}}Impl implements {{service:name}} {
    private final long interactionId;

    {{service:name}}Impl(long interactionId) {
      this.interactionId = interactionId;
    }

    {{#service:singleRequestFunctions}}
    private final java.util.Map<Short, com.facebook.thrift.payload.Reader> _{{function:javaName}}_EXCEPTION_READERS = {{#function:exceptions?}}new HashMap<>();{{/function:exceptions?}}{{^function:exceptions?}}java.util.Collections.emptyMap();{{/function:exceptions?}}
    {{#function:exceptions}}
    private final com.facebook.thrift.payload.Reader _{{function:javaName}}_EXCEPTION_READER_INT{{field:index}} = Readers.wrap({{#field:type}}{{> BoxedType}}.asReader());{{/field:type}}

    {{/function:exceptions}}
    {{/service:singleRequestFunctions}}

    {{#service:singleRequestFunctions}}
    private com.facebook.thrift.payload.Writer _create{{function:javaName}}Writer({{#function:args}}final {{#field:type}}{{> Type}}{{/field:type}} {{field:javaName}}{{^last?}}, {{/last?}}{{/function:args}}) {
      return oprot -> {
        try {
          {{#function:args}}
          {{#field:type}}
          {
            oprot.writeFieldBegin(_{{function:javaName}}_{{field:javaTFieldName}}_INT);

            {{> FieldType}} _iter0 = {{field:javaName}};

            {{#field:type}}
            {{field:nestedDepth++}}{{> WriteRequestType}}{{field:nestedDepth--}}
            {{/field:type}}
            oprot.writeFieldEnd();
          }
          {{/field:type}}

          {{/function:args}}

        } catch (Throwable _e) {
          throw reactor.core.Exceptions.propagate(_e);
        }
      };
    }

    private final com.facebook.thrift.payload.Reader _{{function:javaName}}_READER = {{!
      }}{{#function:voidType}}Readers.voidReader(){{/function:voidType}}{{!
      }}{{^function:voidType}}{{#function:return_type}}{{> TypeReader }}{{/function:return_type}}{{/function:voidType}};

    public reactor.core.publisher.Mono<{{#function:return_type}}{{> BoxedType}}{{/function:return_type}}> {{function:javaName}}({{#function:args}}
    final {{#field:type}}{{> Type}}{{/field:type}} {{field:javaName}}{{^last?}},{{/last?}}{{/function:args}}) {
      return {{function:javaName}}Wrapper({{#function:args}}{{field:javaName}}, {{/function:args}} com.facebook.thrift.client.RpcOptions.EMPTY){{#function:oneway?}}.then(){{/function:oneway?}}{{!
        }}{{^function:oneway?}}{{^function:voidType}}.map(_p -> _p.getData()){{/function:voidType}}{{/function:oneway?}}{{!
        }}{{^function:oneway?}}{{#function:voidType}}.then(){{/function:voidType}}{{/function:oneway?}};
    }

    @java.lang.Override
    public reactor.core.publisher.Mono<{{#function:return_type}}{{> BoxedType}}{{/function:return_type}}> {{function:javaName}}({{#function:args}}final {{#field:type}}{{> Type}}{{/field:type}} {{field:javaName}}, {{/function:args}}RpcOptions rpcOptions)  {
      return {{function:javaName}}Wrapper({{#function:args}}{{field:javaName}}, {{/function:args}} rpcOptions){{#function:oneway?}}.then(){{/function:oneway?}}{{!
        }}{{^function:oneway?}}{{^function:voidType}}.map(_p -> _p.getData()){{/function:voidType}}{{/function:oneway?}}{{!
        }}{{^function:oneway?}}{{#function:voidType}}.then(){{/function:voidType}}{{/function:oneway?}};
    }

    @java.lang.Override
    public reactor.core.publisher.Mono<ResponseWrapper<{{#function:return_type}}{{> BoxedType}}{{/function:return_type}}>> {{function:javaName}}Wrapper({{#function:args}}final {{#field:type}}{{> Type}}{{/field:type}} {{field:javaName}}, {{/function:args}}RpcOptions rpcOptions)  {
      return _rpcClient
        .subscriberContext(ctx -> ctx.put(STICKY_HASH_KEY, interactionId))
        .flatMap(_rpc -> {
          String interactionName = "{{service:name}}.{{function:name}}";
          org.apache.thrift.RequestRpcMetadata.Builder _metadataBuilder = new org.apache.thrift.RequestRpcMetadata.Builder()
                  .setName(interactionName)
                  .setKind(org.apache.thrift.RpcKind.{{#function:oneway?}}SINGLE_REQUEST_NO_RESPONSE{{/function:oneway?}}{{^function:oneway?}}SINGLE_REQUEST_SINGLE_RESPONSE{{/function:oneway?}})
                  .setOtherMetadata(getHeaders(rpcOptions))
                  .setProtocol(_protocolId);

          if (_activeInteractions.contains(interactionId)) {
            _metadataBuilder.setInteractionId(interactionId);
          } else {
            _metadataBuilder.setInteractionCreate(
              new InteractionCreate.Builder()
                  .setInteractionId(interactionId)
                  .setInteractionName("{{service:name}}")
                  .build());
            _metadataBuilder.setInteractionId(0L);
            _activeInteractions.add(interactionId);
          }

          org.apache.thrift.RequestRpcMetadata _metadata = _metadataBuilder.build();

          com.facebook.thrift.payload.ClientRequestPayload<{{#function:return_type}}{{> BoxedType}}{{/function:return_type}}> _crp =
              com.facebook.thrift.payload.ClientRequestPayload.create(
                  _create{{function:javaName}}Writer({{#function:args}}{{field:javaName}}{{^last?}}, {{/last?}}{{/function:args}}),
                  _{{function:javaName}}_READER,
                  _{{function:javaName}}_EXCEPTION_READERS_INT,
                  _metadata,
                  java.util.Collections.emptyMap());

          return _rpc
              {{#function:oneway?}}.singleRequestNoResponse(_crp, rpcOptions).thenReturn(ResponseWrapper.create(null, java.util.Collections.emptyMap(), java.util.Collections.emptyMap())){{/function:oneway?}}{{!
              }}{{^function:oneway?}}.singleRequestSingleResponse(_crp, rpcOptions).doOnNext(_p -> {if(_p.getException() != null) throw com.facebook.thrift.util.ExceptionUtil.propagate(_p);}){{/function:oneway?}};
        });
    }

    {{/service:singleRequestFunctions}}
    {{#service:streamingFunctions}}
    private com.facebook.thrift.payload.Writer _create{{function:javaName}}Writer({{#function:args}}final {{#field:type}}{{> Type}}{{/field:type}} {{field:javaName}}{{^last?}}, {{/last?}}{{/function:args}}) {
      return oprot -> {
        try {
          {{#function:args}}
          {{#field:type}}
          {
            oprot.writeFieldBegin(_{{function:javaName}}_{{field:javaTFieldName}}_INT);

            {{> FieldType}} _iter0 = {{field:javaName}};

            {{#field:type}}
            {{field:nestedDepth++}}{{> WriteRequestType}}{{field:nestedDepth--}}
            {{/field:type}}
            oprot.writeFieldEnd();
          }
          {{/field:type}}

          {{/function:args}}

        } catch (Throwable _e) {
          throw reactor.core.Exceptions.propagate(_e);
        }
      };
    }

    private final com.facebook.thrift.payload.Reader _{{function:javaName}}_READER = {{!
      }}{{#function:voidType}}Readers.voidReader(){{/function:voidType}}{{!
      }}{{^function:voidType}}{{#function:return_type}}{{#type:stream_elem_type}}{{> TypeReader }}{{/type:stream_elem_type}}{{/function:return_type}}{{/function:voidType}};

    {{#function:return_type}}{{#type:stream_has_first_response?}}
    private final com.facebook.thrift.payload.Reader _{{function:javaName}}_FIRST_READER = {{!
      }}{{#function:voidType}}Readers.voidReader(){{/function:voidType}}{{!
      }}{{^function:voidType}}{{#function:return_type}}{{#type:stream_first_response_type}}{{> TypeReader }}{{/type:stream_first_response_type}}{{/function:return_type}}{{/function:voidType}};
    {{/type:stream_has_first_response?}}{{/function:return_type}}

    @java.lang.Override
    public reactor.core.publisher.Flux<com.facebook.thrift.client.ResponseWrapper<{{#function:return_type}}{{> StreamReturnType}}{{/function:return_type}}>> {{function:javaName}}Wrapper({{#function:args}}final {{#field:type}}{{> Type}}{{/field:type}} {{field:javaName}}, {{/function:args}} final com.facebook.thrift.client.RpcOptions rpcOptions) {
      return _rpcClient
        .subscriberContext(ctx -> ctx.put(STICKY_HASH_KEY, interactionId))
        .flatMapMany(_rpc -> {
          String interactionName = "{{service:name}}.{{function:name}}";
          org.apache.thrift.RequestRpcMetadata.Builder _metadataBuilder = new org.apache.thrift.RequestRpcMetadata.Builder()
                  .setName(interactionName)
                  .setKind(org.apache.thrift.RpcKind.SINGLE_REQUEST_STREAMING_RESPONSE)
                  .setOtherMetadata(getHeaders(rpcOptions))
                  .setProtocol(_protocolId);

          if (_activeInteractions.contains(interactionId)) {
            _metadataBuilder.setInteractionId(interactionId);
          } else {
            _metadataBuilder.setInteractionCreate(
              new InteractionCreate.Builder()
                  .setInteractionId(interactionId)
                  .setInteractionName("{{service:name}}")
                  .build());
            _metadataBuilder.setInteractionId(0L);
            _activeInteractions.add(interactionId);
          }


          com.facebook.thrift.payload.ClientRequestPayload<{{#function:return_type}}{{#type:stream_elem_type}}{{> BoxedType}}{{/type:stream_elem_type}}{{/function:return_type}}> _crp =
              com.facebook.thrift.payload.ClientRequestPayload.create(
                  _create{{function:javaName}}Writer({{#function:args}}{{field:javaName}}{{^last?}}, {{/last?}}{{/function:args}}),
                  _{{function:javaName}}_READER,{{#function:return_type}}{{#type:stream_has_first_response?}}
                  _{{function:javaName}}_FIRST_READER,{{/type:stream_has_first_response?}}{{/function:return_type}}
                  _{{function:javaName}}_EXCEPTION_READERS_INT,
                  _{{function:javaName}}_STREAM_EXCEPTION_READERS_INT,
                  _metadataBuilder.build(),
                  java.util.Collections.emptyMap());

          return _rpc
              .singleRequestStreamingResponse(_crp, rpcOptions)
              .doOnNext(_p -> {if(_p.getException() != null) throw reactor.core.Exceptions.propagate(_p.getException());}){{#function:return_type}}{{^type:stream_has_first_response?}}
              .filter((_p) -> ((com.facebook.thrift.model.StreamResponse)_p.getData()).isSetData()){{/type:stream_has_first_response?}}{{/function:return_type}}
              .map(_p -> ResponseWrapper.create((({{#function:return_type}}{{> StreamResponseGeneric}}{{/function:return_type}})_p.getData()){{#function:return_type}}{{^type:stream_has_first_response?}}.getData(){{/type:stream_has_first_response?}}{{/function:return_type}}, _p.getHeaders(), _p.getBinaryHeaders()));
        });
    }

    @java.lang.Override
    public reactor.core.publisher.Flux<{{#function:return_type}}{{> StreamReturnType}}{{/function:return_type}}> {{function:javaName}}({{#function:args}}final {{#field:type}}{{> Type}}{{/field:type}} {{field:javaName}}, {{/function:args}} final com.facebook.thrift.client.RpcOptions rpcOptions) {
        return {{function:javaName}}Wrapper({{#function:args}}{{field:javaName}}, {{/function:args}} rpcOptions).map(_p -> _p.getData());
    }

    @java.lang.Override
    public reactor.core.publisher.Flux<{{#function:return_type}}{{> StreamReturnType}}{{/function:return_type}}> {{function:javaName}}({{#function:args}}final {{#field:type}}{{> Type}}{{/field:type}} {{field:javaName}}{{^last?}}, {{/last?}}{{/function:args}}) {
      return {{function:javaName}}({{#function:args}}{{field:javaName}}, {{/function:args}} com.facebook.thrift.client.RpcOptions.EMPTY);
    }

    {{/service:streamingFunctions}}
    @java.lang.Override
    public void dispose() {
      _activeInteractions.remove(interactionId);
      _rpcClient
        .subscriberContext(ctx -> ctx.put(STICKY_HASH_KEY, interactionId))
        .flatMap(_rpc -> {
          InteractionTerminate term = new InteractionTerminate.Builder().setInteractionId(interactionId).build();
          ClientPushMetadata metadata = ClientPushMetadata.fromInteractionTerminate(term);
          return _rpc.metadataPush(metadata, com.facebook.thrift.client.RpcOptions.EMPTY);
        }).subscribe();
    }
  }

  public {{service:name}} create{{service:name}}() {
      return new {{service:name}}Impl(_interactionCounter.incrementAndGet());
  }
  {{^last?}}

  {{/last?}}
  {{/service:interactions}}

  private Map<String, String> getHeaders(com.facebook.thrift.client.RpcOptions rpcOptions) {
      Map<String, String> headers = new HashMap<>();
      if (rpcOptions.getRequestHeaders() != null && !rpcOptions.getRequestHeaders().isEmpty()) {
          headers.putAll(rpcOptions.getRequestHeaders());
      }
      if (_headers != null && !_headers.isEmpty()) {
          headers.putAll(_headers);
      }
      if (_persistentHeaders != null && !_persistentHeaders.isEmpty()) {
          headers.putAll(_persistentHeaders);
      }
      return headers;
  }
}
