public static ConfigDef getDefinition()

in kafka-connect-bigtable-sink/sink/src/main/java/com/google/cloud/kafka/connect/bigtable/config/BigtableSinkConfig.java [222:410]


  public static ConfigDef getDefinition() {
    return new ConfigDef()
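        // Connection settings: GCP project, Bigtable instance, and optional app profile.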
        .define(
            GCP_PROJECT_ID_CONFIG,
            ConfigDef.Type.STRING,
            ConfigDef.NO_DEFAULT_VALUE,
            ConfigDef.CompositeValidator.of(
                new ConfigDef.NonNullValidator(), new ConfigDef.NonEmptyString()),
            ConfigDef.Importance.HIGH,
            "The ID of the GCP project.")
        .define(
            BIGTABLE_INSTANCE_ID_CONFIG,
            ConfigDef.Type.STRING,
            ConfigDef.NO_DEFAULT_VALUE,
            ConfigDef.CompositeValidator.of(
                new ConfigDef.NonNullValidator(), new ConfigDef.NonEmptyString()),
            ConfigDef.Importance.HIGH,
            "The ID of the Cloud Bigtable instance.")
        .define(
            BIGTABLE_APP_PROFILE_ID_CONFIG,
            ConfigDef.Type.STRING,
            null,
            ConfigDef.Importance.MEDIUM,
            "The application profile that the connector should use. If none is supplied,"
                + " the default app profile will be used.")
        .define(
            GCP_CREDENTIALS_PATH_CONFIG,
            ConfigDef.Type.STRING,
            null,
            ConfigDef.Importance.HIGH,
            "The path to the JSON service key file. Configure at most one of `"
                + GCP_CREDENTIALS_PATH_CONFIG
                + "` and `"
                + GCP_CREDENTIALS_JSON_CONFIG
                + "`. If neither is provided, Application Default Credentials will be used.")
        .define(
            GCP_CREDENTIALS_JSON_CONFIG,
            ConfigDef.Type.STRING,
            null,
            ConfigDef.Importance.HIGH,
            "The path to the JSON service key file. Configure at most one of `"
                + GCP_CREDENTIALS_PATH_CONFIG
                + "` and `"
                + GCP_CREDENTIALS_JSON_CONFIG
                + "`. If neither is provided, Application Default Credentials will be used.")
        .define(
            INSERT_MODE_CONFIG,
            ConfigDef.Type.STRING,
            InsertMode.INSERT.name(),
            enumValidator(InsertMode.values()),
            ConfigDef.Importance.HIGH,
            "Defines the insertion mode to use. Supported modes are:"
                + "\n- insert - Insert new record only."
                + " If the row to be written already exists in the table, an error is thrown."
                + "\n- upsert - If the row to be written already exists,"
                + " then its column values are overwritten with the ones provided.")
        .define(
            MAX_BATCH_SIZE_CONFIG,
            ConfigDef.Type.INT,
            1,
            ConfigDef.Range.atLeast(1),
            ConfigDef.Importance.MEDIUM,
            "The maximum number of records that can be batched into a batch of upserts."
                + " Note that since only a batch size of 1 for inserts is supported, `"
                + MAX_BATCH_SIZE_CONFIG
                + "` must be exactly `1` when `"
                + INSERT_MODE_CONFIG
                + "` is set to `INSERT`.")
        .define(
            VALUE_NULL_MODE_CONFIG,
            ConfigDef.Type.STRING,
            NullValueMode.WRITE.name(),
            enumValidator(NullValueMode.values()),
            ConfigDef.Importance.MEDIUM,
            "Defines what to do with `null`s within Kafka values. Supported modes are:"
                + "\n- write - Serialize `null`s to empty byte arrays."
                + "\n- ignore - Ignore `null`s."
                + "\n- delete - Use them to issue DELETE commands. Root-level `null` deletes a"
                + " row. `null` nested one level deletes a column family named after the"
                + " `null`-valued field. `null` nested two levels deletes a column named after the"
                + " `null`-valued field in column family named after the `null-valued` field parent"
                + " field. `null` values nested more than two levels are serialized like other"
                + " values and don't result in any DELETE commands.")
        .define(
            ERROR_MODE_CONFIG,
            ConfigDef.Type.STRING,
            BigtableErrorMode.FAIL.name(),
            enumValidator(BigtableErrorMode.values()),
            ConfigDef.Importance.MEDIUM,
            "Specifies how to handle errors that result from writes, after retries. It is ignored"
                + " if DLQ is configured. Supported modes are:"
                + "\n- fail - The connector fails and must be manually restarted."
                + "\n- warn - The connector logs a warning and continues operating normally."
                + "\n- ignore - The connector does not log a warning but continues operating"
                + " normally.")
        .define(
            TABLE_NAME_FORMAT_CONFIG,
            ConfigDef.Type.STRING,
            ConfigInterpolation.TOPIC_PLACEHOLDER,
            ConfigDef.CompositeValidator.of(
                new ConfigDef.NonNullValidator(), new ConfigDef.NonEmptyString()),
            ConfigDef.Importance.MEDIUM,
            "Name of the destination table. Use `"
                + ConfigInterpolation.TOPIC_PLACEHOLDER
                + "` within the table name to specify the originating topic name.\n"
                + "For example, `user_"
                + ConfigInterpolation.TOPIC_PLACEHOLDER
                + "` for the topic `stats` will map to the table name `user_stats`.")
        .define(
            ROW_KEY_DEFINITION_CONFIG,
            ConfigDef.Type.LIST,
            "",
            ConfigDef.Importance.MEDIUM,
            "A comma separated list of Kafka Record key field names that specifies the order of"
                + " Kafka key fields to be concatenated to form the row key."
                + "\nFor example the list: `username, post_id, time_stamp` when applied to a Kafka"
                + " key: `{'username': 'bob','post_id': '213', 'time_stamp': '123123'}` and with"
                + " delimiter `#` gives the row key `bob#213#123123`. You can also access terms"
                + " nested in the key by using `.` as a delimiter. If this configuration is empty"
                + " or unspecified and the Kafka Message Key is a"
                + "\n- struct, all the fields in the struct are used to construct the row key."
                + "\n- byte array, the row key is set to the byte array as is."
                + "\n- primitive, the row key is set to the primitive stringified."
                + "If prefixes, more complicated delimiters, and string constants are required in"
                + " your Row Key, consider configuring an SMT to add relevant fields to the Kafka"
                + " Record key.")
        .define(
            ROW_KEY_DELIMITER_CONFIG,
            ConfigDef.Type.STRING,
            "",
            ConfigDef.Importance.LOW,
            "The delimiter used in concatenating Kafka key fields in the row key. If this"
                + " configuration is empty or unspecified, the key fields will be concatenated"
                + " together directly.")
        .define(
            AUTO_CREATE_TABLES_CONFIG,
            ConfigDef.Type.BOOLEAN,
            false,
            new ConfigDef.NonNullValidator(),
            ConfigDef.Importance.MEDIUM,
            "Whether to automatically create the destination table if it is found to be missing.\n"
                + "When enabled, the records for which the auto-creation fails, are failed.\n"
                + "Recreation of tables deleted by other Cloud Bigtable users is not supported.\n"
                + "Note that table auto-creation is slow (multiple seconds). It may slow down not"
                + " only the records targeting nonexistent tables, but also other records batched"
                + " with them. To facilitate predictable latency leave this option disabled.")
        .define(
            AUTO_CREATE_COLUMN_FAMILIES_CONFIG,
            ConfigDef.Type.BOOLEAN,
            false,
            new ConfigDef.NonNullValidator(),
            ConfigDef.Importance.MEDIUM,
            "Whether to automatically create missing columns families in the table relative to the"
                + " record schema.\n"
                + "Does not imply auto-creation of tables.\n"
                + "When enabled, the records for which the auto-creation fails, are failed.\n"
                + "When enabled, column families will be created also for deletions of nonexistent"
                + " column families and cells within them.\n"
                + "Recreation of column families deleted by other Cloud Bigtable users is not"
                + " supported.\n"
                + "Note that column family auto-creation is slow. It may slow down"
                + " not only the records targeting nonexistent column families, but also other"
                + " records batched with them. To facilitate predictable latency leave this option"
                + " disabled.")
        .define(
            DEFAULT_COLUMN_FAMILY_CONFIG,
            ConfigDef.Type.STRING,
            ConfigInterpolation.TOPIC_PLACEHOLDER,
            ConfigDef.Importance.MEDIUM,
            "Any root-level fields on the SinkRecord that aren't objects will be added to this"
                + " column family. If empty, the fields will be ignored. Use `"
                + ConfigInterpolation.TOPIC_PLACEHOLDER
                + "` within the column family name to specify the originating topic name.")
        .define(
            DEFAULT_COLUMN_QUALIFIER_CONFIG,
            ConfigDef.Type.STRING,
            "KAFKA_VALUE",
            ConfigDef.Importance.MEDIUM,
            "Any root-level values on the SinkRecord that aren't objects will be added to this"
                + " column within default column family. If empty, the value will be ignored.")
        .define(
            RETRY_TIMEOUT_MILLIS_CONFIG,
            ConfigDef.Type.LONG,
            90000,
            ConfigDef.Range.atLeast(0),
            ConfigDef.Importance.MEDIUM,
            "Maximum time in milliseconds allocated for retrying database operations before trying"
                + " other error handling mechanisms.");
  }
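

A minimal sketch of exercising this definition outside the connector, for example to validate user-supplied properties before deploying the sink. It assumes the *_CONFIG key constants are accessible to the caller; the class name and property values below are illustrative, and only the standard org.apache.kafka.common.config API is used.

import com.google.cloud.kafka.connect.bigtable.config.BigtableSinkConfig;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.config.ConfigValue;

public class BigtableSinkConfigCheck {
  public static void main(String[] args) {
    // Illustrative values; GCP_PROJECT_ID_CONFIG and BIGTABLE_INSTANCE_ID_CONFIG are the only
    // settings without defaults, so a minimal configuration must supply both.
    Map<String, String> props =
        Map.of(
            BigtableSinkConfig.GCP_PROJECT_ID_CONFIG, "my-gcp-project",
            BigtableSinkConfig.BIGTABLE_INSTANCE_ID_CONFIG, "my-bigtable-instance");

    // validate() reports per-key problems (missing required settings, values rejected by the
    // validators above) without throwing.
    List<ConfigValue> results = BigtableSinkConfig.getDefinition().validate(props);
    for (ConfigValue value : results) {
      if (!value.errorMessages().isEmpty()) {
        System.out.println(value.name() + ": " + value.errorMessages());
      }
    }
  }
}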