func()

in plugins/targetmanagers/csvtargetmanager/csvfile.go [101:206]


// Acquire reads the CSV file referenced by the acquire parameters, builds the
// list of candidate targets, and tries to lock them through tl.
//
// parameters must be an AcquireParameters value. Every CSV record must have
// exactly four fields (ID, FQDN, IPv4, IPv6); the ID is mandatory, the other
// fields may be empty. When HostPrefixes is non-empty, only targets whose
// first FQDN component starts with one of the prefixes are kept (targets
// without FQDN are dropped in that case).
//
// An error is returned when the file cannot be read or is malformed, when
// fewer than MinNumberDevices candidates are found, or when fewer than
// MinNumberDevices targets could be locked (in which case the targets that
// were locked are unlocked again). At most MaxNumberDevices is passed to
// TryLock as the limit. On success the locked targets are stored on the
// target manager and returned.
func (tf *CSVFileTargetManager) Acquire(ctx xcontext.Context, jobID types.JobID, jobTargetManagerAcquireTimeout time.Duration, parameters interface{}, tl target.Locker) ([]*target.Target, error) {
	acquireParameters, ok := parameters.(AcquireParameters)
	if !ok {
		return nil, fmt.Errorf("Acquire expects %T object, got %T", acquireParameters, parameters)
	}
	fd, err := os.Open(acquireParameters.FileURI.Path)
	if err != nil {
		return nil, err
	}
	defer fd.Close()

	hosts := make([]*target.Target, 0)
	r := csv.NewReader(fd)
	for {
		record, err := r.Read()
		if errors.Is(err, io.EOF) {
			break
		}
		if err != nil {
			return nil, err
		}
		if len(record) == 0 {
			// defensive: skip empty records
			continue
		}
		t, err := parseCSVTargetRecord(record)
		if err != nil {
			return nil, err
		}
		// A target is added at most once, even if several prefixes match it.
		if fqdnMatchesHostPrefixes(t.FQDN, acquireParameters.HostPrefixes) {
			hosts = append(hosts, t)
		}
	}

	if uint32(len(hosts)) < acquireParameters.MinNumberDevices {
		return nil, fmt.Errorf("not enough hosts found in CSV file '%s', want %d, got %d",
			acquireParameters.FileURI.Path,
			acquireParameters.MinNumberDevices,
			len(hosts),
		)
	}
	ctx.Debugf("Found %d targets in %s", len(hosts), acquireParameters.FileURI.Path)
	if acquireParameters.Shuffle {
		ctx.Infof("Shuffling targets")
		rand.Shuffle(len(hosts), func(i, j int) {
			hosts[i], hosts[j] = hosts[j], hosts[i]
		})
	}

	// feed all devices into `TryLock`, with the desired upper limit
	lockedString, err := tl.TryLock(ctx, jobID, jobTargetManagerAcquireTimeout, hosts, uint(acquireParameters.MaxNumberDevices))
	if err != nil {
		return nil, fmt.Errorf("failed to lock targets: %w", err)
	}
	locked, err := target.FilterTargets(lockedString, hosts)
	if err != nil {
		return nil, fmt.Errorf("can not find locked targets in hosts: %w", err)
	}

	if len(locked) < int(acquireParameters.MinNumberDevices) {
		// not enough, unlock what we got and fail
		if len(locked) > 0 {
			if err := tl.Unlock(ctx, jobID, locked); err != nil {
				return nil, fmt.Errorf("can't unlock targets: %w", err)
			}
		}
		return nil, fmt.Errorf("can't lock enough targets")
	}

	// done, we got enough and they are locked
	tf.hosts = locked
	return locked, nil
}

// parseCSVTargetRecord converts one CSV record (ID, FQDN, IPv4, IPv6) into a
// target. Fields are trimmed of surrounding whitespace; the ID must be
// non-empty, and non-empty IP fields must parse.
func parseCSVTargetRecord(record []string) (*target.Target, error) {
	if len(record) != 4 {
		return nil, errors.New("malformed input file, need 4 entries per record (ID, FQDN, IPv4, IPv6)")
	}
	t := &target.Target{ID: strings.TrimSpace(record[0])}
	if t.ID == "" {
		return nil, errors.New("invalid empty string for host ID")
	}
	// no validation on fqdns
	t.FQDN = strings.TrimSpace(record[1])
	if ipv4 := strings.TrimSpace(record[2]); ipv4 != "" {
		t.PrimaryIPv4 = net.ParseIP(ipv4)
		if t.PrimaryIPv4 == nil || t.PrimaryIPv4.To4() == nil {
			// didn't parse, or is not an IPv4 address
			return nil, fmt.Errorf("invalid non-empty IPv4 address \"%s\"", ipv4)
		}
	}
	if ipv6 := strings.TrimSpace(record[3]); ipv6 != "" {
		t.PrimaryIPv6 = net.ParseIP(ipv6)
		// NOTE(review): To16() is non-nil for every address ParseIP accepts,
		// so an IPv4 literal in this column passes the check. Kept as-is to
		// avoid rejecting existing input files; confirm whether that is intended.
		if t.PrimaryIPv6 == nil || t.PrimaryIPv6.To16() == nil {
			// didn't parse
			return nil, fmt.Errorf("invalid non-empty IPv6 address \"%s\"", ipv6)
		}
	}
	return t, nil
}

// fqdnMatchesHostPrefixes reports whether a target with the given FQDN passes
// the host prefix filter. An empty prefix list accepts every target. Otherwise
// the target needs a non-empty FQDN whose first dot-separated component starts
// with at least one of the prefixes.
func fqdnMatchesHostPrefixes(fqdn string, prefixes []string) bool {
	if len(prefixes) == 0 {
		return true
	}
	if fqdn == "" {
		// host prefix filtering only works on devices with a FQDN
		return false
	}
	firstComponent := strings.Split(fqdn, ".")[0]
	for _, hp := range prefixes {
		if strings.HasPrefix(firstComponent, hp) {
			return true
		}
	}
	return false
}