in spanner-data-validator-java/src/main/java/com/google/migration/TableSpecList.java [336:398]
/**
 * Picks the column used to partition validation work for a source/Spanner table pair.
 *
 * <p>Candidates are tried in this order, and the first one that yields a key wins:
 * <ol>
 *   <li>first column of the source PK vs. first column of the Spanner PK;</li>
 *   <li>first column of the source PK vs. the Spanner indexes
 *       (via {@code searchSpannerIndexesForPartitionKey});</li>
 *   <li>for each source index, in array order: its first column vs. the first column of the
 *       Spanner PK, then vs. the Spanner indexes.</li>
 * </ol>
 *
 * <p>Side effect: the key arrays returned by the table/index getters are sorted in place by
 * their {@code getOrder()} value.
 *
 * @param sourceTable source table definition; its column definitions must contain every PK and
 *     index column referenced here
 * @param spannerTable Spanner table definition
 * @return the selected partition key, or {@code null} when the source table has no primary key
 *     or no candidate produced a key
 */
private static PartitionKey determinePartitionKey(SourceTable sourceTable, SpannerTable spannerTable) {
  PartitionKey partitionKey = null;

  // Column at the first ordinal position of the Spanner PK. This stays null when the Spanner
  // table exposes no key columns, in which case only the Spanner indexes can provide a match.
  ColumnPK spannerPKAtFirstPosition = null;
  ColumnPK[] spannerPKs = spannerTable.getPrimaryKeys();
  if (spannerPKs != null && spannerPKs.length > 0) {
    Arrays.sort(spannerPKs, Comparator.comparingInt(ColumnPK::getOrder));
    spannerPKAtFirstPosition = spannerPKs[0];
  } else {
    LOG.info("Spanner table {} does not expose any PK columns, only its indexes will be considered",
        spannerTable.getName());
  }

  // ID and data type of the column at the first ordinal position of the source PK.
  ColumnPK[] sourcePKs = sourceTable.getPrimaryKeys();
  if (sourcePKs == null || sourcePKs.length == 0) {
    LOG.info("Source table does not have a PK, skipping validation");
    return null;
  }
  Arrays.sort(sourcePKs, Comparator.comparingInt(ColumnPK::getOrder));
  ColumnPK sourcePKAtFirstPosition = sourcePKs[0];
  String sourcePKAtFirstPositionDataType =
      sourceTable.getColDefs().get(sourcePKAtFirstPosition.getColId()).getType().getName();

  // 1. Source PK vs. Spanner PK.
  if (spannerPKAtFirstPosition != null
      && sourcePKAtFirstPosition.getColId().equals(spannerPKAtFirstPosition.getColId())) {
    partitionKey =
        createPartitionKey(sourcePKAtFirstPosition.getColId(), sourcePKAtFirstPositionDataType);
  }

  // 2. Source PK vs. Spanner indexes.
  if (partitionKey == null) {
    LOG.info("###########(Source PK, Spanner PK) -> No partition key found###########");
    partitionKey = searchSpannerIndexesForPartitionKey(
        sourcePKAtFirstPosition.getColId(),
        sourcePKAtFirstPositionDataType,
        spannerTable.getIndexes());
  }

  // 3. Source indexes vs. Spanner PK, then vs. Spanner indexes.
  if (partitionKey == null) {
    LOG.info("###########(Source PK, Spanner Indexes) -> No partition key found###########");
    Index[] sourceIndexes = sourceTable.getIndexes();
    if (sourceIndexes != null) {
      for (Index sourceIndex : sourceIndexes) {
        // Fetch the keys once so the array that gets sorted is the one that is read.
        IndexKey[] sourceIndexKeys = sourceIndex.getKeys();
        if (sourceIndexKeys == null || sourceIndexKeys.length == 0) {
          continue; // an index without key columns cannot provide a candidate
        }
        Arrays.sort(sourceIndexKeys, Comparator.comparingInt(IndexKey::getOrder));
        IndexKey sourceIndexAtFirstPosition = sourceIndexKeys[0];
        String sourceIndexAtFirstPositionDataType = sourceTable.getColDefs()
            .get(sourceIndexAtFirstPosition.getColId()).getType().getName();

        // First check against the Spanner PK.
        if (spannerPKAtFirstPosition != null
            && sourceIndexAtFirstPosition.getColId().equals(spannerPKAtFirstPosition.getColId())) {
          partitionKey = createPartitionKey(
              sourceIndexAtFirstPosition.getColId(), sourceIndexAtFirstPositionDataType);
        }
        // If not found, check the Spanner indexes.
        if (partitionKey == null) {
          LOG.info("###########(Source Indexes, Spanner PK) -> No partition key found###########");
          partitionKey = searchSpannerIndexesForPartitionKey(
              sourceIndexAtFirstPosition.getColId(),
              sourceIndexAtFirstPositionDataType,
              spannerTable.getIndexes());
        }

        // Stop at the first source index that yields a key, so a later index cannot
        // overwrite an earlier match.
        if (partitionKey != null) {
          break;
        }
      }
    }
  }

  if (partitionKey == null) {
    LOG.info("###########(Source Indexes, Spanner Indexes) -> No partition key found###########");
    LOG.info("No suitable partitioning key was found for Source table: {}, Spanner table: {}", sourceTable.getName(), spannerTable.getName());
    LOG.info("Only (int, bigint) partition keys are supported, with a the following condition: There should exist an index (PK or secondary index) whose first ordinal position is a supported partitioning data type and is identical between source and spanner");
  }
  return partitionKey;
}