OperationResponse doOperationInvoke()

in sdk/greengrass/event-stream-rpc-client/src/main/java/software/amazon/awssdk/eventstreamrpc/EventStreamRPCClient.java [59:177]


    /**
     * Opens a new stream on the connection, sends the initial request for the given operation and
     * routes incoming continuation messages to the initial-response future and, when present, the
     * streaming response handler.
     *
     * @param operationModelContext model context describing the operation being invoked
     * @param request initial request; serialized to JSON and sent as the activation payload
     * @param streamResponseHandler handler for streamed responses; must be present when the
     *                              operation is a streaming operation
     * @return the operation response built from the model context, the continuation, the
     *         initial-response future and the future returned by activating the continuation
     * @throws IllegalArgumentException if the operation is streaming and no handler was supplied
     */
    OperationResponse<RespType, StrReqType> doOperationInvoke(
            OperationModelContext<ReqType, RespType, StrReqType, StrRespType> operationModelContext,
            final ReqType request, Optional<StreamResponseHandler<StrRespType>> streamResponseHandler) {
        if (operationModelContext.isStreamingOperation() && !streamResponseHandler.isPresent()) {
            //Even if an operation does not have a streaming response (has streaming input), a
            //stream is physically bidirectional, and even if a streaming response isn't allowed
            //the other side may still send an error through the open stream.
            throw new IllegalArgumentException(operationModelContext.getOperationName() + " is a streaming operation. Must have a streaming response handler!");
        }
        final CompletableFuture<RespType> responseFuture = new CompletableFuture<>();
        //starts out "closed"; flipped to open right after the stream has been created below
        final AtomicBoolean isContinuationClosed = new AtomicBoolean(true);
        final ClientConnectionContinuation continuation = connection.newStream(new ClientConnectionContinuationHandler() {
            //false until the first application message has been processed on this stream
            boolean initialResponseReceived = false;

            @Override
            protected void onContinuationMessage(List<Header> headers, byte[] payload, MessageType messageType, int messageFlags) {
                //value of the first string header carrying the service model type, if any
                final Optional<String> applicationModelType = headers.stream()
                        .filter(header -> header.getName().equals(EventStreamRPCServiceModel.SERVICE_MODEL_TYPE_HEADER)
                                && header.getHeaderType().equals(HeaderType.String))
                        .map(Header::getValueAsString)
                        .findFirst();

                //first message back must parse into immediate response unless it's an error
                //follow on messages are stream response handler intended
                if (messageType.equals(MessageType.ApplicationMessage)) {
                    //important following not else if
                    if (applicationModelType.isPresent()) {
                        handleData(applicationModelType.get(), payload, !initialResponseReceived, responseFuture, streamResponseHandler,
                                operationModelContext, continuation, isContinuationClosed);
                    }
                    //intentionally not else if here. We can have data, and the terminate flag set
                    if ((messageFlags & MessageFlags.TerminateStream.getByteValue()) != 0) {
                        this.close();
                        handleClose(initialResponseReceived, responseFuture, streamResponseHandler);
                    } else if (!applicationModelType.isPresent()) {
                        //No model type header and no terminate flag: report unmapped data.
                        //Before the initial response the expected type is the response type,
                        //afterwards it is the streaming response type. The "is initial" flag is
                        //passed negated, matching every other handleError/handleData call here.
                        //orElse avoids NoSuchElementException when no streaming type is modeled.
                        handleError(new UnmappedDataException(initialResponseReceived
                                        ? operationModelContext.getStreamingResponseApplicationModelType().orElse("null")
                                        : operationModelContext.getResponseApplicationModelType()),
                                !initialResponseReceived,
                                responseFuture, streamResponseHandler, continuation, isContinuationClosed);
                    }
                    initialResponseReceived = true;
                } else if (messageType.equals(MessageType.ApplicationError)) {
                    final Optional<Class<? extends EventStreamJsonMessage>> errorClass =
                            operationModelContext.getServiceModel().getApplicationModelClass(applicationModelType.orElse(""));
                    if (!errorClass.isPresent()) {
                        //the type comes from the remote side: pass it as an argument, never as
                        //part of the format string, so a '%' in it cannot break formatting
                        LOGGER.severe(String.format("Could not map error from service. Incoming error type: %s",
                                applicationModelType.orElse("null")));
                        handleError(new UnmappedDataException(applicationModelType.orElse("null")),
                                !initialResponseReceived, responseFuture, streamResponseHandler, continuation, isContinuationClosed);
                    } else {
                        try {
                            final EventStreamOperationError error = (EventStreamOperationError) operationModelContext.getServiceModel().fromJson(errorClass.get(), payload);
                            handleError(error, !initialResponseReceived, responseFuture, streamResponseHandler, continuation, isContinuationClosed);
                        } catch (Exception e) { //shouldn't be possible, but this is an error on top of an error
                            //log instead of silently swallowing; deliberately not rethrown so the
                            //stream close handling below still runs
                            LOGGER.severe(String.format("Failed to process application error of type %s. %s: %s",
                                    applicationModelType.orElse("null"), e.getClass().getName(), e.getMessage()));
                        }
                    }

                    //TODO: application errors always have TerminateStream flag set?
                    //first close the stream immediately if the other side hasn't already done so
                    if ((messageFlags & MessageFlags.TerminateStream.getByteValue()) != 0) {
                        try {
                            this.close();
                            //is this call needed?
                            handleClose(initialResponseReceived, responseFuture, streamResponseHandler);
                        } catch (Exception e) {
                            LOGGER.warning(String.format("Exception thrown closing stream on application error received %s: %s",
                                    e.getClass().getName(), e.getMessage()));
                        }
                    }
                } else if (messageType == MessageType.Ping) {
                    //echo back ping messages to be nice to server, can these happen on continuations?
                    continuation.sendMessage(headers, payload, MessageType.PingResponse, messageFlags);
                } else if (messageType == MessageType.PingResponse) {    //do nothing on ping response
                } else if (messageType == MessageType.ServerError) {
                    //TODO: exception should route to response handler here, also is or will be "InternalError" soon
                    LOGGER.severe(operationModelContext.getOperationName() + " server error received");
                    this.close();   //underlying connection callbacks should handle things appropriately
                } else if (messageType == MessageType.ProtocolError) {    //log and close on protocol error
                    LOGGER.severe(operationModelContext.getOperationName() + " protocol error received");
                    this.close();   //underlying connection callbacks should handle things appropriately but close continuation either way
                } else {
                    //unexpected message type received on stream
                    handleError(new InvalidDataException(messageType), !initialResponseReceived, responseFuture,
                            streamResponseHandler, continuation, isContinuationClosed);
                    try {
                        sendClose(continuation, isContinuationClosed).whenComplete((res, ex) -> {
                            if (ex != null) {
                                LOGGER.warning(String.format("Sending close on invalid message threw %s: %s",
                                        ex.getClass().getCanonicalName(), ex.getMessage()));
                            }
                        });
                    } catch (Exception e) {
                        LOGGER.warning(String.format("Sending close on invalid message threw %s: %s",
                                e.getClass().getCanonicalName(), e.getMessage()));
                    }
                }
            }

            @Override
            protected void onContinuationClosed() {
                super.onContinuationClosed();
                handleClose(initialResponseReceived, responseFuture, streamResponseHandler);
            }
        });
        //The stream now exists, so mark the continuation as open. The flag was initialized to
        //true, so the expected value must be true for this transition to take effect.
        isContinuationClosed.compareAndSet(true, false);

        //initial request message: JSON content type plus the request's service model type
        final List<Header> headers = new LinkedList<>();
        headers.add(Header.createHeader(EventStreamRPCServiceModel.CONTENT_TYPE_HEADER,
                EventStreamRPCServiceModel.CONTENT_TYPE_APPLICATION_JSON));
        headers.add(Header.createHeader(EventStreamRPCServiceModel.SERVICE_MODEL_TYPE_HEADER,
                operationModelContext.getRequestApplicationModelType()));
        final byte[] payload = operationModelContext.getServiceModel().toJson(request);

        //activating the continuation sends the request under the operation's name
        final CompletableFuture<Void> messageFlushFuture = continuation.activate(operationModelContext.getOperationName(),
                headers, payload, MessageType.ApplicationMessage, 0);
        final OperationResponse<RespType, StrReqType> response = new OperationResponse(operationModelContext, continuation,
                responseFuture, messageFlushFuture);

        return response;
    }