in tso-server/src/main/java/org/apache/omid/tso/TSOChannelHandler.java [207:241]
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof TSOProto.Request) {
TSOProto.Request request = (TSOProto.Request) msg;
if (request.hasHandshakeRequest()) {
checkHandshake(ctx, request.getHandshakeRequest());
return;
}
if (!handshakeCompleted(ctx)) {
LOG.error("Handshake not completed. Closing channel {}", ctx.channel());
ctx.channel().close();
}
if (request.hasTimestampRequest()) {
requestProcessor.timestampRequest(ctx.channel(), MonitoringContextFactory.getInstance(config,metrics));
} else if (request.hasCommitRequest()) {
TSOProto.CommitRequest cr = request.getCommitRequest();
requestProcessor.commitRequest(cr.getStartTimestamp(),
cr.getCellIdList(),
cr.getTableIdList(),
cr.getIsRetry(),
ctx.channel(),
MonitoringContextFactory.getInstance(config,metrics));
} else if (request.hasFenceRequest()) {
TSOProto.FenceRequest fr = request.getFenceRequest();
requestProcessor.fenceRequest(fr.getTableId(),
ctx.channel(),
MonitoringContextFactory.getInstance(config,metrics));
} else {
LOG.error("Invalid request {}. Closing channel {}", request, ctx.channel());
ctx.channel().close();
}
} else {
LOG.error("Unknown message type", msg);
}
}