in flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java [138:181]
/**
 * Creates a Kafka dynamic sink from the given configuration.
 *
 * <p>Arguments that are not annotated {@code @Nullable} are null-checked via {@code
 * checkNotNull}; nullable arguments are stored as passed. {@code metadataKeys} starts out as an
 * empty list.
 *
 * @param consumedDataType data type consumed by this sink; must not be null
 * @param physicalDataType physical data type; must not be null
 * @param keyEncodingFormat encoding format for the key, or {@code null}
 * @param valueEncodingFormat encoding format for the value; must not be null
 * @param keyProjection key projection indices; must not be null (the array is stored without
 *     copying)
 * @param valueProjection value projection indices; must not be null (the array is stored
 *     without copying)
 * @param keyPrefix key prefix, or {@code null}
 * @param topic target topic; must not be null
 * @param properties Kafka properties; must not be null
 * @param partitioner custom partitioner, or {@code null}
 * @param deliveryGuarantee delivery guarantee; must not be null
 * @param upsertMode whether the sink is created in upsert mode
 * @param flushMode sink buffer flush mode; must not be null and may only be enabled together
 *     with {@code upsertMode}
 * @param parallelism sink parallelism, or {@code null}
 * @param transactionalIdPrefix transactional id prefix, or {@code null}
 * @throws NullPointerException if a mandatory argument is null
 * @throws IllegalArgumentException if {@code flushMode} is enabled while {@code upsertMode} is
 *     {@code false}
 */
public KafkaDynamicSink(
        DataType consumedDataType,
        DataType physicalDataType,
        @Nullable EncodingFormat<SerializationSchema<RowData>> keyEncodingFormat,
        EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat,
        int[] keyProjection,
        int[] valueProjection,
        @Nullable String keyPrefix,
        String topic,
        Properties properties,
        @Nullable FlinkKafkaPartitioner<RowData> partitioner,
        DeliveryGuarantee deliveryGuarantee,
        boolean upsertMode,
        SinkBufferFlushMode flushMode,
        @Nullable Integer parallelism,
        @Nullable String transactionalIdPrefix) {
    // Format attributes
    this.consumedDataType =
            checkNotNull(consumedDataType, "Consumed data type must not be null.");
    this.physicalDataType =
            checkNotNull(physicalDataType, "Physical data type must not be null.");
    this.keyEncodingFormat = keyEncodingFormat;
    this.valueEncodingFormat =
            checkNotNull(valueEncodingFormat, "Value encoding format must not be null.");
    this.keyProjection = checkNotNull(keyProjection, "Key projection must not be null.");
    this.valueProjection = checkNotNull(valueProjection, "Value projection must not be null.");
    this.keyPrefix = keyPrefix;
    this.transactionalIdPrefix = transactionalIdPrefix;
    // Mutable attributes
    this.metadataKeys = Collections.emptyList();
    // Kafka-specific attributes
    this.topic = checkNotNull(topic, "Topic must not be null.");
    this.properties = checkNotNull(properties, "Properties must not be null.");
    this.partitioner = partitioner;
    this.deliveryGuarantee =
            checkNotNull(deliveryGuarantee, "DeliveryGuarantee must not be null.");
    this.upsertMode = upsertMode;
    // Carry a message like every other mandatory argument so a null flush mode is
    // identifiable from the exception alone.
    this.flushMode = checkNotNull(flushMode, "Sink buffer flush mode must not be null.");
    // Buffered flushing is rejected unless the sink runs in upsert mode.
    if (flushMode.isEnabled() && !upsertMode) {
        throw new IllegalArgumentException(
                "Sink buffer flush is only supported in upsert-kafka.");
    }
    this.parallelism = parallelism;
}