in kafka-connector/src/main/java/com/google/pubsub/kafka/source/CloudPubSubSourceConnector.java [178:296]
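/**
 * Defines the configuration accepted by the source connector: the target
 * Kafka topic, the Cloud Pub/Sub project and subscription to pull from,
 * streaming-pull tuning, record-mapping options, credentials, and the
 * Pub/Sub endpoint.
 */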
public ConfigDef config() {
return new ConfigDef()
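// Core settings: the destination Kafka topic and the Cloud Pub/Sub
// project and subscription to pull messages from.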
.define(
KAFKA_TOPIC_CONFIG,
Type.STRING,
Importance.HIGH,
"The topic in Kafka which will receive messages that were pulled from Cloud Pub/Sub.")
.define(
ConnectorUtils.CPS_PROJECT_CONFIG,
Type.STRING,
Importance.HIGH,
"The project containing the topic from which to pull messages.")
.define(
CPS_SUBSCRIPTION_CONFIG,
Type.STRING,
Importance.HIGH,
"The name of the subscription to Cloud Pub/Sub.")
.define(
CPS_MAX_BATCH_SIZE_CONFIG,
Type.INT,
DEFAULT_CPS_MAX_BATCH_SIZE,
ConfigDef.Range.between(1, Integer.MAX_VALUE),
Importance.MEDIUM,
"The maximum number of messages to batch per pull request to Cloud Pub/Sub.")
.define(
CPS_STREAMING_PULL_ENABLED,
Type.BOOLEAN,
false,
Importance.MEDIUM,
"Whether to use streaming pull for the connector to connect to Cloud Pub/Sub. If provided, cps.maxBatchSize is ignored.")
.define(
CPS_STREAMING_PULL_FLOW_CONTROL_MESSAGES,
Type.LONG,
1000L,
Importance.MEDIUM,
"The maximum number of outstanding messages per task when using streaming pull.")
.define(
CPS_STREAMING_PULL_FLOW_CONTROL_BYTES,
Type.LONG,
100L * 1024 * 1024,
Importance.MEDIUM,
"The maximum number of outstanding message bytes per task when using streaming pull.")
.define(
CPS_STREAMING_PULL_PARALLEL_STREAMS,
Type.INT,
1,
Importance.MEDIUM,
"The number of streams to open per-task when using streaming pull.")
.define(
CPS_STREAMING_PULL_MAX_ACK_EXTENSION_MS,
Type.LONG,
0L,
Importance.MEDIUM,
"The maximum number of milliseconds to which the ack deadline will be extended when using streaming pull. A value of `0` implies the java-pubsub library default value.")
.define(
CPS_STREAMING_PULL_MAX_MS_PER_ACK_EXTENSION,
Type.LONG,
0L,
Importance.MEDIUM,
"The maximum number of milliseconds by which to extend the ack deadline at a time when using streaming pull. A value of `0` implies the java-pubsub library default value.")
.define(
KAFKA_MESSAGE_KEY_CONFIG,
Type.STRING,
null,
Importance.MEDIUM,
"The Cloud Pub/Sub message attribute to use as a key for messages published to Kafka. If set to \"orderingKey\", use the message's ordering key.")
.define(
KAFKA_MESSAGE_TIMESTAMP_CONFIG,
Type.STRING,
null,
Importance.MEDIUM,
"The optional Cloud Pub/Sub message attribute to use as a timestamp for messages "
+ "published to Kafka. The timestamp is Long value.")
.define(
KAFKA_PARTITIONS_CONFIG,
Type.INT,
DEFAULT_KAFKA_PARTITIONS,
ConfigDef.Range.between(1, Integer.MAX_VALUE),
Importance.MEDIUM,
"The number of Kafka partitions for the Kafka topic in which messages will be "
+ "published to.")
.define(
KAFKA_PARTITION_SCHEME_CONFIG,
Type.STRING,
DEFAULT_KAFKA_PARTITION_SCHEME,
new PartitionScheme.Validator(),
Importance.MEDIUM,
"The scheme for assigning a message to a partition in Kafka.")
.define(
ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG,
Type.STRING,
null,
Importance.HIGH,
"The path to the GCP credentials file")
.define(
ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG,
Type.STRING,
null,
Importance.HIGH,
"GCP JSON credentials")
.define(
USE_KAFKA_HEADERS,
Type.BOOLEAN,
false,
Importance.LOW,
"Use Kafka record headers to store Pub/Sub message attributes")
.define(
CPS_MAKE_ORDERING_KEY_ATTRIBUTE,
Type.BOOLEAN,
false,
Importance.LOW,
"When true, add the ordering key to the set of attributes with the key \"orderingKey\" "
+ "if it is non-empty.")
.define(
ConnectorUtils.CPS_ENDPOINT,
Type.STRING,
ConnectorUtils.CPS_DEFAULT_ENDPOINT,
Importance.LOW,
"The Pub/Sub endpoint to use.");
}
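// A minimal sketch of a connector properties file wired against this
// ConfigDef. The property names below are assumptions: this excerpt does not
// show the values of the config constants, so kafka.topic, cps.project, and
// cps.subscription are taken from the connector's documentation and should be
// verified against the constant definitions before use.
//
//   name=pubsub-source
//   connector.class=com.google.pubsub.kafka.source.CloudPubSubSourceConnector
//   tasks.max=4
//   kafka.topic=my-kafka-topic
//   cps.project=my-gcp-project
//   cps.subscription=my-subscription
//   cps.maxBatchSize=100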