in flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/split/JdbcSourceSplitState.java [84:106]
public SplitT toJdbcSourceSplit() {
    // Produces a split of the same concrete type as the wrapped one, carrying the reading
    // position currently tracked by this state object.

    // A position is only attached when there is something to record: either an offset is
    // present or some records after it have to be skipped. Otherwise 'null' is passed on.
    final boolean hasPosition = offset != null || recordsToSkipAfterOffset != 0;
    CheckpointedOffset checkpointedOffset = null;
    if (hasPosition) {
        checkpointedOffset = new CheckpointedOffset(offset, recordsToSkipAfterOffset);
    }

    final JdbcSourceSplit result = split.updateWithCheckpointedPosition(checkpointedOffset);

    // Defensive validation of what the split implementation handed back, so that split
    // information is never dropped silently.
    if (result == null) {
        throw new FlinkRuntimeException(
                "Split returned 'null' in updateWithCheckpointedPosition(): " + split);
    }

    // The runtime class has to be exactly the one of the wrapped split; this is also what
    // backs the cast to SplitT at the end of the method.
    final Class<?> expectedType = split.getClass();
    final Class<?> actualType = result.getClass();
    if (actualType != expectedType) {
        throw new FlinkRuntimeException(
                String.format(
                        "Split returned different type in updateWithCheckpointedPosition(). "
                                + "Split type is %s, returned type is %s",
                        expectedType.getName(), actualType.getName()));
    }

    return (SplitT) result;
}