flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilder.java [52:218]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
public final class InfluxDBSinkBuilder<IN> {
    private InfluxDBSchemaSerializer<IN> influxDBSchemaSerializer;
    private String influxDBUrl;
    private String influxDBUsername;
    private String influxDBPassword;
    private String influxDBToken;
    private String bucketName;
    private String organizationName;
    private final Configuration configuration;

    InfluxDBSinkBuilder() {
        this.influxDBUrl = null;
        this.influxDBUsername = null;
        this.influxDBPassword = null;
        this.influxDBToken = null;
        this.bucketName = null;
        this.organizationName = null;
        this.influxDBSchemaSerializer = null;
        this.configuration = new Configuration();
    }

    /**
     * Sets the InfluxDB url.
     *
     * @param influxDBUrl the url of the InfluxDB instance to send data to.
     * @return this InfluxDBSinkBuilder.
     */
    public InfluxDBSinkBuilder<IN> setInfluxDBUrl(final String influxDBUrl) {
        this.influxDBUrl = influxDBUrl;
        this.configuration.setString(INFLUXDB_URL, checkNotNull(influxDBUrl));
        return this;
    }

    /**
     * Sets the InfluxDB user name.
     *
     * @param influxDBUsername the user name of the InfluxDB instance.
     * @return this InfluxDBSinkBuilder.
     */
    public InfluxDBSinkBuilder<IN> setInfluxDBUsername(final String influxDBUsername) {
        this.influxDBUsername = influxDBUsername;
        this.configuration.setString(INFLUXDB_USERNAME, checkNotNull(influxDBUsername));
        return this;
    }

    /**
     * Sets the InfluxDB password.
     *
     * @param influxDBPassword the password of the InfluxDB instance.
     * @return this InfluxDBSinkBuilder.
     */
    public InfluxDBSinkBuilder<IN> setInfluxDBPassword(final String influxDBPassword) {
        this.influxDBPassword = influxDBPassword;
        this.configuration.setString(INFLUXDB_PASSWORD, checkNotNull(influxDBPassword));
        return this;
    }

    /**
     * Sets the InfluxDB token.
     *
     * @param influxDBToken the token of the InfluxDB instance.
     * @return this InfluxDBSinkBuilder.
     */
    public InfluxDBSinkBuilder<IN> setInfluxDBToken(final String influxDBToken) {
        this.influxDBToken = influxDBToken;
        this.configuration.setString(INFLUXDB_TOKEN, checkNotNull(influxDBToken));
        return this;
    }

    /**
     * Sets the InfluxDB bucket name.
     *
     * @param bucketName the bucket name of the InfluxDB instance to store the data in.
     * @return this InfluxDBSinkBuilder.
     */
    public InfluxDBSinkBuilder<IN> setInfluxDBBucket(final String bucketName) {
        this.bucketName = bucketName;
        this.configuration.setString(INFLUXDB_BUCKET, checkNotNull(bucketName));
        return this;
    }

    /**
     * Sets the InfluxDB organization name.
     *
     * @param organizationName the organization name of the InfluxDB instance.
     * @return this InfluxDBSinkBuilder.
     */
    public InfluxDBSinkBuilder<IN> setInfluxDBOrganization(final String organizationName) {
        this.organizationName = organizationName;
        this.configuration.setString(INFLUXDB_ORGANIZATION, checkNotNull(organizationName));
        return this;
    }

    /**
     * Sets the {@link InfluxDBSchemaSerializer serializer} of the input type IN for the
     * InfluxDBSink.
     *
     * @param influxDBSchemaSerializer the serializer for the input type.
     * @return this InfluxDBSourceBuilder.
     */
    public <T extends IN> InfluxDBSinkBuilder<T> setInfluxDBSchemaSerializer(
            final InfluxDBSchemaSerializer<T> influxDBSchemaSerializer) {
        checkNotNull(influxDBSchemaSerializer);
        final InfluxDBSinkBuilder<T> sinkBuilder = (InfluxDBSinkBuilder<T>) this;
        sinkBuilder.influxDBSchemaSerializer = influxDBSchemaSerializer;
        return sinkBuilder;
    }

    /**
     * Sets if the InfluxDBSink should write checkpoint data points to InfluxDB.
     *
     * @param shouldWrite boolean if checkpoint should be written.
     * @return this InfluxDBSinkBuilder.
     */
    public InfluxDBSinkBuilder<IN> addCheckpointDataPoint(final boolean shouldWrite) {
        this.configuration.setBoolean(WRITE_DATA_POINT_CHECKPOINT, shouldWrite);
        return this;
    }

