in kafka-connector/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkConnector.java [142:254]
public ConfigDef config() {
return new ConfigDef()
.define(
ConnectorUtils.CPS_PROJECT_CONFIG,
Type.STRING,
Importance.HIGH,
"The project containing the topic to which to publish.")
.define(
ConnectorUtils.CPS_TOPIC_CONFIG,
Type.STRING,
Importance.HIGH,
"The topic to which to publish.")
.define(
MAX_BUFFER_SIZE_CONFIG,
Type.INT,
DEFAULT_MAX_BUFFER_SIZE,
ConfigDef.Range.between(1, Integer.MAX_VALUE),
Importance.MEDIUM,
"The maximum number of messages that can be received for the messages on a topic "
+ "partition before publishing them to Cloud Pub/Sub.")
.define(
MAX_BUFFER_BYTES_CONFIG,
Type.LONG,
DEFAULT_MAX_BUFFER_BYTES,
ConfigDef.Range.between(1, DEFAULT_MAX_BUFFER_BYTES),
Importance.MEDIUM,
"The maximum number of bytes that can be received for the messages on a topic "
+ "partition before publishing the messages to Cloud Pub/Sub.")
.define(MAX_OUTSTANDING_REQUEST_BYTES,
Type.LONG,
DEFAULT_MAX_OUTSTANDING_REQUEST_BYTES,
Importance.MEDIUM,
"The maximum outstanding bytes from incomplete requests before the task blocks."
)
.define(MAX_OUTSTANDING_MESSAGES,
Type.LONG,
DEFAULT_MAX_OUTSTANDING_MESSAGES,
Importance.MEDIUM,
"The maximum outstanding incomplete messages before the task blocks."
)
.define(
MAX_DELAY_THRESHOLD_MS,
Type.INT,
DEFAULT_DELAY_THRESHOLD_MS,
ConfigDef.Range.between(1, Integer.MAX_VALUE),
Importance.MEDIUM,
"The maximum amount of time to wait after receiving the first message in a batch for a "
+ "before publishing the messages to Cloud Pub/Sub.")
.define(
MAX_REQUEST_TIMEOUT_MS,
Type.INT,
DEFAULT_REQUEST_TIMEOUT_MS,
ConfigDef.Range.between(10000, Integer.MAX_VALUE),
Importance.MEDIUM,
"The maximum amount of time to wait for a single publish request to Cloud Pub/Sub.")
.define(
MAX_TOTAL_TIMEOUT_MS,
Type.INT,
DEFAULT_TOTAL_TIMEOUT_MS,
ConfigDef.Range.between(10000, Integer.MAX_VALUE),
Importance.MEDIUM,
"The maximum amount of time to wait for a publish to complete (including retries) in "
+ "Cloud Pub/Sub.")
.define(
MAX_SHUTDOWN_TIMEOUT_MS,
Type.INT,
DEFAULT_SHUTDOWN_TIMEOUT_MS,
ConfigDef.Range.between(10000, Integer.MAX_VALUE),
Importance.MEDIUM,
"The maximum amount of time to wait for a publisher to shutdown when stopping task "
+ "in Kafka Connect.")
.define(
PUBLISH_KAFKA_METADATA,
Type.BOOLEAN,
false,
Importance.MEDIUM,
"When true, include the Kafka topic, partition, offset, and timestamp as message "
+ "attributes when a message is published to Cloud Pub/Sub.")
.define(
PUBLISH_KAFKA_HEADERS,
Type.BOOLEAN,
false,
Importance.MEDIUM,
"When true, include any headers as attributes when a message is published to Cloud Pub/Sub.")
.define(CPS_MESSAGE_BODY_NAME,
Type.STRING,
DEFAULT_MESSAGE_BODY_NAME,
Importance.MEDIUM,
"When using a struct or map value schema, this field or key name indicates that the "
+ "corresponding value will go into the Pub/Sub message body.")
.define(ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG,
Type.STRING,
null,
Importance.HIGH,
"The path to the GCP credentials file")
.define(ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG,
Type.STRING,
null,
Importance.HIGH,
"GCP JSON credentials")
.define(ORDERING_KEY_SOURCE,
Type.STRING,
DEFAULT_ORDERING_KEY_SOURCE,
new OrderingKeySource.Validator(),
Importance.MEDIUM,
"What to use to populate the Pub/Sub message ordering key. Possible values are "
+ "\"none\", \"key\", or \"partition\".")
.define(ConnectorUtils.CPS_ENDPOINT,
Type.STRING,
ConnectorUtils.CPS_DEFAULT_ENDPOINT,
Importance.LOW,
"The Pub/Sub endpoint to use.");
}