in lang/csharp/src/apache/ipc/Responder.cs [121:224]
/// <summary>
/// Handles one buffered RPC call: performs the handshake, decodes the request,
/// dispatches it to <c>Respond(Message, Object)</c> and encodes the outcome.
/// </summary>
/// <param name="buffers">Buffers containing the encoded call; they are read
/// through a <c>BinaryDecoder</c>.</param>
/// <param name="connection">Transceiver handed to the handshake. A null value is
/// tolerated when sampling the prior connection state; whether the handshake
/// itself accepts null is not visible here.</param>
/// <returns>
/// The handshake buffers alone when the handshake fails or the call is a
/// handshake ping (empty message name); <c>null</c> when the local message is
/// one-way and the connection was already connected; otherwise the handshake
/// buffers combined with the response call metadata and the response (or error)
/// payload.
/// </returns>
/// <remarks>
/// Failures raised while processing the call (unknown message, one-way mismatch,
/// an error that cannot be written with the message's declared errors, ...) are
/// caught, logged and reported to the caller as an encoded system error rather
/// than propagated.
/// </remarks>
public IList<MemoryStream> Respond(IList<MemoryStream> buffers,
    Transceiver connection)
{
    Decoder input = new BinaryDecoder(new ByteBufferInputStream(buffers));
    var bbo = new ByteBufferOutputStream();
    var output = new BinaryEncoder(bbo);
    Exception error = null;
    var context = new RpcContext();
    List<MemoryStream> handshake = null;

    // Sampled before the handshake runs (presumably because the handshake may
    // change the connection state - TODO confirm).
    bool wasConnected = connection != null && connection.IsConnected;

    try
    {
        Protocol remote = Handshake(input, output, connection);
        output.Flush();
        if (remote == null) // handshake failed
        {
            return bbo.GetBufferList();
        }
        handshake = bbo.GetBufferList();

        // read request using remote protocol specification
        context.RequestCallMeta = META_READER.Read(null, input);
        String messageName = input.ReadString();
        if (messageName.Equals("")) // a handshake ping
        {
            return handshake;
        }

        // TryGetValue instead of the indexer: a dictionary indexer throws
        // KeyNotFoundException for an unknown name, which would bypass the
        // descriptive errors below.
        Message rm;
        if (!remote.Messages.TryGetValue(messageName, out rm) || rm == null)
        {
            throw new AvroRuntimeException("No such remote message: " + messageName);
        }

        Message m;
        if (!Local.Messages.TryGetValue(messageName, out m) || m == null)
        {
            throw new AvroRuntimeException("No message named " + messageName
                + " in " + Local);
        }

        Object request = ReadRequest(rm.Request, m.Request, input);
        context.Message = rm;

        // create response using local protocol specification
        if ((m.Oneway.GetValueOrDefault() != rm.Oneway.GetValueOrDefault()) && wasConnected)
        {
            throw new AvroRuntimeException("Not both one-way: " + messageName);
        }

        Object response = null;
        try
        {
            response = Respond(m, request);
            context.Response = response;
        }
        catch (Exception e)
        {
            // Error raised by the message implementation: remember it so it can
            // be written as the call's error below.
            error = e;
            context.Error = error;
            log.Warn("user error", e);
        }

        if (m.Oneway.GetValueOrDefault() && wasConnected) // no response data
        {
            return null;
        }

        // Error flag first, then either the response or the error body.
        output.WriteBoolean(error != null);
        if (error == null)
        {
            WriteResponse(m.Response, response, output);
        }
        else
        {
            try
            {
                WriteError(m.SupportedErrors, error, output);
            }
            catch (Exception)
            {
                // Presumably no match on the exception, throw the original
                // so the outer handler reports it as a system error.
                // NOTE(review): "throw error;" restarts the stack trace at this
                // line; the original trace was already logged as "user error".
                throw error;
            }
        }
    }
    catch (Exception e)
    {
        // system error: discard anything written so far and encode the
        // exception text as the error payload instead.
        log.Warn("system error", e);
        context.Error = e;
        bbo = new ByteBufferOutputStream();
        output = new BinaryEncoder(bbo);
        output.WriteBoolean(true);
        WriteError(errorSchema /*Protocol.SYSTEM_ERRORS*/, e.ToString(), output);
        if (null == handshake)
        {
            // The failure happened before the handshake completed; use an
            // empty buffer list so the assembly step below still works.
            handshake = new ByteBufferOutputStream().GetBufferList();
        }
    }

    output.Flush();
    List<MemoryStream> payload = bbo.GetBufferList();

    // Grab meta-data from plugins
    context.ResponsePayload = payload;
    META_WRITER.Write(context.ResponseCallMeta, output);
    output.Flush();

    // Prepend handshake and append payload
    // (assumes GetBufferList() above left bbo empty, so it now holds only the
    // response metadata - TODO confirm against ByteBufferOutputStream).
    bbo.Prepend(handshake);
    bbo.Append(payload);
    return bbo.GetBufferList();
}