in kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/CDCRequestHandler.java [70:109]
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
CDCResponse response = (CDCResponse) msg;
ClientConnectionContext connectionContext = ctx.channel().attr(ClientConnectionContext.CONTEXT_KEY).get();
Optional<ResponseFuture> responseFuture = Optional.ofNullable(connectionContext.getResponseFutureMap().get(response.getRequestId()));
if (response.getStatus() != Status.SUCCEED) {
Type requestType = Type.UNKNOWN;
if (responseFuture.isPresent()) {
final ResponseFuture future = responseFuture.get();
future.setErrorCode(response.getErrorCode());
future.setErrorMessage(response.getErrorMessage());
future.countDown();
requestType = future.getRequestType();
}
errorResultHandler.handleServerError(ctx, new ServerErrorResult(response.getErrorCode(), response.getErrorMessage(), requestType));
responseFuture.ifPresent(future -> {
future.setErrorCode(response.getErrorCode());
future.setErrorMessage(response.getErrorMessage());
future.countDown();
});
return;
}
if (response.hasServerGreetingResult()) {
ServerGreetingResult serverGreetingResult = response.getServerGreetingResult();
log.info("Received server greeting result, serverVersion={}, protocolVersion={}", serverGreetingResult.getServerVersion(), serverGreetingResult.getProtocolVersion());
return;
}
if (ClientConnectionStatus.NOT_LOGGED_IN == connectionContext.getStatus().get() && responseFuture.isPresent() && Type.LOGIN == responseFuture.get().getRequestType()) {
responseFuture.ifPresent(ResponseFuture::countDown);
connectionContext.getStatus().set(ClientConnectionStatus.LOGGED_IN);
return;
}
if (response.hasStreamDataResult()) {
StreamDataResult streamDataResult = response.getStreamDataResult();
responseFuture.ifPresent(future -> future.setResult(response.getStreamDataResult().getStreamingId()));
connectionContext.getStreamingIds().add(streamDataResult.getStreamingId());
} else if (response.hasDataRecordResult()) {
processDataRecords(ctx, response.getDataRecordResult());
}
responseFuture.ifPresent(ResponseFuture::countDown);
}