in flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java [176:235]
public KafkaDynamicSource(
DataType physicalDataType,
@Nullable DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat,
DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat,
int[] keyProjection,
int[] valueProjection,
@Nullable String keyPrefix,
@Nullable List<String> topics,
@Nullable Pattern topicPattern,
Properties properties,
StartupMode startupMode,
Map<TopicPartition, Long> specificStartupOffsets,
long startupTimestampMillis,
BoundedMode boundedMode,
Map<TopicPartition, Long> specificBoundedOffsets,
long boundedTimestampMillis,
boolean upsertMode,
String tableIdentifier,
@Nullable Integer parallelism) {
// Format attributes
this.physicalDataType =
Preconditions.checkNotNull(
physicalDataType, "Physical data type must not be null.");
this.keyDecodingFormat = keyDecodingFormat;
this.valueDecodingFormat =
Preconditions.checkNotNull(
valueDecodingFormat, "Value decoding format must not be null.");
this.keyProjection =
Preconditions.checkNotNull(keyProjection, "Key projection must not be null.");
this.valueProjection =
Preconditions.checkNotNull(valueProjection, "Value projection must not be null.");
this.keyPrefix = keyPrefix;
// Mutable attributes
this.producedDataType = physicalDataType;
this.metadataKeys = Collections.emptyList();
this.watermarkStrategy = null;
// Kafka-specific attributes
Preconditions.checkArgument(
(topics != null && topicPattern == null)
|| (topics == null && topicPattern != null),
"Either Topic or Topic Pattern must be set for source.");
this.topics = topics;
this.topicPattern = topicPattern;
this.properties = Preconditions.checkNotNull(properties, "Properties must not be null.");
this.startupMode =
Preconditions.checkNotNull(startupMode, "Startup mode must not be null.");
this.specificStartupOffsets =
Preconditions.checkNotNull(
specificStartupOffsets, "Specific offsets must not be null.");
this.startupTimestampMillis = startupTimestampMillis;
this.boundedMode =
Preconditions.checkNotNull(boundedMode, "Bounded mode must not be null.");
this.specificBoundedOffsets =
Preconditions.checkNotNull(
specificBoundedOffsets, "Specific bounded offsets must not be null.");
this.boundedTimestampMillis = boundedTimestampMillis;
this.upsertMode = upsertMode;
this.tableIdentifier = tableIdentifier;
this.parallelism = parallelism;
}