in sdk/greengrass/event-stream-rpc-server/src/main/java/software/amazon/awssdk/eventstreamrpc/OperationContinuationHandler.java [240:309]
/**
 * Handles one message received on this operation's stream continuation.
 *
 * <p>While {@code initialRequest} is still {@code null} the message is treated as the initial
 * request: its headers are copied, the payload is deserialized to the request class, passed to
 * {@code handleRequest}, and the result is sent back (with the terminate flag set when the
 * operation is not streaming). Once {@code initialRequest} is set, every further message is
 * deserialized to the streaming request class and passed to {@code handleStreamEvent}.
 *
 * <p>An {@code EventStreamOperationError} is answered via {@code sendModeledError}. Any other
 * exception is answered with a generic {@code "InternalServerError"} application error carrying
 * the terminate flag, after which the continuation is closed.
 *
 * @param list         headers of the incoming message; copied and kept for the initial request
 * @param bytes        message payload; may be {@code null} or empty on a terminate message
 * @param messageType  type of the incoming message (not consulted by this method)
 * @param messageFlags bit flags of the incoming message; only the TerminateStream bit is inspected
 */
final protected void onContinuationMessage(List<Header> list, byte[] bytes, MessageType messageType, int messageFlags) {
    LOGGER.debug("Continuation native id: {}", continuation.getNativeHandle());
    final boolean terminateRequested = (messageFlags & MessageFlags.TerminateStream.getByteValue()) != 0;

    //We can prevent a client from sending a request, and hanging up before receiving a response
    //but doing so will prevent any work from being done
    if (initialRequest == null && terminateRequested) {
        LOGGER.debug("Not invoking {} operation for client request received with a terminate flag set to 1",
                getOperationName());
        return;
    }

    final EventStreamRPCServiceModel serviceModel = getOperationModelContext().getServiceModel();
    try {
        if (initialRequest != null) {
            // Empty close stream messages from the client are valid. Do not need any processing here.
            if (terminateRequested && (bytes == null || bytes.length == 0)) {
                return;
            }
            final StreamingRequestType streamEvent = serviceModel.fromJson(getStreamingRequestClass(), bytes);
            //exceptions occurring during this processing will result in closure of stream
            handleStreamEvent(streamEvent);
        } else { //this is the initial request
            initialRequestHeaders = new ArrayList<>(list);
            initialRequest = serviceModel.fromJson(getRequestClass(), bytes);
            //call into business logic
            final ResponseType result = handleRequest(initialRequest);
            if (result == null) {
                //not streaming, but null response? we have a problem
                throw new RuntimeException("Operation handler returned null response!");
            }
            if (!getResponseClass().isInstance(result)) {
                throw new RuntimeException("Handler for operation [" + getOperationName()
                        + "] did not return expected type. Found: " + result.getClass().getName());
            }
            sendMessage(result, !isStreamingOperation()).whenComplete((res, ex) -> {
                if (ex != null) {
                    // Trailing throwable keeps the stack trace in the log, not just the message.
                    LOGGER.error("{} sending response message: {}", ex.getClass().getName(), ex.getMessage(), ex);
                } else {
                    LOGGER.trace("Response successfully sent");
                }
            });
            invokeAfterHandleRequest();
        }
    } catch (EventStreamOperationError e) {
        //We do not check if the specific exception thrown is a part of the core service?
        sendModeledError(e);
        invokeAfterHandleRequest();
    } catch (Exception e) {
        final List<Header> responseHeaders = new ArrayList<>(1);
        final byte[] outputPayload = "InternalServerError".getBytes(StandardCharsets.UTF_8);
        responseHeaders.add(Header.createHeader(EventStreamRPCServiceModel.CONTENT_TYPE_HEADER,
                EventStreamRPCServiceModel.CONTENT_TYPE_APPLICATION_TEXT));
        //are there any exceptions we wouldn't want to return a generic server fault?
        //this is the kind of exception that should be logged with a request ID especially in a server-client context
        //The client only sees the generic payload, so the full stack trace is recorded here.
        LOGGER.error("[{}] operation threw unexpected {}: {}", getOperationName(),
                e.getClass().getCanonicalName(), e.getMessage(), e);
        continuation.sendMessage(responseHeaders, outputPayload, MessageType.ApplicationError,
                        MessageFlags.TerminateStream.getByteValue())
                .whenComplete((res, ex) -> {
                    if (ex != null) {
                        LOGGER.error("{} sending error response message: {}",
                                ex.getClass().getName(), ex.getMessage(), ex);
                    } else {
                        LOGGER.trace("Error response successfully sent");
                    }
                    //close the continuation whether or not the error response could be sent
                    continuation.close();
                });
    }
}