public ConfigDef config()

in kafka-connector/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkConnector.java [142:254]


  /**
   * Builds the {@link ConfigDef} describing every setting this sink connector accepts.
   *
   * <p>The Cloud Pub/Sub project and topic are defined without a default value, so they must be
   * supplied by the user. Every other setting falls back to the default given here. Numeric
   * batching and timeout settings carry a {@code ConfigDef.Range} validator, and the ordering key
   * source is checked by {@code OrderingKeySource.Validator}.
   *
   * @return a newly constructed configuration definition for this connector
   */
  public ConfigDef config() {
    return new ConfigDef()
        // Destination: no default value, so both entries are required.
        .define(
            ConnectorUtils.CPS_PROJECT_CONFIG,
            Type.STRING,
            Importance.HIGH,
            "The project containing the topic to which to publish.")
        .define(
            ConnectorUtils.CPS_TOPIC_CONFIG,
            Type.STRING,
            Importance.HIGH,
            "The topic to which to publish.")
        // Batching thresholds.
        .define(
            MAX_BUFFER_SIZE_CONFIG,
            Type.INT,
            DEFAULT_MAX_BUFFER_SIZE,
            ConfigDef.Range.between(1, Integer.MAX_VALUE),
            Importance.MEDIUM,
            "The maximum number of messages that can be received for the messages on a topic "
                + "partition before publishing them to Cloud Pub/Sub.")
        .define(
            MAX_BUFFER_BYTES_CONFIG,
            Type.LONG,
            DEFAULT_MAX_BUFFER_BYTES,
            // The default doubles as the upper bound: users may lower this value but not raise it.
            // NOTE(review): presumably tied to the Pub/Sub publish request size limit - confirm.
            ConfigDef.Range.between(1, DEFAULT_MAX_BUFFER_BYTES),
            Importance.MEDIUM,
            "The maximum number of bytes that can be received for the messages on a topic "
                + "partition before publishing the messages to Cloud Pub/Sub.")
        // Flow control: these two entries have no range validator.
        .define(
            MAX_OUTSTANDING_REQUEST_BYTES,
            Type.LONG,
            DEFAULT_MAX_OUTSTANDING_REQUEST_BYTES,
            Importance.MEDIUM,
            "The maximum outstanding bytes from incomplete requests before the task blocks.")
        .define(
            MAX_OUTSTANDING_MESSAGES,
            Type.LONG,
            DEFAULT_MAX_OUTSTANDING_MESSAGES,
            Importance.MEDIUM,
            "The maximum outstanding incomplete messages before the task blocks.")
        .define(
            MAX_DELAY_THRESHOLD_MS,
            Type.INT,
            DEFAULT_DELAY_THRESHOLD_MS,
            ConfigDef.Range.between(1, Integer.MAX_VALUE),
            Importance.MEDIUM,
            "The maximum amount of time to wait after receiving the first message in a batch "
                + "before publishing the messages to Cloud Pub/Sub.")
        // Timeouts: each must be at least 10000 (the _MS names suggest milliseconds).
        .define(
            MAX_REQUEST_TIMEOUT_MS,
            Type.INT,
            DEFAULT_REQUEST_TIMEOUT_MS,
            ConfigDef.Range.between(10000, Integer.MAX_VALUE),
            Importance.MEDIUM,
            "The maximum amount of time to wait for a single publish request to Cloud Pub/Sub.")
        .define(
            MAX_TOTAL_TIMEOUT_MS,
            Type.INT,
            DEFAULT_TOTAL_TIMEOUT_MS,
            ConfigDef.Range.between(10000, Integer.MAX_VALUE),
            Importance.MEDIUM,
            "The maximum amount of time to wait for a publish to complete (including retries) in "
                + "Cloud Pub/Sub.")
        .define(
            MAX_SHUTDOWN_TIMEOUT_MS,
            Type.INT,
            DEFAULT_SHUTDOWN_TIMEOUT_MS,
            ConfigDef.Range.between(10000, Integer.MAX_VALUE),
            Importance.MEDIUM,
            "The maximum amount of time to wait for a publisher to shutdown when stopping task "
                + "in Kafka Connect.")
        // Message shaping: which Kafka record details are copied into the Pub/Sub message.
        .define(
            PUBLISH_KAFKA_METADATA,
            Type.BOOLEAN,
            false,
            Importance.MEDIUM,
            "When true, include the Kafka topic, partition, offset, and timestamp as message "
                + "attributes when a message is published to Cloud Pub/Sub.")
        .define(
            PUBLISH_KAFKA_HEADERS,
            Type.BOOLEAN,
            false,
            Importance.MEDIUM,
            "When true, include any headers as attributes when a message is published to Cloud "
                + "Pub/Sub.")
        .define(
            CPS_MESSAGE_BODY_NAME,
            Type.STRING,
            DEFAULT_MESSAGE_BODY_NAME,
            Importance.MEDIUM,
            "When using a struct or map value schema, this field or key name indicates that the "
                + "corresponding value will go into the Pub/Sub message body.")
        // Credentials: both default to null, i.e. neither is required by this definition.
        .define(
            ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG,
            Type.STRING,
            null,
            Importance.HIGH,
            "The path to the GCP credentials file")
        // NOTE(review): this holds secret material but is typed STRING; Type.PASSWORD would mask
        // it in logs, but switching requires readers to use getPassword() - confirm before changing.
        .define(
            ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG,
            Type.STRING,
            null,
            Importance.HIGH,
            "GCP JSON credentials")
        .define(
            ORDERING_KEY_SOURCE,
            Type.STRING,
            DEFAULT_ORDERING_KEY_SOURCE,
            new OrderingKeySource.Validator(),
            Importance.MEDIUM,
            "What to use to populate the Pub/Sub message ordering key. Possible values are "
                + "\"none\", \"key\", or \"partition\".")
        .define(
            ConnectorUtils.CPS_ENDPOINT,
            Type.STRING,
            ConnectorUtils.CPS_DEFAULT_ENDPOINT,
            Importance.LOW,
            "The Pub/Sub endpoint to use.");
  }