config/config.go (57 lines of code) (raw):

// Copyright 2018 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package config import ( "fmt" "time" "os" "github.com/elastic/beats/v7/libbeat/common" ) type Config struct { Project string `config:"project_id" validate:"required"` Topic string `config:"topic" validate:"required"` CredentialsFile string `config:"credentials_file"` Subscription struct { Name string `config:"name" validate:"required"` RetainAckedMessages bool `config:"retain_acked_messages"` RetentionDuration time.Duration `config:"retention_duration"` Create bool `config:"create"` // Settings for the Pub/Sub receiver ConnectionPoolSize int `config:"connection_pool_size"` } Json struct { Enabled bool `config:"enabled"` AddErrorKey bool `config:"add_error_key"` UseNumber bool `config:"use_number"` FieldsUnderRoot bool `config:"fields_under_root"` FieldsUseTimestamp bool `config:"fields_use_timestamp"` FieldsTimestampName string `config:"fields_timestamp_name"` FieldsTimestampFormat string `config:"fields_timestamp_format"` } } func GetDefaultConfig() Config { config := Config{} config.Subscription.ConnectionPoolSize = 1 config.Subscription.Create = true config.Json.FieldsTimestampName = "@timestamp" return config } var DefaultConfig = GetDefaultConfig() func GetAndValidateConfig(cfg *common.Config) (*Config, error) { c := DefaultConfig if err := cfg.Unpack(&c); err != nil { return nil, fmt.Errorf("error in config file: %v", err) } if d, _ := time.ParseDuration("10m"); c.Subscription.RetentionDuration < d { return nil, fmt.Errorf("retention_duration cannot be shorter than 10 minutes") } if d, _ := time.ParseDuration("168h"); c.Subscription.RetentionDuration > d { return nil, fmt.Errorf("retention_duration cannot be longer than 7 days") } if cxns := c.Subscription.ConnectionPoolSize; cxns < 1 { return nil, fmt.Errorf("Connection pool size must be >= 1, got: %d", cxns) } if c.CredentialsFile != "" { if _, err := os.Stat(c.CredentialsFile); os.IsNotExist(err) { return nil, fmt.Errorf("cannot find the credentials_file %q", c.CredentialsFile) } } return &c, nil }