in holo-client/src/main/java/com/alibaba/hologres/client/HoloClient.java [814:913]
/**
 * Validates a binlog subscription on a partition parent table and returns the subscription to use.
 *
 * <p>Common checks, each failing with {@link ExceptionCode#INVALID_REQUEST}:
 * <ul>
 *   <li>the subscribed table must be a partition parent table;</li>
 *   <li>{@code BinlogPartitionSubscribeMode} must not be {@code DISABLE};</li>
 *   <li>the instance version must not be lower than 2.1.20;</li>
 *   <li>{@code BinlogHeartBeatIntervalMs} must be positive;</li>
 *   <li>when {@code BinlogReadTimeoutMs} is positive it must not be smaller than the heartbeat interval.</li>
 * </ul>
 *
 * <p>STATIC mode: every partition subscribe must name a child table of the parent, and its offset map
 * must cover exactly the same shard ids as the parent's offset map.
 *
 * <p>DYNAMIC mode: auto partitioning must be enabled with {@code PreCreateNum >= 1}, and at most two
 * partitions may be specified. Partitions that are no longer child tables are removed from the map
 * returned by {@code getPartitionToSubscribeMap()} when none of their offsets has a sequence, and are
 * rejected otherwise. If no partition is left, a new subscribe is built that keeps the parent's shard
 * offsets and starts from the first child table whose name is legal for the auto-partitioning rule.
 *
 * @param partitionedSubscribe the subscribe request on the partition parent table
 * @return the given subscribe, or a rebuilt one when DYNAMIC mode had to fall back to a default partition
 * @throws HoloClientException if any of the checks above fails
 */
