public ConfigDef config()

in kafka-connector/src/main/java/com/google/pubsub/kafka/source/CloudPubSubSourceConnector.java [178:296]


  public ConfigDef config() {
    // Assembles the ConfigDef describing every option this connector accepts. Options are
    // registered in a fixed order; each group below continues from the ConfigDef returned by
    // the previous group.

    // Validator shared by the integer options restricted to [1, Integer.MAX_VALUE].
    final ConfigDef.Range positiveInt = ConfigDef.Range.between(1, Integer.MAX_VALUE);

    // Kafka topic and Cloud Pub/Sub project/subscription: declared without a default value.
    ConfigDef definition =
        new ConfigDef()
            .define(
                KAFKA_TOPIC_CONFIG, Type.STRING, Importance.HIGH,
                "The topic in Kafka which will receive messages that were pulled from Cloud Pub/Sub.")
            .define(
                ConnectorUtils.CPS_PROJECT_CONFIG, Type.STRING, Importance.HIGH,
                "The project containing the topic from which to pull messages.")
            .define(
                CPS_SUBSCRIPTION_CONFIG, Type.STRING, Importance.HIGH,
                "The name of the subscription to Cloud Pub/Sub.");

    // Batch size for pull requests.
    definition =
        definition.define(
            CPS_MAX_BATCH_SIZE_CONFIG, Type.INT, DEFAULT_CPS_MAX_BATCH_SIZE, positiveInt,
            Importance.MEDIUM,
            "The maximum number of messages to batch per pull request to Cloud Pub/Sub.");

    // Streaming pull: the on/off switch followed by its flow-control and ack-extension knobs.
    definition =
        definition
            .define(
                CPS_STREAMING_PULL_ENABLED, Type.BOOLEAN, false, Importance.MEDIUM,
                "Whether to use streaming pull for the connector to connect to Cloud Pub/Sub. "
                    + "If provided, cps.maxBatchSize is ignored.")
            .define(
                CPS_STREAMING_PULL_FLOW_CONTROL_MESSAGES, Type.LONG, 1000L, Importance.MEDIUM,
                "The maximum number of outstanding messages per task when using streaming pull.")
            .define(
                CPS_STREAMING_PULL_FLOW_CONTROL_BYTES, Type.LONG, 100L * 1024 * 1024,
                Importance.MEDIUM,
                "The maximum number of outstanding message bytes per task when using streaming pull.")
            .define(
                CPS_STREAMING_PULL_PARALLEL_STREAMS, Type.INT, 1, Importance.MEDIUM,
                "The number of streams to open per-task when using streaming pull.")
            .define(
                CPS_STREAMING_PULL_MAX_ACK_EXTENSION_MS, Type.LONG, 0, Importance.MEDIUM,
                "The maximum number of milliseconds the subscribe deadline will be extended to in "
                    + "milliseconds when using streaming pull. A value of `0` implies the java-pubsub "
                    + "library default value.")
            .define(
                CPS_STREAMING_PULL_MAX_MS_PER_ACK_EXTENSION, Type.LONG, 0, Importance.MEDIUM,
                "The maximum number of milliseconds to extend the subscribe deadline for at a time "
                    + "when using streaming pull. A value of `0` implies the java-pubsub library default "
                    + "value.");

    // How Kafka records are keyed, timestamped and assigned to partitions.
    definition =
        definition
            .define(
                KAFKA_MESSAGE_KEY_CONFIG, Type.STRING, null, Importance.MEDIUM,
                "The Cloud Pub/Sub message attribute to use as a key for messages published to "
                    + "Kafka. If set to \"orderingKey\", use the message's ordering key.")
            .define(
                KAFKA_MESSAGE_TIMESTAMP_CONFIG, Type.STRING, null, Importance.MEDIUM,
                "The optional Cloud Pub/Sub message attribute to use as a timestamp for messages "
                    + "published to Kafka. The timestamp is Long value.")
            .define(
                KAFKA_PARTITIONS_CONFIG, Type.INT, DEFAULT_KAFKA_PARTITIONS, positiveInt,
                Importance.MEDIUM,
                "The number of Kafka partitions for the Kafka topic in which messages will be "
                    + "published to.")
            .define(
                KAFKA_PARTITION_SCHEME_CONFIG, Type.STRING, DEFAULT_KAFKA_PARTITION_SCHEME,
                new PartitionScheme.Validator(), Importance.MEDIUM,
                "The scheme for assigning a message to a partition in Kafka.");

    // GCP credentials, given either as a file path or as inline JSON; both default to null.
    // NOTE(review): the JSON credentials option is declared as Type.STRING; whether a
    // secret-hiding type would be more appropriate depends on how callers read it -- confirm.
    definition =
        definition
            .define(
                ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG, Type.STRING, null,
                Importance.HIGH,
                "The path to the GCP credentials file")
            .define(
                ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG, Type.STRING, null, Importance.HIGH,
                "GCP JSON credentials");

    // Low-importance options: attribute/header handling and the Pub/Sub endpoint.
    definition =
        definition
            .define(
                USE_KAFKA_HEADERS, Type.BOOLEAN, false, Importance.LOW,
                "Use Kafka record headers to store Pub/Sub message attributes")
            .define(
                CPS_MAKE_ORDERING_KEY_ATTRIBUTE, Type.BOOLEAN, false, Importance.LOW,
                "When true, add the ordering key to the set of attributes with the key \"orderingKey\" "
                    + "if it is non-empty.")
            .define(
                ConnectorUtils.CPS_ENDPOINT, Type.STRING, ConnectorUtils.CPS_DEFAULT_ENDPOINT,
                Importance.LOW,
                "The Pub/Sub endpoint to use.");

    return definition;
  }