in sdk/greengrass/event-stream-rpc-server/src/main/java/software/amazon/awssdk/eventstreamrpc/OperationContinuationHandler.java [294:359]
final protected void onContinuationMessage(List<Header> list, byte[] bytes, MessageType messageType,
int messageFlags) {
LOGGER.debug("Continuation native id: " + continuation.getNativeHandle());
//We can prevent a client from sending a request, and hanging up before receiving a response
//but doing so will prevent any work from being done
if (initialRequest == null && (messageFlags & MessageFlags.TerminateStream.getByteValue()) != 0) {
LOGGER.debug("Not invoking " + getOperationName() + " operation for client request received with a "
+ "terminate flag set to 1");
return;
}
final EventStreamRPCServiceModel serviceModel = getOperationModelContext().getServiceModel();
try {
if (initialRequest != null) {
// Empty close stream messages from the client are valid. Do not need any processing here.
if ((messageFlags & MessageFlags.TerminateStream.getByteValue()) != 0 && (bytes == null
|| bytes.length == 0)) {
return;
} else {
final StreamingRequestType streamEvent = serviceModel.fromJson(getStreamingRequestClass(), bytes);
//exceptions occurring during this processing will result in closure of stream
handleStreamEvent(streamEvent);
}
} else {
//this is the initial request
initialRequestHeaders = new ArrayList<>(list);
initialRequest = serviceModel.fromJson(getRequestClass(), bytes);
//call into business logic
CompletableFuture<ResponseType> resultFuture = handleRequestAsync(initialRequest);
if (resultFuture == null) {
resultFuture = CompletableFuture.completedFuture(handleRequest(initialRequest));
}
resultFuture.handle((result, throwable) -> {
if (throwable != null) {
handleAndSendError(throwable);
return null;
}
if (result != null) {
if (!getResponseClass().isInstance(result)) {
throw new RuntimeException("Handler for operation [" + getOperationName()
+ "] did not return expected type. Found: " + result.getClass().getName());
}
sendMessage(result, !isStreamingOperation()).whenComplete((res, ex) -> {
if (ex != null) {
LOGGER.error(ex.getClass().getName() + " sending response message: " + ex.getMessage());
} else {
LOGGER.trace("Response successfully sent");
}
});
invokeAfterHandleRequest();
} else {
//not streaming, but null response? we have a problem
throw new RuntimeException("Operation handler returned null response!");
}
return null;
}).exceptionally((throwable) -> {
if (throwable != null) {
handleAndSendError(throwable);
}
return null;
});
}
} catch (Exception e) {
handleAndSendError(e);
}
}