in sdk/greengrass/event-stream-rpc-client/src/main/java/software/amazon/awssdk/eventstreamrpc/EventStreamRPCClient.java [59:177]
/**
 * Opens a new stream on the connection, activates it with {@code request} serialized as the
 * first message, and returns an {@link OperationResponse} built from the continuation, the
 * initial-response future and the future returned by the activation call.
 *
 * <p>Incoming messages on the stream are dispatched by the anonymous continuation handler
 * defined here: application messages go to {@code handleData}, application errors and
 * unexpected message types go to {@code handleError}, pings are echoed back, and server or
 * protocol errors are logged and close the handler.
 *
 * @param operationModelContext supplies the operation name, model type names and the service
 *                              model used for JSON conversion
 * @param request               request object serialized as the activating message payload
 * @param streamResponseHandler handler for streamed responses; must be present when
 *                              {@code operationModelContext.isStreamingOperation()} is true
 * @return the response wrapper for this invocation
 * @throws IllegalArgumentException if the operation is a streaming operation and no stream
 *                                  response handler was supplied
 */
OperationResponse<RespType, StrReqType> doOperationInvoke(
        OperationModelContext<ReqType, RespType, StrReqType, StrRespType> operationModelContext,
        final ReqType request, Optional<StreamResponseHandler<StrRespType>> streamResponseHandler) {
    if (operationModelContext.isStreamingOperation() && !streamResponseHandler.isPresent()) {
        //Even if an operation does not have a streaming response (has streaming input), a
        //stream is physically bidirectional, and even if a streaming response isn't allowed
        //the other side may still send an error through the open stream.
        throw new IllegalArgumentException(operationModelContext.getOperationName() + " is a streaming operation. Must have a streaming response handler!");
    }
    final CompletableFuture<RespType> responseFuture = new CompletableFuture<>();
    //starts out true (closed) because no stream exists yet; flipped to false right after newStream() below
    final AtomicBoolean isContinuationClosed = new AtomicBoolean(true);
    final ClientConnectionContinuation continuation = connection.newStream(new ClientConnectionContinuationHandler() {
        //false until the first application message has been processed
        boolean initialResponseReceived = false;

        //NOTE(review): `continuation` inside this handler presumably resolves to a member inherited
        //from ClientConnectionContinuationHandler, since the enclosing local is still being
        //initialized at this point - confirm.
        @Override
        protected void onContinuationMessage(List<Header> headers, byte[] payload, MessageType messageType, int messageFlags) {
            //value of the first String-typed service model type header, if any
            final Optional<String> applicationModelType = headers.stream()
                    .filter(header -> header.getName().equals(EventStreamRPCServiceModel.SERVICE_MODEL_TYPE_HEADER)
                            && header.getHeaderType().equals(HeaderType.String))
                    .map(header -> header.getValueAsString())
                    .findFirst();
            //first message back must parse into immediate response unless it's an error
            //follow on messages are stream response handler intended
            if (messageType.equals(MessageType.ApplicationMessage)) {
                //important following not else if
                if (applicationModelType.isPresent()) {
                    handleData(applicationModelType.get(), payload, !initialResponseReceived, responseFuture, streamResponseHandler,
                            operationModelContext, continuation, isContinuationClosed);
                }
                //intentionally not else if here. We can have data, and the terminate flag set
                if ((messageFlags & MessageFlags.TerminateStream.getByteValue()) != 0) {
                    this.close();
                    handleClose(initialResponseReceived, responseFuture, streamResponseHandler);
                } else if (!applicationModelType.isPresent()) {
                    //message without a model type header and without the terminate flag: report the
                    //type that was expected at this point. Before the initial response that is the
                    //response type; afterwards it is the streaming response type.
                    handleError(new UnmappedDataException(initialResponseReceived
                                    ? operationModelContext.getStreamingResponseApplicationModelType().get()
                                    : operationModelContext.getResponseApplicationModelType()),
                            !initialResponseReceived, responseFuture, streamResponseHandler, continuation, isContinuationClosed);
                }
                initialResponseReceived = true;
            } else if (messageType.equals(MessageType.ApplicationError)) {
                final Optional<Class<? extends EventStreamJsonMessage>> errorClass =
                        operationModelContext.getServiceModel().getApplicationModelClass(applicationModelType.orElse(""));
                if (!errorClass.isPresent()) {
                    //the incoming type is passed as a format argument, never as part of the format
                    //string, so a '%' in the header value cannot break formatting
                    LOGGER.severe(String.format("Could not map error from service. Incoming error type: %s",
                            applicationModelType.orElse("null")));
                    handleError(new UnmappedDataException(applicationModelType.orElse("null")),
                            !initialResponseReceived, responseFuture, streamResponseHandler, continuation, isContinuationClosed);
                } else {
                    try {
                        final EventStreamOperationError error = (EventStreamOperationError) operationModelContext.getServiceModel().fromJson(errorClass.get(), payload);
                        handleError(error, !initialResponseReceived, responseFuture, streamResponseHandler, continuation, isContinuationClosed);
                    } catch (Exception e) { //shouldn't be possible, but this is an error on top of an error
                        //still best-effort (nothing is rethrown), but leave a trace instead of dropping it silently
                        LOGGER.severe(String.format("Failed to process application error of type %s. %s: %s",
                                applicationModelType.orElse("null"), e.getClass().getName(), e.getMessage()));
                    }
                }
                //TODO: application errors always have TerminateStream flag set?
                //close our side only when the error message carried the TerminateStream flag
                if ((messageFlags & MessageFlags.TerminateStream.getByteValue()) != 0) {
                    try {
                        this.close();
                        //is this call needed?
                        handleClose(initialResponseReceived, responseFuture, streamResponseHandler);
                    } catch (Exception e) {
                        LOGGER.warning(String.format("Exception thrown closing stream on application error received %s: %s",
                                e.getClass().getName(), e.getMessage()));
                    }
                }
            } else if (messageType == MessageType.Ping) {
                //echo back ping messages to be nice to server, can these happen on continuations?
                continuation.sendMessage(headers, payload, MessageType.PingResponse, messageFlags);
            } else if (messageType == MessageType.PingResponse) { //do nothing on ping response
            } else if (messageType == MessageType.ServerError) {
                //TODO: exception should route to response handler here, also is or will be "InternalError" soon
                LOGGER.severe(operationModelContext.getOperationName() + " server error received");
                this.close(); //underlying connection callbacks should handle things appropriately
            } else if (messageType == MessageType.ProtocolError) {
                LOGGER.severe(operationModelContext.getOperationName() + " protocol error received");
                this.close(); //underlying connection callbacks should handle things appropriately but close continuation either way
            } else {
                //unexpected message type received on stream
                handleError(new InvalidDataException(messageType), !initialResponseReceived, responseFuture,
                        streamResponseHandler, continuation, isContinuationClosed);
                try {
                    sendClose(continuation, isContinuationClosed).whenComplete((res, ex) -> {
                        if (ex != null) {
                            LOGGER.warning(String.format("Sending close on invalid message threw %s: %s",
                                    ex.getClass().getCanonicalName(), ex.getMessage()));
                        }
                    });
                } catch (Exception e) {
                    LOGGER.warning(String.format("Sending close on invalid message threw %s: %s",
                            e.getClass().getCanonicalName(), e.getMessage()));
                }
            }
        }

        @Override
        protected void onContinuationClosed() {
            super.onContinuationClosed();
            handleClose(initialResponseReceived, responseFuture, streamResponseHandler);
        }
    });
    //the stream now exists, so flip the flag from its initial true (closed) to false (open).
    //NOTE(review): handleData/handleError/sendClose receive this flag and presumably consult it
    //before sending a close - confirm against those helpers.
    isContinuationClosed.compareAndSet(true, false);
    final List<Header> headers = new LinkedList<>();
    headers.add(Header.createHeader(EventStreamRPCServiceModel.CONTENT_TYPE_HEADER,
            EventStreamRPCServiceModel.CONTENT_TYPE_APPLICATION_JSON));
    headers.add(Header.createHeader(EventStreamRPCServiceModel.SERVICE_MODEL_TYPE_HEADER,
            operationModelContext.getRequestApplicationModelType()));
    final byte[] payload = operationModelContext.getServiceModel().toJson(request);
    //activation sends the request as the first message on the stream
    final CompletableFuture<Void> messageFlushFuture = continuation.activate(operationModelContext.getOperationName(),
            headers, payload, MessageType.ApplicationMessage, 0);
    final OperationResponse<RespType, StrReqType> response = new OperationResponse(operationModelContext, continuation,
            responseFuture, messageFlushFuture);
    return response;
}