in flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/assigners/splitters/SampleBucketSplitStrategy.java [78:169]
public Collection<SnapshotSplit> split(SplitContext splitContext) {
    long chunkSizeInBytes = splitContext.getChunkSizeMB() * 1024 * 1024;

    long sizeInBytes = splitContext.getSizeInBytes();
    long count = splitContext.getDocumentCount();

    // If the collection's total uncompressed size is less than the chunk size,
    // treat the entire collection as a single chunk.
    if (sizeInBytes < chunkSizeInBytes) {
        return SingleSplitStrategy.INSTANCE.split(splitContext);
    }

    int numChunks = (int) (sizeInBytes / chunkSizeInBytes) + 1;
    int numberOfSamples;
    if (count < DEFAULT_SAMPLING_THRESHOLD) {
        // Sample every document when the count is below the sampling threshold.
        numberOfSamples = (int) count;
    } else {
        // Otherwise take samplesPerChunk samples per chunk, capped at the document count.
        numberOfSamples =
                Math.min(numChunks * splitContext.getSamplesPerChunk(), (int) count);
    }
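
    // Worked example (hypothetical figures, not from the source, assuming the count
    // exceeds DEFAULT_SAMPLING_THRESHOLD): a 1 GiB collection with chunkSizeMB = 64
    // gives numChunks = 1024 / 64 + 1 = 17; with samplesPerChunk = 20 and 10 million
    // documents, numberOfSamples = min(17 * 20, 10_000_000) = 340.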

    TableId collectionId = splitContext.getCollectionId();
    MongoCollection<BsonDocument> collection =
            collectionFor(splitContext.getMongoClient(), collectionId, BsonDocument.class);

    List<Bson> pipeline = new ArrayList<>();
    // Skip the $sample stage when every document is sampled anyway.
    if (numberOfSamples != count) {
        pipeline.add(sample(numberOfSamples));
    }
    pipeline.add(bucketAuto("$" + ID_FIELD, numChunks));
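    // $bucketAuto groups the sampled _id values into numChunks buckets; each result
    // document has the shape {"_id": {"min": ..., "max": ...}, "count": ...}, and the
    // min/max values become the chunk boundaries below.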

    LOG.info(
            "Collection {} going to sample {} records into {} chunks",
            collectionId,
            numberOfSamples,
            numChunks);

    List<BsonDocument> chunks =
            collection.aggregate(pipeline).allowDiskUse(true).into(new ArrayList<>());

    LOG.info(
            "Collection {} got {} chunks by auto bucket and sample",
            collectionId,
            chunks.size());

    RowType rowType = shardKeysToRowType(Collections.singleton(ID_FIELD));

    // chunks.size() bucket splits plus the two unbounded end splits.
    List<SnapshotSplit> snapshotSplits = new ArrayList<>(chunks.size() + 2);

    Map<TableId, TableChanges.TableChange> schema = new HashMap<>();
    schema.put(collectionId, MongoDBDialect.collectionSchema(collectionId));
    // The first split covers everything below the lower bound of the first bucket.
    SnapshotSplit firstSplit =
            new SnapshotSplit(
                    collectionId,
                    0,
                    rowType,
                    ChunkUtils.minLowerBoundOfId(),
                    ChunkUtils.boundOfId(lowerBoundOfBucket(chunks.get(0))),
                    null,
                    schema);
    snapshotSplits.add(firstSplit);

    // One split per bucket, bounded by the bucket's min and max _id values.
    for (int i = 0; i < chunks.size(); i++) {
        BsonDocument bucket = chunks.get(i);
        snapshotSplits.add(
                new SnapshotSplit(
                        collectionId,
                        i + 1,
                        rowType,
                        ChunkUtils.boundOfId(lowerBoundOfBucket(bucket)),
                        ChunkUtils.boundOfId(upperBoundOfBucket(bucket)),
                        null,
                        schema));
    }

    // The last split covers everything above the upper bound of the last bucket.
    SnapshotSplit lastSplit =
            new SnapshotSplit(
                    collectionId,
                    chunks.size() + 1,
                    rowType,
                    ChunkUtils.boundOfId(upperBoundOfBucket(chunks.get(chunks.size() - 1))),
                    ChunkUtils.maxUpperBoundOfId(),
                    null,
                    schema);

    // Optionally hand out the unbounded tail split first.
    if (splitContext.isAssignUnboundedChunkFirst()) {
        snapshotSplits.add(0, lastSplit);
    } else {
        snapshotSplits.add(lastSplit);
    }
    return snapshotSplits;
}
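
A minimal standalone sketch of the same $sample + $bucketAuto pipeline using the plain MongoDB Java driver, handy for inspecting the bucket bounds the splitter would produce. The connection string, database and collection names, and the chunk/sample counts are assumptions for illustration, not values taken from the connector.

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Aggregates;

import org.bson.BsonDocument;
import org.bson.conversions.Bson;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BucketAutoProbe {
    public static void main(String[] args) {
        // Hypothetical connection string and namespace.
        try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
            MongoCollection<BsonDocument> collection =
                    client.getDatabase("testdb").getCollection("orders", BsonDocument.class);

            int numChunks = 16;        // assumed chunk count
            int numberOfSamples = 320; // assumed numChunks * samplesPerChunk

            // Same pipeline shape as the splitter: sample first, then bucket by _id.
            List<Bson> pipeline =
                    Arrays.asList(
                            Aggregates.sample(numberOfSamples),
                            Aggregates.bucketAuto("$_id", numChunks));

            List<BsonDocument> buckets =
                    collection.aggregate(pipeline).allowDiskUse(true).into(new ArrayList<>());

            // Each bucket carries {_id: {min: ..., max: ...}, count: ...}.
            buckets.forEach(b -> System.out.println(b.toJson()));
        }
    }
}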