public void channelRead()

in kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/CDCRequestHandler.java [70:109]


    public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
        CDCResponse response = (CDCResponse) msg;
        ClientConnectionContext connectionContext = ctx.channel().attr(ClientConnectionContext.CONTEXT_KEY).get();
        Optional<ResponseFuture> responseFuture = Optional.ofNullable(connectionContext.getResponseFutureMap().get(response.getRequestId()));
        if (response.getStatus() != Status.SUCCEED) {
            Type requestType = Type.UNKNOWN;
            if (responseFuture.isPresent()) {
                final ResponseFuture future = responseFuture.get();
                future.setErrorCode(response.getErrorCode());
                future.setErrorMessage(response.getErrorMessage());
                future.countDown();
                requestType = future.getRequestType();
            }
            errorResultHandler.handleServerError(ctx, new ServerErrorResult(response.getErrorCode(), response.getErrorMessage(), requestType));
            responseFuture.ifPresent(future -> {
                future.setErrorCode(response.getErrorCode());
                future.setErrorMessage(response.getErrorMessage());
                future.countDown();
            });
            return;
        }
        if (response.hasServerGreetingResult()) {
            ServerGreetingResult serverGreetingResult = response.getServerGreetingResult();
            log.info("Received server greeting result, serverVersion={}, protocolVersion={}", serverGreetingResult.getServerVersion(), serverGreetingResult.getProtocolVersion());
            return;
        }
        if (ClientConnectionStatus.NOT_LOGGED_IN == connectionContext.getStatus().get() && responseFuture.isPresent() && Type.LOGIN == responseFuture.get().getRequestType()) {
            responseFuture.ifPresent(ResponseFuture::countDown);
            connectionContext.getStatus().set(ClientConnectionStatus.LOGGED_IN);
            return;
        }
        if (response.hasStreamDataResult()) {
            StreamDataResult streamDataResult = response.getStreamDataResult();
            responseFuture.ifPresent(future -> future.setResult(response.getStreamDataResult().getStreamingId()));
            connectionContext.getStreamingIds().add(streamDataResult.getStreamingId());
        } else if (response.hasDataRecordResult()) {
            processDataRecords(ctx, response.getDataRecordResult());
        }
        responseFuture.ifPresent(ResponseFuture::countDown);
    }