private Subscribe validatePartitionSubscription(Subscribe partitionedSubscribe) throws HoloClientException {
    TableSchema parentSchema = getTableSchema(partitionedSubscribe.getTableName());
    if (!parentSchema.isPartitionParentTable()) {
        throw new HoloClientException(ExceptionCode.INVALID_REQUEST, String.format("The table %s is not a partition parent table.", partitionedSubscribe.getTableName()));
    }
    // Read the mode once; it is consulted again below to pick the STATIC or DYNAMIC checks.
    BinlogPartitionSubscribeMode subscribeMode = config.getBinlogPartitionSubscribeMode();
    if (subscribeMode == BinlogPartitionSubscribeMode.DISABLE) {
        throw new HoloClientException(ExceptionCode.INVALID_REQUEST, String.format("Subscribe partition table %s binlog should set BinlogPartitionSubscribeMode.", partitionedSubscribe.getTableName()));
    }
    HoloVersion holoVersion = Command.getHoloVersion(this);
    // Consuming the binlog of a partition parent table may occupy many connections;
    // it is recommended to use it together with fixed fe mode on version 2.1.20 or above.
    if (holoVersion.compareTo(new HoloVersion("2.1.20")) < 0) {
        throw new HoloClientException(ExceptionCode.INVALID_REQUEST, String.format("Subscription partition parent table requires the hologres instance version greater than 2.1.20. your version is %s", holoVersion));
    }
    // Consuming a partition parent table requires a heartbeat interval to be configured.
    if (config.getBinlogHeartBeatIntervalMs() <= 0) {
        throw new HoloClientException(ExceptionCode.INVALID_REQUEST, String.format("Subscribe partition parent table %s binlog need set BinlogHeartBeatIntervalMs.", partitionedSubscribe.getTableName()));
    }
    // The heartbeat interval must not exceed the read timeout (only checked when a timeout is set).
    if (config.getBinlogReadTimeoutMs() > 0 && config.getBinlogReadTimeoutMs() < config.getBinlogHeartBeatIntervalMs()) {
        throw new HoloClientException(ExceptionCode.INVALID_REQUEST, "Subscribe binlog need set BinlogReadTimeoutSeconds greater than BinlogHeartBeatIntervalMs.");
    }
    NavigableMap<String, Subscribe> partitionToSubscribe = partitionedSubscribe.getPartitionToSubscribeMap();
    int shardCount = Command.getShardCount(this, parentSchema);
    Map<Integer, BinlogOffset> offsetMap = checkBinlogOffsetMap(partitionedSubscribe, shardCount);
    Set<Integer> shardIds = offsetMap.keySet();
    LinkedHashMap<String, TableName> partValueToPartitionTableMap = Command.getPartValueToPartitionTableMap(this, parentSchema);
    if (subscribeMode == BinlogPartitionSubscribeMode.STATIC) {
        // STATIC mode: every requested partition must exist and cover the same shards as the parent.
        for (Subscribe subscribe : partitionToSubscribe.values()) {
            TableName partitionName = TableName.valueOf(subscribe.getTableName());
            if (!partValueToPartitionTableMap.containsValue(partitionName)) {
                throw new HoloClientException(ExceptionCode.INVALID_REQUEST, String.format("partition table %s not a child table of %s.", partitionName, partitionedSubscribe.getTableName()));
            }
            if (!subscribe.getOffsetMap().keySet().equals(shardIds)) {
                // Report the partition's own shard ids first and the parent's last, so the mismatch is visible.
                throw new HoloClientException(ExceptionCode.INVALID_REQUEST, String.format("shardIds %s to subscribe of partition table %s not same with parent table %s binlog offset shardIds %s.",
                        subscribe.getOffsetMap().keySet(),
                        subscribe.getTableName(),
                        partitionedSubscribe.getTableName(),
                        shardIds));
            }
        }
    } else if (subscribeMode == BinlogPartitionSubscribeMode.DYNAMIC) {
        // DYNAMIC mode requires auto partitioning to be enabled with a pre-create number of at least 1.
        if (!parentSchema.getAutoPartitioning().isEnable()) {
            throw new HoloClientException(ExceptionCode.INVALID_REQUEST, String.format("Subscribe partition parent table %s binlog by DYNAMIC mode need enable auto partitioning.", partitionedSubscribe.getTableName()));
        }
        if (parentSchema.getAutoPartitioning().getPreCreateNum() < 1) {
            throw new HoloClientException(ExceptionCode.INVALID_REQUEST, String.format("Subscribe partition parent table %s binlog by DYNAMIC mode need set PreCreateNum lager than 1, but now is %s.",
                    partitionedSubscribe.getTableName(),
                    parentSchema.getAutoPartitioning().getPreCreateNum()));
        }
        // DYNAMIC mode supports state recovery, so at most two tables may be specified: when consuming
        // delayed data, two tables may hold a checkpoint at the same time.
        if (partitionToSubscribe.size() > 2) {
            throw new HoloClientException(ExceptionCode.INVALID_REQUEST, String.format("Subscribe partition parent table %s binlog by DYNAMIC mode," +
                    " need to specify at most two partition tables, but now is %s.", partitionedSubscribe.getTableName(), partitionToSubscribe.size()));
        }
        // A table passed in by the user may be an earlier partition child table that has already been dropped.
        Iterator<Map.Entry<String, Subscribe>> iterator = partitionToSubscribe.entrySet().iterator();
        while (iterator.hasNext()) {
            Map.Entry<String, Subscribe> entry = iterator.next();
            if (!partValueToPartitionTableMap.containsValue(TableName.valueOf(entry.getKey()))) {
                boolean hasLsn = entry.getValue().getOffsetMap().values().stream()
                        .anyMatch(BinlogOffset::hasSequence);
                if (hasLsn) {
                    // An explicit sequence cannot be resumed on a table that no longer exists.
                    throw new HoloClientException(ExceptionCode.INVALID_REQUEST, String.format("partition table %s not found, it may be an earlier partition table that has been dropped, could not start subscribe from lsn.", entry.getKey()));
                } else {
                    LOGGER.warn("partition table {} not found, it may be an earlier partition table that has been dropped, ignore it.", entry.getKey());
                    // Removes the entry from the map obtained from partitionedSubscribe itself.
                    iterator.remove();
                }
            }
        }
        // All specified child tables have expired and been dropped (or none was given):
        // by default start consuming from the earliest existing partition child table.
        // NOTE(review): this relies on partValueToPartitionTableMap iterating oldest-first - confirm against Command.
        if (partitionToSubscribe.isEmpty()) {
            Iterator<Map.Entry<String, TableName>> iter = partValueToPartitionTableMap.entrySet().iterator();
            TableName oldestPartition = null;
            while (iter.hasNext()) {
                TableName tableName = iter.next().getValue();
                if (PartitionUtil.isPartitionTableNameLegal(tableName.getTableName(), parentSchema.getAutoPartitioning())) {
                    oldestPartition = tableName;
                    break;
                } else {
                    // The child table name does not follow the auto-partitioning naming rule; skip it.
                    LOGGER.warn("partition table {} is illegal, ignore it.", tableName);
                }
            }
            if (oldestPartition == null) {
                throw new HoloClientException(ExceptionCode.INVALID_REQUEST, String.format("partition parent table %s has no valid partition table.", partitionedSubscribe.getTableName()));
            }
            // Rebuild the subscribe, inheriting the parent-level shard offsets that were passed in.
            Subscribe.OffsetBuilder newBuilder = Subscribe.newOffsetBuilder(partitionedSubscribe.getTableName());
            for (Map.Entry<Integer, BinlogOffset> entry : offsetMap.entrySet()) {
                newBuilder.addShardStartOffset(entry.getKey(), entry.getValue());
            }
            newBuilder.addShardsStartOffsetForPartition(oldestPartition.getFullName(), shardIds, new BinlogOffset());
            partitionedSubscribe = newBuilder.build();
            LOGGER.info("start binlog subscribe from oldest partition {} by default.", oldestPartition);
        }
    }
    return partitionedSubscribe;
}