public List respond()

in lang/java/ipc/src/main/java/org/apache/avro/ipc/Responder.java [110:201]


  public List<ByteBuffer> respond(List<ByteBuffer> buffers, Transceiver connection) throws IOException {
    Decoder in = DecoderFactory.get().binaryDecoder(new ByteBufferInputStream(buffers), null);
    ByteBufferOutputStream bbo = new ByteBufferOutputStream();
    BinaryEncoder out = EncoderFactory.get().binaryEncoder(bbo, null);
    Exception error = null;
    RPCContext context = new RPCContext();
    List<ByteBuffer> payload = null;
    List<ByteBuffer> handshake = null;
    boolean wasConnected = connection != null && connection.isConnected();
    try {
      Protocol remote = handshake(in, out, connection);
      out.flush();
      if (remote == null) // handshake failed
        return bbo.getBufferList();
      handshake = bbo.getBufferList();

      // read request using remote protocol specification
      context.setRequestCallMeta(META_READER.read(null, in));
      String messageName = in.readString(null).toString();
      if (messageName.equals("")) // a handshake ping
        return handshake;
      Message rm = remote.getMessages().get(messageName);
      if (rm == null)
        throw new AvroRuntimeException("No such remote message: " + messageName);
      Message m = getLocal().getMessages().get(messageName);
      if (m == null)
        throw new AvroRuntimeException("No message named " + messageName + " in " + getLocal());

      Object request = readRequest(rm.getRequest(), m.getRequest(), in);

      context.setMessage(rm);
      for (RPCPlugin plugin : rpcMetaPlugins) {
        plugin.serverReceiveRequest(context);
      }

      // create response using local protocol specification
      if ((m.isOneWay() != rm.isOneWay()) && wasConnected)
        throw new AvroRuntimeException("Not both one-way: " + messageName);

      Object response = null;

      try {
        REMOTE.set(remote);
        response = respond(m, request);
        context.setResponse(response);
      } catch (Exception e) {
        error = e;
        context.setError(error);
        LOG.warn("user error", e);
      } finally {
        REMOTE.set(null);
      }

      if (m.isOneWay() && wasConnected) // no response data
        return null;

      out.writeBoolean(error != null);
      if (error == null)
        writeResponse(m.getResponse(), response, out);
      else
        try {
          writeError(m.getErrors(), error, out);
        } catch (UnresolvedUnionException e) { // unexpected error
          throw error;
        }
    } catch (Exception e) { // system error
      LOG.warn("system error", e);
      context.setError(e);
      bbo = new ByteBufferOutputStream();
      out = EncoderFactory.get().binaryEncoder(bbo, null);
      out.writeBoolean(true);
      writeError(Protocol.SYSTEM_ERRORS, new Utf8(e.toString()), out);
      if (null == handshake) {
        handshake = new ByteBufferOutputStream().getBufferList();
      }
    }
    out.flush();
    payload = bbo.getBufferList();

    // Grab meta-data from plugins
    context.setResponsePayload(payload);
    for (RPCPlugin plugin : rpcMetaPlugins) {
      plugin.serverSendResponse(context);
    }
    META_WRITER.write(context.responseCallMeta(), out);
    out.flush();
    // Prepend handshake and append payload
    bbo.prepend(handshake);
    bbo.append(payload);

    return bbo.getBufferList();
  }