    /**
     * Sets the buffer size of the {@link InfluxDBWriter}. This also determines the number of {@link
     * com.influxdb.client.write.Point} send to the InfluxDB instance per request.
     *
     * @param bufferSize size of the buffer.
     * @return this InfluxDBSinkBuilder.
     */
    public InfluxDBSinkBuilder<IN> setWriteBufferSize(final int bufferSize) {
        if (bufferSize <= 0) {
            throw new IllegalArgumentException("The buffer size should be greater than 0.");
        }
        this.configuration.setInteger(WRITE_BUFFER_SIZE, bufferSize);
        return this;
    }

    /**
     * Build the {@link InfluxDBSink}.
     *
     * @return a InfluxDBSink with the settings made for this builder.
     */
    public InfluxDBSink<IN> build() {
        this.sanityCheck();
        return new InfluxDBSink<>(this.influxDBSchemaSerializer, this.configuration);
    }

    // ------------- private helpers  --------------

    /** Checks if the SchemaSerializer and the influxDBConfig are not null and set. */
    private void sanityCheck() {
        // Check required settings.
        checkNotNull(this.influxDBUrl, "The InfluxDB URL is required but not provided.");
        // check that either username/password or token is provided for authentication
        checkArgument(
                this.influxDBToken != null
                        || (this.influxDBUsername != null && this.influxDBPassword != null),
                "Either the InfluxDB username and password or InfluxDB token are required but neither provided"
        );
        // check that both username/password and token are not both provided for authentication
        checkArgument(
                ! (this.influxDBToken != null
                        && (this.influxDBUsername != null || this.influxDBPassword != null)),
                "Either the InfluxDB username and password or InfluxDB token are required but both provided"
        );
        checkNotNull(this.bucketName, "The Bucket name is required but not provided.");
        checkNotNull(this.organizationName, "The Organization name is required but not provided.");
        checkNotNull(
                this.influxDBSchemaSerializer,
                "Serialization schema is required but not provided.");
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink2/InfluxDBSinkBuilder.java [52:218]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
public final class InfluxDBSinkBuilder<IN> {
    private InfluxDBSchemaSerializer<IN> influxDBSchemaSerializer;
    private String influxDBUrl;
    private String influxDBUsername;
    private String influxDBPassword;
    private String influxDBToken;
    private String bucketName;
    private String organizationName;
    private final Configuration configuration;

    InfluxDBSinkBuilder() {
        this.influxDBUrl = null;
        this.influxDBUsername = null;
        this.influxDBPassword = null;
        this.influxDBToken = null;
        this.bucketName = null;
        this.organizationName = null;
        this.influxDBSchemaSerializer = null;
        this.configuration = new Configuration();
    }

    /**
     * Sets the InfluxDB url.
     *
     * @param influxDBUrl the url of the InfluxDB instance to send data to.
     * @return this InfluxDBSinkBuilder.
     */
    public InfluxDBSinkBuilder<IN> setInfluxDBUrl(final String influxDBUrl) {
        this.influxDBUrl = influxDBUrl;
        this.configuration.setString(INFLUXDB_URL, checkNotNull(influxDBUrl));
        return this;
    }

    /**
     * Sets the InfluxDB user name.
     *
     * @param influxDBUsername the user name of the InfluxDB instance.
     * @return this InfluxDBSinkBuilder.
     */
    public InfluxDBSinkBuilder<IN> setInfluxDBUsername(final String influxDBUsername) {
        this.influxDBUsername = influxDBUsername;
        this.configuration.setString(INFLUXDB_USERNAME, checkNotNull(influxDBUsername));
        return this;
    }

    /**
     * Sets the InfluxDB password.
     *
     * @param influxDBPassword the password of the InfluxDB instance.
     * @return this InfluxDBSinkBuilder.
     */
    public InfluxDBSinkBuilder<IN> setInfluxDBPassword(final String influxDBPassword) {
        this.influxDBPassword = influxDBPassword;
        this.configuration.setString(INFLUXDB_PASSWORD, checkNotNull(influxDBPassword));
        return this;
    }

    /**
     * Sets the InfluxDB token.
     *
     * @param influxDBToken the token of the InfluxDB instance.
     * @return this InfluxDBSinkBuilder.
     */
    public InfluxDBSinkBuilder<IN> setInfluxDBToken(final String influxDBToken) {
        this.influxDBToken = influxDBToken;
        this.configuration.setString(INFLUXDB_TOKEN, checkNotNull(influxDBToken));
        return this;
    }

    /**
     * Sets the InfluxDB bucket name.
     *
     * @param bucketName the bucket name of the InfluxDB instance to store the data in.
     * @return this InfluxDBSinkBuilder.
     */
    public InfluxDBSinkBuilder<IN> setInfluxDBBucket(final String bucketName) {
        this.bucketName = bucketName;
        this.configuration.setString(INFLUXDB_BUCKET, checkNotNull(bucketName));
        return this;
    }

    /**
     * Sets the InfluxDB organization name.
     *
     * @param organizationName the organization name of the InfluxDB instance.
     * @return this InfluxDBSinkBuilder.
     */
    public InfluxDBSinkBuilder<IN> setInfluxDBOrganization(final String organizationName) {
        this.organizationName = organizationName;
        this.configuration.setString(INFLUXDB_ORGANIZATION, checkNotNull(organizationName));
        return this;
    }

    /**
     * Sets the {@link InfluxDBSchemaSerializer serializer} of the input type IN for the
     * InfluxDBSink.
     *
     * @param influxDBSchemaSerializer the serializer for the input type.
     * @return this InfluxDBSourceBuilder.
     */
    public <T extends IN> InfluxDBSinkBuilder<T> setInfluxDBSchemaSerializer(
            final InfluxDBSchemaSerializer<T> influxDBSchemaSerializer) {
        checkNotNull(influxDBSchemaSerializer);
        final InfluxDBSinkBuilder<T> sinkBuilder = (InfluxDBSinkBuilder<T>) this;
        sinkBuilder.influxDBSchemaSerializer = influxDBSchemaSerializer;
        return sinkBuilder;
    }

    /**
     * Sets if the InfluxDBSink should write checkpoint data points to InfluxDB.
     *
     * @param shouldWrite boolean if checkpoint should be written.
     * @return this InfluxDBSinkBuilder.
     */
    public InfluxDBSinkBuilder<IN> addCheckpointDataPoint(final boolean shouldWrite) {
        this.configuration.setBoolean(WRITE_DATA_POINT_CHECKPOINT, shouldWrite);
        return this;
    }

    /**
     * Sets the buffer size of the {@link InfluxDBWriter}. This also determines the number of {@link
     * com.influxdb.client.write.Point} send to the InfluxDB instance per request.
     *
     * @param bufferSize size of the buffer.
     * @return this InfluxDBSinkBuilder.
     */
    public InfluxDBSinkBuilder<IN> setWriteBufferSize(final int bufferSize) {
        if (bufferSize <= 0) {
            throw new IllegalArgumentException("The buffer size should be greater than 0.");
        }
        this.configuration.setInteger(WRITE_BUFFER_SIZE, bufferSize);
        return this;
    }

    /**
     * Build the {@link org.apache.flink.streaming.connectors.influxdb.sink2.InfluxDBSink}.
     *
     * @return a InfluxDBSink with the settings made for this builder.
     */
    public InfluxDBSink<IN> build() {
        this.sanityCheck();
        return new InfluxDBSink<>(this.influxDBSchemaSerializer, this.configuration);
    }

    // ------------- private helpers  --------------

    /** Checks if the SchemaSerializer and the influxDBConfig are not null and set. */
    private void sanityCheck() {
        // Check required settings.
        checkNotNull(this.influxDBUrl, "The InfluxDB URL is required but not provided.");
        // check that either username/password or token is provided for authentication
        checkArgument(
                this.influxDBToken != null
                        || (this.influxDBUsername != null && this.influxDBPassword != null),
                "Either the InfluxDB username and password or InfluxDB token are required but neither provided"
        );
        // check that both username/password and token are not both provided for authentication
        checkArgument(
                ! (this.influxDBToken != null
                        && (this.influxDBUsername != null || this.influxDBPassword != null)),
                "Either the InfluxDB username and password or InfluxDB token are required but both provided"
        );
        checkNotNull(this.bucketName, "The Bucket name is required but not provided.");
        checkNotNull(this.organizationName, "The Organization name is required but not provided.");
        checkNotNull(
                this.influxDBSchemaSerializer,
                "Serialization schema is required but not provided.");
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



