in flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java [145:192]
public KafkaDynamicSink(
        DataType consumedDataType,
        DataType physicalDataType,
        @Nullable EncodingFormat<SerializationSchema<RowData>> keyEncodingFormat,
        EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat,
        int[] keyProjection,
        int[] valueProjection,
        @Nullable String keyPrefix,
        @Nullable List<String> topics,
        @Nullable Pattern topicPattern,
        Properties properties,
        @Nullable KafkaPartitioner<RowData> partitioner,
        DeliveryGuarantee deliveryGuarantee,
        boolean upsertMode,
        SinkBufferFlushMode flushMode,
        @Nullable Integer parallelism,
        @Nullable String transactionalIdPrefix,
        TransactionNamingStrategy transactionNamingStrategy) {
    // Format attributes
    this.consumedDataType =
            checkNotNull(consumedDataType, "Consumed data type must not be null.");
    this.physicalDataType =
            checkNotNull(physicalDataType, "Physical data type must not be null.");
    this.keyEncodingFormat = keyEncodingFormat;
    this.valueEncodingFormat =
            checkNotNull(valueEncodingFormat, "Value encoding format must not be null.");
    this.keyProjection = checkNotNull(keyProjection, "Key projection must not be null.");
    this.valueProjection = checkNotNull(valueProjection, "Value projection must not be null.");
    this.keyPrefix = keyPrefix;
    this.transactionalIdPrefix = transactionalIdPrefix;
    this.transactionNamingStrategy = transactionNamingStrategy;
    // Mutable attributes (metadataKeys starts empty and is populated later, when
    // writable metadata is applied to the sink)
    this.metadataKeys = Collections.emptyList();
    // Kafka-specific attributes
    this.topics = topics;
    this.topicPattern = topicPattern;
    this.properties = checkNotNull(properties, "Properties must not be null.");
    this.partitioner = partitioner;
    this.deliveryGuarantee =
            checkNotNull(deliveryGuarantee, "DeliveryGuarantee must not be null.");
    this.upsertMode = upsertMode;
    this.flushMode = checkNotNull(flushMode, "SinkBufferFlushMode must not be null.");
    if (flushMode.isEnabled() && !upsertMode) {
        throw new IllegalArgumentException(
                "Sink buffer flush is only supported in upsert-kafka.");
    }
    this.parallelism = parallelism;
}
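
The constructor fails fast: every required reference is guarded with checkNotNull, and the cross-parameter invariant (buffer flush requires upsert mode) is rejected before a half-configured sink can escape. The following is a minimal, self-contained sketch of that same pattern, not code from the connector itself: the class name SinkConfigSketch is hypothetical, and java.util.Objects.requireNonNull stands in for Flink's checkNotNull.

import static java.util.Objects.requireNonNull;

/** Hypothetical sketch of the fail-fast validation pattern above; not part of the connector. */
final class SinkConfigSketch {

    private final boolean upsertMode;
    private final boolean bufferFlushEnabled;

    SinkConfigSketch(Object valueEncodingFormat, boolean upsertMode, boolean bufferFlushEnabled) {
        // Required references are checked before any field is assigned.
        requireNonNull(valueEncodingFormat, "Value encoding format must not be null.");
        // Cross-parameter invariant: buffering is only meaningful for upsert-kafka.
        if (bufferFlushEnabled && !upsertMode) {
            throw new IllegalArgumentException(
                    "Sink buffer flush is only supported in upsert-kafka.");
        }
        this.upsertMode = upsertMode;
        this.bufferFlushEnabled = bufferFlushEnabled;
    }

    public static void main(String[] args) {
        // Valid: upsert mode with buffering enabled.
        new SinkConfigSketch(new Object(), true, true);
        // Invalid: buffering without upsert mode is rejected at construction time.
        try {
            new SinkConfigSketch(new Object(), false, true);
        } catch (IllegalArgumentException expected) {
            System.out.println("Rejected as expected: " + expected.getMessage());
        }
    }
}

Because the table factory constructs the sink while the query is being planned, validating in the constructor rather than at first write surfaces a misconfigured table as a planning-time error instead of a runtime failure.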