in kafka-connect-bigtable-sink/sink/src/main/java/com/google/cloud/kafka/connect/bigtable/config/BigtableSinkConfig.java [222:410]
public static ConfigDef getDefinition() {
return new ConfigDef()
.define(
GCP_PROJECT_ID_CONFIG,
ConfigDef.Type.STRING,
ConfigDef.NO_DEFAULT_VALUE,
ConfigDef.CompositeValidator.of(
new ConfigDef.NonNullValidator(), new ConfigDef.NonEmptyString()),
ConfigDef.Importance.HIGH,
"The ID of the GCP project.")
.define(
BIGTABLE_INSTANCE_ID_CONFIG,
ConfigDef.Type.STRING,
ConfigDef.NO_DEFAULT_VALUE,
ConfigDef.CompositeValidator.of(
new ConfigDef.NonNullValidator(), new ConfigDef.NonEmptyString()),
ConfigDef.Importance.HIGH,
"The ID of the Cloud Bigtable instance.")
.define(
BIGTABLE_APP_PROFILE_ID_CONFIG,
ConfigDef.Type.STRING,
null,
ConfigDef.Importance.MEDIUM,
"The application profile that the connector should use. If none is supplied,"
+ " the default app profile will be used.")
.define(
GCP_CREDENTIALS_PATH_CONFIG,
ConfigDef.Type.STRING,
null,
ConfigDef.Importance.HIGH,
"The path to the JSON service key file. Configure at most one of `"
+ GCP_CREDENTIALS_PATH_CONFIG
+ "` and `"
+ GCP_CREDENTIALS_JSON_CONFIG
+ "`. If neither is provided, Application Default Credentials will be used.")
.define(
GCP_CREDENTIALS_JSON_CONFIG,
ConfigDef.Type.STRING,
null,
ConfigDef.Importance.HIGH,
"The path to the JSON service key file. Configure at most one of `"
+ GCP_CREDENTIALS_PATH_CONFIG
+ "` and `"
+ GCP_CREDENTIALS_JSON_CONFIG
+ "`. If neither is provided, Application Default Credentials will be used.")
.define(
INSERT_MODE_CONFIG,
ConfigDef.Type.STRING,
InsertMode.INSERT.name(),
enumValidator(InsertMode.values()),
ConfigDef.Importance.HIGH,
"Defines the insertion mode to use. Supported modes are:"
+ "\n- insert - Insert new record only."
+ " If the row to be written already exists in the table, an error is thrown."
+ "\n- upsert - If the row to be written already exists,"
+ " then its column values are overwritten with the ones provided.")
.define(
MAX_BATCH_SIZE_CONFIG,
ConfigDef.Type.INT,
1,
ConfigDef.Range.atLeast(1),
ConfigDef.Importance.MEDIUM,
"The maximum number of records that can be batched into a batch of upserts."
+ " Note that since only a batch size of 1 for inserts is supported, `"
+ MAX_BATCH_SIZE_CONFIG
+ "` must be exactly `1` when `"
+ INSERT_MODE_CONFIG
+ "` is set to `INSERT`.")
.define(
VALUE_NULL_MODE_CONFIG,
ConfigDef.Type.STRING,
NullValueMode.WRITE.name(),
enumValidator(NullValueMode.values()),
ConfigDef.Importance.MEDIUM,
"Defines what to do with `null`s within Kafka values. Supported modes are:"
+ "\n- write - Serialize `null`s to empty byte arrays."
+ "\n- ignore - Ignore `null`s."
+ "\n- delete - Use them to issue DELETE commands. Root-level `null` deletes a"
+ " row. `null` nested one level deletes a column family named after the"
+ " `null`-valued field. `null` nested two levels deletes a column named after the"
+ " `null`-valued field in column family named after the `null-valued` field parent"
+ " field. `null` values nested more than two levels are serialized like other"
+ " values and don't result in any DELETE commands.")
.define(
ERROR_MODE_CONFIG,
ConfigDef.Type.STRING,
BigtableErrorMode.FAIL.name(),
enumValidator(BigtableErrorMode.values()),
ConfigDef.Importance.MEDIUM,
"Specifies how to handle errors that result from writes, after retries. It is ignored"
+ " if DLQ is configured. Supported modes are:"
+ "\n- fail - The connector fails and must be manually restarted."
+ "\n- warn - The connector logs a warning and continues operating normally."
+ "\n- ignore - The connector does not log a warning but continues operating"
+ " normally.")
.define(
TABLE_NAME_FORMAT_CONFIG,
ConfigDef.Type.STRING,
ConfigInterpolation.TOPIC_PLACEHOLDER,
ConfigDef.CompositeValidator.of(
new ConfigDef.NonNullValidator(), new ConfigDef.NonEmptyString()),
ConfigDef.Importance.MEDIUM,
"Name of the destination table. Use `"
+ ConfigInterpolation.TOPIC_PLACEHOLDER
+ "` within the table name to specify the originating topic name.\n"
+ "For example, `user_"
+ ConfigInterpolation.TOPIC_PLACEHOLDER
+ "` for the topic `stats` will map to the table name `user_stats`.")
.define(
ROW_KEY_DEFINITION_CONFIG,
ConfigDef.Type.LIST,
"",
ConfigDef.Importance.MEDIUM,
"A comma separated list of Kafka Record key field names that specifies the order of"
+ " Kafka key fields to be concatenated to form the row key."
+ "\nFor example the list: `username, post_id, time_stamp` when applied to a Kafka"
+ " key: `{'username': 'bob','post_id': '213', 'time_stamp': '123123'}` and with"
+ " delimiter `#` gives the row key `bob#213#123123`. You can also access terms"
+ " nested in the key by using `.` as a delimiter. If this configuration is empty"
+ " or unspecified and the Kafka Message Key is a"
+ "\n- struct, all the fields in the struct are used to construct the row key."
+ "\n- byte array, the row key is set to the byte array as is."
+ "\n- primitive, the row key is set to the primitive stringified."
+ "If prefixes, more complicated delimiters, and string constants are required in"
+ " your Row Key, consider configuring an SMT to add relevant fields to the Kafka"
+ " Record key.")
.define(
ROW_KEY_DELIMITER_CONFIG,
ConfigDef.Type.STRING,
"",
ConfigDef.Importance.LOW,
"The delimiter used in concatenating Kafka key fields in the row key. If this"
+ " configuration is empty or unspecified, the key fields will be concatenated"
+ " together directly.")
.define(
AUTO_CREATE_TABLES_CONFIG,
ConfigDef.Type.BOOLEAN,
false,
new ConfigDef.NonNullValidator(),
ConfigDef.Importance.MEDIUM,
"Whether to automatically create the destination table if it is found to be missing.\n"
+ "When enabled, the records for which the auto-creation fails, are failed.\n"
+ "Recreation of tables deleted by other Cloud Bigtable users is not supported.\n"
+ "Note that table auto-creation is slow (multiple seconds). It may slow down not"
+ " only the records targeting nonexistent tables, but also other records batched"
+ " with them. To facilitate predictable latency leave this option disabled.")
.define(
AUTO_CREATE_COLUMN_FAMILIES_CONFIG,
ConfigDef.Type.BOOLEAN,
false,
new ConfigDef.NonNullValidator(),
ConfigDef.Importance.MEDIUM,
"Whether to automatically create missing columns families in the table relative to the"
+ " record schema.\n"
+ "Does not imply auto-creation of tables.\n"
+ "When enabled, the records for which the auto-creation fails, are failed.\n"
+ "When enabled, column families will be created also for deletions of nonexistent"
+ " column families and cells within them.\n"
+ "Recreation of column families deleted by other Cloud Bigtable users is not"
+ " supported.\n"
+ "Note that column family auto-creation is slow. It may slow down"
+ " not only the records targeting nonexistent column families, but also other"
+ " records batched with them. To facilitate predictable latency leave this option"
+ " disabled.")
.define(
DEFAULT_COLUMN_FAMILY_CONFIG,
ConfigDef.Type.STRING,
ConfigInterpolation.TOPIC_PLACEHOLDER,
ConfigDef.Importance.MEDIUM,
"Any root-level fields on the SinkRecord that aren't objects will be added to this"
+ " column family. If empty, the fields will be ignored. Use `"
+ ConfigInterpolation.TOPIC_PLACEHOLDER
+ "` within the column family name to specify the originating topic name.")
.define(
DEFAULT_COLUMN_QUALIFIER_CONFIG,
ConfigDef.Type.STRING,
"KAFKA_VALUE",
ConfigDef.Importance.MEDIUM,
"Any root-level values on the SinkRecord that aren't objects will be added to this"
+ " column within default column family. If empty, the value will be ignored.")
.define(
RETRY_TIMEOUT_MILLIS_CONFIG,
ConfigDef.Type.LONG,
90000,
ConfigDef.Range.atLeast(0),
ConfigDef.Importance.MEDIUM,
"Maximum time in milliseconds allocated for retrying database operations before trying"
+ " other error handling mechanisms.");
}