private Subscribe validatePartitionSubscription()

in holo-client/src/main/java/com/alibaba/hologres/client/HoloClient.java [814:913]


	/**
	 * Validates a binlog subscription on a partition parent table and returns the subscription to use.
	 *
	 * <p>Common checks, in order: the table must be a partition parent table, the configured
	 * partition subscribe mode must not be {@code DISABLE}, the instance version must not be lower
	 * than 2.1.20, the binlog heartbeat interval must be positive, and the binlog read timeout
	 * (when positive) must not be smaller than the heartbeat interval.
	 *
	 * <p>{@code STATIC} mode: every partition subscribe must refer to an existing child table of the
	 * parent and must cover exactly the same shard ids as the parent's offset map.
	 *
	 * <p>{@code DYNAMIC} mode: auto partitioning must be enabled with a pre-create number of at least
	 * 1, and at most two partitions may be given. A given partition that no longer exists is removed
	 * from the map returned by {@code getPartitionToSubscribeMap()} when none of its offsets carries
	 * a sequence, and rejected otherwise. If no partition remains, a new {@link Subscribe} is built
	 * that keeps the parent's shard offsets and starts from the first partition (in the iteration
	 * order of the partition map returned by {@code Command}) whose name is legal for the table's
	 * auto partitioning settings.
	 *
	 * @param partitionedSubscribe the subscribe request for the partition parent table
	 * @return {@code partitionedSubscribe} itself, or a rebuilt subscribe when DYNAMIC mode had to
	 *         fall back to the oldest existing partition
	 * @throws HoloClientException with {@code INVALID_REQUEST} when any of the checks above fails;
	 *         calls into {@code Command}, {@code getTableSchema} and {@code checkBinlogOffsetMap}
	 *         may also throw
	 */
	private Subscribe validatePartitionSubscription(Subscribe partitionedSubscribe) throws HoloClientException {
		TableSchema parentSchema = getTableSchema(partitionedSubscribe.getTableName());
		if (!parentSchema.isPartitionParentTable()) {
			throw new HoloClientException(ExceptionCode.INVALID_REQUEST, String.format("The table %s is not a partition parent table.", partitionedSubscribe.getTableName()));
		}
		BinlogPartitionSubscribeMode subscribeMode = config.getBinlogPartitionSubscribeMode();
		if (subscribeMode == BinlogPartitionSubscribeMode.DISABLE) {
			throw new HoloClientException(ExceptionCode.INVALID_REQUEST, String.format("Subscribe partition table %s binlog should set BinlogPartitionSubscribeMode.", partitionedSubscribe.getTableName()));
		}
		HoloVersion holoVersion = Command.getHoloVersion(this);
		// Consuming the binlog of a partition parent table may occupy many connections; it is
		// recommended to use it together with fixed fe mode on version 2.1.20 or later.
		if (holoVersion.compareTo(new HoloVersion("2.1.20")) < 0) {
			throw new HoloClientException(ExceptionCode.INVALID_REQUEST, String.format("Subscription partition parent table requires the hologres instance version greater than 2.1.20. your version is %s", holoVersion));
		}
		// Consuming a partition parent table requires a heartbeat interval to be configured.
		if (config.getBinlogHeartBeatIntervalMs() <= 0) {
			throw new HoloClientException(ExceptionCode.INVALID_REQUEST, String.format("Subscribe partition parent table %s binlog need set BinlogHeartBeatIntervalMs.", partitionedSubscribe.getTableName()));
		}
		// The heartbeat interval must not exceed the read timeout (a non-positive timeout skips this check).
		if (config.getBinlogReadTimeoutMs() > 0 && config.getBinlogReadTimeoutMs() < config.getBinlogHeartBeatIntervalMs()) {
			throw new HoloClientException(ExceptionCode.INVALID_REQUEST, "Subscribe binlog need set BinlogReadTimeoutSeconds greater than BinlogHeartBeatIntervalMs.");
		}
		NavigableMap<String, Subscribe> partitionToSubscribe = partitionedSubscribe.getPartitionToSubscribeMap();
		int shardCount = Command.getShardCount(this, parentSchema);
		Map<Integer, BinlogOffset> offsetMap = checkBinlogOffsetMap(partitionedSubscribe, shardCount);
		Set<Integer> shardIds = offsetMap.keySet();
		LinkedHashMap<String, TableName> partValueToPartitionTableMap = Command.getPartValueToPartitionTableMap(this, parentSchema);
		if (subscribeMode == BinlogPartitionSubscribeMode.STATIC) {
			// STATIC mode: every requested partition must exist and cover the parent's shard ids.
			for (Subscribe subscribe : partitionToSubscribe.values()) {
				TableName partitionName = TableName.valueOf(subscribe.getTableName());
				if (!partValueToPartitionTableMap.containsValue(partitionName)) {
					throw new HoloClientException(ExceptionCode.INVALID_REQUEST, String.format("partition table %s not a child table of %s.", partitionName, partitionedSubscribe.getTableName()));
				}
				if (!subscribe.getOffsetMap().keySet().equals(shardIds)) {
					// Report the partition's own shard ids first, then the parent's, so the
					// mismatch is actually visible in the message.
					throw new HoloClientException(ExceptionCode.INVALID_REQUEST, String.format("shardIds %s to subscribe of partition table %s not same with parent table %s binlog offset shardIds %s.",
							subscribe.getOffsetMap().keySet(),
							subscribe.getTableName(),
							partitionedSubscribe.getTableName(),
							shardIds));
				}
			}
		} else if (subscribeMode == BinlogPartitionSubscribeMode.DYNAMIC) {
			// DYNAMIC mode requires auto partitioning to be enabled with a pre-create number >= 1.
			if (!parentSchema.getAutoPartitioning().isEnable()) {
				throw new HoloClientException(ExceptionCode.INVALID_REQUEST, String.format("Subscribe partition parent table %s binlog by DYNAMIC mode need enable auto partitioning.", partitionedSubscribe.getTableName()));
			}
			if (parentSchema.getAutoPartitioning().getPreCreateNum() < 1) {
				throw new HoloClientException(ExceptionCode.INVALID_REQUEST, String.format("Subscribe partition parent table %s binlog by DYNAMIC mode need set PreCreateNum lager than 1, but now is %s.",
						partitionedSubscribe.getTableName(),
						parentSchema.getAutoPartitioning().getPreCreateNum()));
			}
			// When restoring from state in DYNAMIC mode at most two tables may be specified: while
			// consuming delayed data, two tables may hold a checkpoint at the same time.
			if (partitionToSubscribe.size() > 2) {
				throw new HoloClientException(ExceptionCode.INVALID_REQUEST, String.format("Subscribe partition parent table %s binlog by DYNAMIC mode," +
						" need to specify at most two partition tables, but now is %s.", partitionedSubscribe.getTableName(), partitionToSubscribe.size()));
			}
			// A table passed in by the user may be an earlier partition that has already been dropped.
			Iterator<Map.Entry<String, Subscribe>> iterator = partitionToSubscribe.entrySet().iterator();
			while (iterator.hasNext()) {
				Map.Entry<String, Subscribe> entry = iterator.next();
				if (partValueToPartitionTableMap.containsValue(TableName.valueOf(entry.getKey()))) {
					continue;
				}
				boolean hasLsn = entry.getValue().getOffsetMap().values().stream()
						.anyMatch(BinlogOffset::hasSequence);
				if (hasLsn) {
					// An explicit lsn cannot be honoured on a table that no longer exists.
					throw new HoloClientException(ExceptionCode.INVALID_REQUEST, String.format("partition table %s not found, it may be an earlier partition table that has been dropped, could not start subscribe from lsn.", entry.getKey()));
				}
				LOGGER.warn("partition table {} not found, it may be an earlier partition table that has been dropped, ignore it.", entry.getKey());
				iterator.remove();
			}
			// Every given partition has expired and been dropped: by default start consuming
			// from the earliest partition that currently exists.
			if (partitionToSubscribe.isEmpty()) {
				TableName oldestPartition = null;
				for (TableName tableName : partValueToPartitionTableMap.values()) {
					if (PartitionUtil.isPartitionTableNameLegal(tableName.getTableName(), parentSchema.getAutoPartitioning())) {
						oldestPartition = tableName;
						break;
					}
					// The partition table name does not follow the naming rule; skip it.
					LOGGER.warn("partition table {} is illegal, ignore it.", tableName);
				}
				if (oldestPartition == null) {
					throw new HoloClientException(ExceptionCode.INVALID_REQUEST, String.format("partition parent table %s has no valid partition table.", partitionedSubscribe.getTableName()));
				}
				// Rebuild the subscribe, inheriting the shard offsets given for the parent table.
				Subscribe.OffsetBuilder newBuilder = Subscribe.newOffsetBuilder(partitionedSubscribe.getTableName());
				for (Map.Entry<Integer, BinlogOffset> entry : offsetMap.entrySet()) {
					newBuilder.addShardStartOffset(entry.getKey(), entry.getValue());
				}
				newBuilder.addShardsStartOffsetForPartition(oldestPartition.getFullName(), shardIds, new BinlogOffset());
				partitionedSubscribe = newBuilder.build();
				LOGGER.info("start binlog subscribe from oldest partition {} by default.", oldestPartition);
			}
		}
		return partitionedSubscribe;
	}