func()

in config.go [571:859]


func (c *Config) Validate() error {
	// some configuration values should be warned on but not fail completely, do those first
	if !c.Net.TLS.Enable && c.Net.TLS.Config != nil {
		Logger.Println("Net.TLS is disabled but a non-nil configuration was provided.")
	}
	if !c.Net.SASL.Enable {
		if c.Net.SASL.User != "" {
			Logger.Println("Net.SASL is disabled but a non-empty username was provided.")
		}
		if c.Net.SASL.Password != "" {
			Logger.Println("Net.SASL is disabled but a non-empty password was provided.")
		}
	}
	if c.Producer.RequiredAcks > 1 {
		Logger.Println("Producer.RequiredAcks > 1 is deprecated and will raise an exception with kafka >= 0.8.2.0.")
	}
	if c.Producer.MaxMessageBytes >= int(MaxRequestSize) {
		Logger.Println("Producer.MaxMessageBytes must be smaller than MaxRequestSize; it will be ignored.")
	}
	if c.Producer.Flush.Bytes >= int(MaxRequestSize) {
		Logger.Println("Producer.Flush.Bytes must be smaller than MaxRequestSize; it will be ignored.")
	}
	if (c.Producer.Flush.Bytes > 0 || c.Producer.Flush.Messages > 0) && c.Producer.Flush.Frequency == 0 {
		Logger.Println("Producer.Flush: Bytes or Messages are set, but Frequency is not; messages may not get flushed.")
	}
	if c.Producer.Timeout%time.Millisecond != 0 {
		Logger.Println("Producer.Timeout only supports millisecond resolution; nanoseconds will be truncated.")
	}
	if c.Consumer.MaxWaitTime < 100*time.Millisecond {
		Logger.Println("Consumer.MaxWaitTime is very low, which can cause high CPU and network usage. See documentation for details.")
	}
	if c.Consumer.MaxWaitTime%time.Millisecond != 0 {
		Logger.Println("Consumer.MaxWaitTime only supports millisecond precision; nanoseconds will be truncated.")
	}
	if c.Consumer.Offsets.Retention%time.Millisecond != 0 {
		Logger.Println("Consumer.Offsets.Retention only supports millisecond precision; nanoseconds will be truncated.")
	}
	if c.Consumer.Group.Session.Timeout%time.Millisecond != 0 {
		Logger.Println("Consumer.Group.Session.Timeout only supports millisecond precision; nanoseconds will be truncated.")
	}
	if c.Consumer.Group.Heartbeat.Interval%time.Millisecond != 0 {
		Logger.Println("Consumer.Group.Heartbeat.Interval only supports millisecond precision; nanoseconds will be truncated.")
	}
	if c.Consumer.Group.Rebalance.Timeout%time.Millisecond != 0 {
		Logger.Println("Consumer.Group.Rebalance.Timeout only supports millisecond precision; nanoseconds will be truncated.")
	}
	if c.ClientID == defaultClientID {
		Logger.Println("ClientID is the default of 'sarama', you should consider setting it to something application-specific.")
	}

	// validate Net values
	switch {
	case c.Net.MaxOpenRequests <= 0:
		return ConfigurationError("Net.MaxOpenRequests must be > 0")
	case c.Net.DialTimeout <= 0:
		return ConfigurationError("Net.DialTimeout must be > 0")
	case c.Net.ReadTimeout <= 0:
		return ConfigurationError("Net.ReadTimeout must be > 0")
	case c.Net.WriteTimeout <= 0:
		return ConfigurationError("Net.WriteTimeout must be > 0")
	case c.Net.SASL.Enable:
		if c.Net.SASL.Mechanism == "" {
			c.Net.SASL.Mechanism = SASLTypePlaintext
		}

		switch c.Net.SASL.Mechanism {
		case SASLTypePlaintext:
			if c.Net.SASL.User == "" {
				return ConfigurationError("Net.SASL.User must not be empty when SASL is enabled")
			}
			if c.Net.SASL.Password == "" {
				return ConfigurationError("Net.SASL.Password must not be empty when SASL is enabled")
			}
		case SASLTypeOAuth:
			if c.Net.SASL.TokenProvider == nil {
				return ConfigurationError("An AccessTokenProvider instance must be provided to Net.SASL.TokenProvider")
			}
		case SASLTypeSCRAMSHA256, SASLTypeSCRAMSHA512:
			if c.Net.SASL.User == "" {
				return ConfigurationError("Net.SASL.User must not be empty when SASL is enabled")
			}
			if c.Net.SASL.Password == "" {
				return ConfigurationError("Net.SASL.Password must not be empty when SASL is enabled")
			}
			if c.Net.SASL.SCRAMClientGeneratorFunc == nil {
				return ConfigurationError("A SCRAMClientGeneratorFunc function must be provided to Net.SASL.SCRAMClientGeneratorFunc")
			}
		case SASLTypeGSSAPI:
			if c.Net.SASL.GSSAPI.ServiceName == "" {
				return ConfigurationError("Net.SASL.GSSAPI.ServiceName must not be empty when GSS-API mechanism is used")
			}

			switch c.Net.SASL.GSSAPI.AuthType {
			case KRB5_USER_AUTH:
				if c.Net.SASL.GSSAPI.Password == "" {
					return ConfigurationError("Net.SASL.GSSAPI.Password must not be empty when GSS-API " +
						"mechanism is used and Net.SASL.GSSAPI.AuthType = KRB5_USER_AUTH")
				}
			case KRB5_KEYTAB_AUTH:
				if c.Net.SASL.GSSAPI.KeyTabPath == "" {
					return ConfigurationError("Net.SASL.GSSAPI.KeyTabPath must not be empty when GSS-API mechanism is used" +
						" and Net.SASL.GSSAPI.AuthType = KRB5_KEYTAB_AUTH")
				}
			case KRB5_CCACHE_AUTH:
				if c.Net.SASL.GSSAPI.CCachePath == "" {
					return ConfigurationError("Net.SASL.GSSAPI.CCachePath must not be empty when GSS-API mechanism is used" +
						" and Net.SASL.GSSAPI.AuthType = KRB5_CCACHE_AUTH")
				}
			default:
				return ConfigurationError("Net.SASL.GSSAPI.AuthType is invalid. Possible values are KRB5_USER_AUTH, KRB5_KEYTAB_AUTH, and KRB5_CCACHE_AUTH")
			}

			if c.Net.SASL.GSSAPI.KerberosConfigPath == "" {
				return ConfigurationError("Net.SASL.GSSAPI.KerberosConfigPath must not be empty when GSS-API mechanism is used")
			}
			if c.Net.SASL.GSSAPI.Username == "" {
				return ConfigurationError("Net.SASL.GSSAPI.Username must not be empty when GSS-API mechanism is used")
			}
			if c.Net.SASL.GSSAPI.Realm == "" {
				return ConfigurationError("Net.SASL.GSSAPI.Realm must not be empty when GSS-API mechanism is used")
			}
		default:
			msg := fmt.Sprintf("The SASL mechanism configuration is invalid. Possible values are `%s`, `%s`, `%s`, `%s` and `%s`",
				SASLTypeOAuth, SASLTypePlaintext, SASLTypeSCRAMSHA256, SASLTypeSCRAMSHA512, SASLTypeGSSAPI)
			return ConfigurationError(msg)
		}
	}

	// validate the Admin values
	switch {
	case c.Admin.Timeout <= 0:
		return ConfigurationError("Admin.Timeout must be > 0")
	}

	// validate the Metadata values
	switch {
	case c.Metadata.Retry.Max < 0:
		return ConfigurationError("Metadata.Retry.Max must be >= 0")
	case c.Metadata.Retry.Backoff < 0:
		return ConfigurationError("Metadata.Retry.Backoff must be >= 0")
	case c.Metadata.RefreshFrequency < 0:
		return ConfigurationError("Metadata.RefreshFrequency must be >= 0")
	}

	// validate the Producer values
	switch {
	case c.Producer.MaxMessageBytes <= 0:
		return ConfigurationError("Producer.MaxMessageBytes must be > 0")
	case c.Producer.RequiredAcks < -1:
		return ConfigurationError("Producer.RequiredAcks must be >= -1")
	case c.Producer.Timeout <= 0:
		return ConfigurationError("Producer.Timeout must be > 0")
	case c.Producer.Partitioner == nil:
		return ConfigurationError("Producer.Partitioner must not be nil")
	case c.Producer.Flush.Bytes < 0:
		return ConfigurationError("Producer.Flush.Bytes must be >= 0")
	case c.Producer.Flush.Messages < 0:
		return ConfigurationError("Producer.Flush.Messages must be >= 0")
	case c.Producer.Flush.Frequency < 0:
		return ConfigurationError("Producer.Flush.Frequency must be >= 0")
	case c.Producer.Flush.MaxMessages < 0:
		return ConfigurationError("Producer.Flush.MaxMessages must be >= 0")
	case c.Producer.Flush.MaxMessages > 0 && c.Producer.Flush.MaxMessages < c.Producer.Flush.Messages:
		return ConfigurationError("Producer.Flush.MaxMessages must be >= Producer.Flush.Messages when set")
	case c.Producer.Retry.Max < 0:
		return ConfigurationError("Producer.Retry.Max must be >= 0")
	case c.Producer.Retry.Backoff < 0:
		return ConfigurationError("Producer.Retry.Backoff must be >= 0")
	}

	if c.Producer.Compression == CompressionLZ4 && !c.Version.IsAtLeast(V0_10_0_0) {
		return ConfigurationError("lz4 compression requires Version >= V0_10_0_0")
	}

	if c.Producer.Compression == CompressionGZIP {
		if c.Producer.CompressionLevel != CompressionLevelDefault {
			if _, err := gzip.NewWriterLevel(io.Discard, c.Producer.CompressionLevel); err != nil {
				return ConfigurationError(fmt.Sprintf("gzip compression does not work with level %d: %v", c.Producer.CompressionLevel, err))
			}
		}
	}

	if c.Producer.Compression == CompressionZSTD && !c.Version.IsAtLeast(V2_1_0_0) {
		return ConfigurationError("zstd compression requires Version >= V2_1_0_0")
	}

	if c.Producer.Idempotent {
		if !c.Version.IsAtLeast(V0_11_0_0) {
			return ConfigurationError("Idempotent producer requires Version >= V0_11_0_0")
		}
		if c.Producer.Retry.Max == 0 {
			return ConfigurationError("Idempotent producer requires Producer.Retry.Max >= 1")
		}
		if c.Producer.RequiredAcks != WaitForAll {
			return ConfigurationError("Idempotent producer requires Producer.RequiredAcks to be WaitForAll")
		}
		if c.Net.MaxOpenRequests > 1 {
			return ConfigurationError("Idempotent producer requires Net.MaxOpenRequests to be 1")
		}
	}

	if c.Producer.Transaction.ID != "" && !c.Producer.Idempotent {
		return ConfigurationError("Transactional producer requires Idempotent to be true")
	}

	// validate the Consumer values
	switch {
	case c.Consumer.Fetch.Min <= 0:
		return ConfigurationError("Consumer.Fetch.Min must be > 0")
	case c.Consumer.Fetch.Default <= 0:
		return ConfigurationError("Consumer.Fetch.Default must be > 0")
	case c.Consumer.Fetch.Max < 0:
		return ConfigurationError("Consumer.Fetch.Max must be >= 0")
	case c.Consumer.MaxWaitTime < 1*time.Millisecond:
		return ConfigurationError("Consumer.MaxWaitTime must be >= 1ms")
	case c.Consumer.MaxProcessingTime <= 0:
		return ConfigurationError("Consumer.MaxProcessingTime must be > 0")
	case c.Consumer.Retry.Backoff < 0:
		return ConfigurationError("Consumer.Retry.Backoff must be >= 0")
	case c.Consumer.Offsets.AutoCommit.Interval <= 0:
		return ConfigurationError("Consumer.Offsets.AutoCommit.Interval must be > 0")
	case c.Consumer.Offsets.Initial != OffsetOldest && c.Consumer.Offsets.Initial != OffsetNewest:
		return ConfigurationError("Consumer.Offsets.Initial must be OffsetOldest or OffsetNewest")
	case c.Consumer.Offsets.Retry.Max < 0:
		return ConfigurationError("Consumer.Offsets.Retry.Max must be >= 0")
	case c.Consumer.IsolationLevel != ReadUncommitted && c.Consumer.IsolationLevel != ReadCommitted:
		return ConfigurationError("Consumer.IsolationLevel must be ReadUncommitted or ReadCommitted")
	}

	if c.Consumer.Offsets.CommitInterval != 0 {
		Logger.Println("Deprecation warning: Consumer.Offsets.CommitInterval exists for historical compatibility" +
			" and should not be used. Please use Consumer.Offsets.AutoCommit, the current value will be ignored")
	}
	if c.Consumer.Group.Rebalance.Strategy != nil {
		Logger.Println("Deprecation warning: Consumer.Group.Rebalance.Strategy exists for historical compatibility" +
			" and should not be used. Please use Consumer.Group.Rebalance.GroupStrategies")
	}

	// validate IsolationLevel
	if c.Consumer.IsolationLevel == ReadCommitted && !c.Version.IsAtLeast(V0_11_0_0) {
		return ConfigurationError("ReadCommitted requires Version >= V0_11_0_0")
	}

	// validate the Consumer Group values
	switch {
	case c.Consumer.Group.Session.Timeout <= 2*time.Millisecond:
		return ConfigurationError("Consumer.Group.Session.Timeout must be >= 2ms")
	case c.Consumer.Group.Heartbeat.Interval < 1*time.Millisecond:
		return ConfigurationError("Consumer.Group.Heartbeat.Interval must be >= 1ms")
	case c.Consumer.Group.Heartbeat.Interval >= c.Consumer.Group.Session.Timeout:
		return ConfigurationError("Consumer.Group.Heartbeat.Interval must be < Consumer.Group.Session.Timeout")
	case c.Consumer.Group.Rebalance.Strategy == nil && len(c.Consumer.Group.Rebalance.GroupStrategies) == 0:
		return ConfigurationError("Consumer.Group.Rebalance.GroupStrategies or Consumer.Group.Rebalance.Strategy must not be empty")
	case c.Consumer.Group.Rebalance.Timeout <= time.Millisecond:
		return ConfigurationError("Consumer.Group.Rebalance.Timeout must be >= 1ms")
	case c.Consumer.Group.Rebalance.Retry.Max < 0:
		return ConfigurationError("Consumer.Group.Rebalance.Retry.Max must be >= 0")
	case c.Consumer.Group.Rebalance.Retry.Backoff < 0:
		return ConfigurationError("Consumer.Group.Rebalance.Retry.Backoff must be >= 0")
	}

	for _, strategy := range c.Consumer.Group.Rebalance.GroupStrategies {
		if strategy == nil {
			return ConfigurationError("elements in Consumer.Group.Rebalance.Strategies must not be empty")
		}
	}

	if c.Consumer.Group.InstanceId != "" {
		if !c.Version.IsAtLeast(V2_3_0_0) {
			return ConfigurationError("Consumer.Group.InstanceId need Version >= 2.3")
		}
		if err := validateGroupInstanceId(c.Consumer.Group.InstanceId); err != nil {
			return err
		}
	}

	// validate misc shared values
	switch {
	case c.ChannelBufferSize < 0:
		return ConfigurationError("ChannelBufferSize must be >= 0")
	}

	// only validate clientID locally for Kafka versions before KIP-190 was implemented
	if !c.Version.IsAtLeast(V1_0_0_0) && !validClientID.MatchString(c.ClientID) {
		return ConfigurationError(fmt.Sprintf("ClientID value %q is not valid for Kafka versions before 1.0.0", c.ClientID))
	}

	return nil
}