plugins/targetmanagers/csvtargetmanager/csvfile.go (163 lines of code) (raw):

// Copyright (c) Facebook, Inc. and its affiliates. // // This source code is licensed under the MIT license found in the // LICENSE file in the root directory of this source tree. // Package csvtargetmanager implements a simple target manager that parses a CSV // file. The format of the CSV file is the following: // // 123,hostname1.example.com,1.2.3.4, // 456,hostname2,,2001:db8::1 // // In other words, four fields: the first containing a unique ID for the device // (might be identical to the IP or FQDN), next one is FQDN, // and then IPv4 and IPv6. // All fields except ID are optional, but many plugins require FQDN or IP fields to // reach the targets over the network. package csvtargetmanager import ( "encoding/csv" "encoding/json" "errors" "fmt" "io" "math/rand" "net" "os" "strings" "time" "github.com/facebookincubator/contest/pkg/target" "github.com/facebookincubator/contest/pkg/types" "github.com/facebookincubator/contest/pkg/xcontext" "github.com/insomniacslk/xjson" ) // Name defined the name of the plugin var ( Name = "CSVFileTargetManager" ) // AcquireParameters contains the parameters necessary to acquire targets. type AcquireParameters struct { FileURI *xjson.URL MinNumberDevices uint32 MaxNumberDevices uint32 HostPrefixes []string Shuffle bool } // ReleaseParameters contains the parameters necessary to release targets. type ReleaseParameters struct { } // CSVFileTargetManager implements the contest.TargetManager interface, reading // CSV entries from a text file. type CSVFileTargetManager struct { hosts []*target.Target } // ValidateAcquireParameters performs sanity checks on the fields of the // parameters that will be passed to Acquire. func (tf CSVFileTargetManager) ValidateAcquireParameters(params []byte) (interface{}, error) { var ap AcquireParameters if err := json.Unmarshal(params, &ap); err != nil { return nil, err } for idx, hp := range ap.HostPrefixes { hp = strings.TrimSpace(hp) if hp == "" { return nil, fmt.Errorf("Host prefix cannot be empty string if specified") } // reassign after removing surrounding spaces ap.HostPrefixes[idx] = hp } if ap.FileURI == nil { return nil, fmt.Errorf("file URI not specified in acquire parameters") } if ap.FileURI.Scheme != "file" && ap.FileURI.Scheme != "" { return nil, fmt.Errorf("unsupported scheme: '%s', only 'file' or empty string are accepted", ap.FileURI.Scheme) } if ap.FileURI.Host != "" && ap.FileURI.Host != "localhost" { return nil, fmt.Errorf("unsupported host '%s', only 'localhost' or empty string are accepted", ap.FileURI.Host) } return ap, nil } // ValidateReleaseParameters performs sanity checks on the fields of the // parameters that will be passed to Release. func (tf CSVFileTargetManager) ValidateReleaseParameters(params []byte) (interface{}, error) { var rp ReleaseParameters if err := json.Unmarshal(params, &rp); err != nil { return nil, err } return rp, nil } // Acquire implements contest.TargetManager.Acquire, reading one entry per line // from a text file. Each input record looks like this: ID,FQDN,IPv4,IPv6. Only ID is required func (tf *CSVFileTargetManager) Acquire(ctx xcontext.Context, jobID types.JobID, jobTargetManagerAcquireTimeout time.Duration, parameters interface{}, tl target.Locker) ([]*target.Target, error) { acquireParameters, ok := parameters.(AcquireParameters) if !ok { return nil, fmt.Errorf("Acquire expects %T object, got %T", acquireParameters, parameters) } fd, err := os.Open(acquireParameters.FileURI.Path) if err != nil { return nil, err } defer fd.Close() hosts := make([]*target.Target, 0) r := csv.NewReader(fd) for { record, err := r.Read() if err == io.EOF { break } if err != nil { return nil, err } if len(record) == 0 { // skip blank lines continue } if len(record) != 4 { return nil, errors.New("malformed input file, need 4 entries per record (ID, FQDN, IPv4, IPv6)") } t := &target.Target{ID: strings.TrimSpace(record[0])} if t.ID == "" { return nil, errors.New("invalid empty string for host ID") } fqdn := strings.TrimSpace(record[1]) if fqdn != "" { // no validation on fqdns t.FQDN = fqdn } if ipv4 := strings.TrimSpace(record[2]); ipv4 != "" { t.PrimaryIPv4 = net.ParseIP(ipv4) if t.PrimaryIPv4 == nil || t.PrimaryIPv4.To4() == nil { // didn't parse return nil, fmt.Errorf("invalid non-empty IPv4 address \"%s\"", ipv4) } } if ipv6 := strings.TrimSpace(record[3]); ipv6 != "" { t.PrimaryIPv6 = net.ParseIP(ipv6) if t.PrimaryIPv6 == nil || t.PrimaryIPv6.To16() == nil { // didn't parse return nil, fmt.Errorf("invalid non-empty IPv6 address \"%s\"", ipv6) } } if len(acquireParameters.HostPrefixes) == 0 { hosts = append(hosts, t) } else if t.FQDN != "" { // host prefix filtering only works on devices with a FQDN firstComponent := strings.Split(t.FQDN, ".")[0] for _, hp := range acquireParameters.HostPrefixes { if strings.HasPrefix(firstComponent, hp) { hosts = append(hosts, t) } } } } if uint32(len(hosts)) < acquireParameters.MinNumberDevices { return nil, fmt.Errorf("not enough hosts found in CSV file '%s', want %d, got %d", acquireParameters.FileURI.Path, acquireParameters.MinNumberDevices, len(hosts), ) } ctx.Debugf("Found %d targets in %s", len(hosts), acquireParameters.FileURI.Path) if acquireParameters.Shuffle { ctx.Infof("Shuffling targets") rand.Shuffle(len(hosts), func(i, j int) { hosts[i], hosts[j] = hosts[j], hosts[i] }) } // feed all devices into new API call `TryLock`, with desired limit lockedString, err := tl.TryLock(ctx, jobID, jobTargetManagerAcquireTimeout, hosts, uint(acquireParameters.MaxNumberDevices)) if err != nil { return nil, fmt.Errorf("failed to lock targets: %w", err) } locked, err := target.FilterTargets(lockedString, hosts) if err != nil { return nil, fmt.Errorf("can not find locked targets in hosts") } // check if we got enough if len(locked) >= int(acquireParameters.MinNumberDevices) { // done, we got enough and they are locked } else { // not enough, unlock what we got and fail if len(locked) > 0 { err = tl.Unlock(ctx, jobID, locked) if err != nil { return nil, fmt.Errorf("can't unlock targets") } } return nil, fmt.Errorf("can't lock enough targets") } tf.hosts = locked return locked, nil } // Release releases the acquired resources. func (tf *CSVFileTargetManager) Release(ctx xcontext.Context, jobID types.JobID, targets []*target.Target, params interface{}) error { return nil } // New builds a CSVFileTargetManager func New() target.TargetManager { return &CSVFileTargetManager{} } // Load returns the name and factory which are needed to register the // TargetManager. func Load() (string, target.TargetManagerFactory) { return Name, New }