in plugins/targetmanagers/csvtargetmanager/csvfile.go [101:206]
func (tf *CSVFileTargetManager) Acquire(ctx xcontext.Context, jobID types.JobID, jobTargetManagerAcquireTimeout time.Duration, parameters interface{}, tl target.Locker) ([]*target.Target, error) {
acquireParameters, ok := parameters.(AcquireParameters)
if !ok {
return nil, fmt.Errorf("Acquire expects %T object, got %T", acquireParameters, parameters)
}
fd, err := os.Open(acquireParameters.FileURI.Path)
if err != nil {
return nil, err
}
defer fd.Close()
hosts := make([]*target.Target, 0)
r := csv.NewReader(fd)
for {
record, err := r.Read()
if err == io.EOF {
break
}
if err != nil {
return nil, err
}
if len(record) == 0 {
// skip blank lines
continue
}
if len(record) != 4 {
return nil, errors.New("malformed input file, need 4 entries per record (ID, FQDN, IPv4, IPv6)")
}
t := &target.Target{ID: strings.TrimSpace(record[0])}
if t.ID == "" {
return nil, errors.New("invalid empty string for host ID")
}
fqdn := strings.TrimSpace(record[1])
if fqdn != "" {
// no validation on fqdns
t.FQDN = fqdn
}
if ipv4 := strings.TrimSpace(record[2]); ipv4 != "" {
t.PrimaryIPv4 = net.ParseIP(ipv4)
if t.PrimaryIPv4 == nil || t.PrimaryIPv4.To4() == nil {
// didn't parse
return nil, fmt.Errorf("invalid non-empty IPv4 address \"%s\"", ipv4)
}
}
if ipv6 := strings.TrimSpace(record[3]); ipv6 != "" {
t.PrimaryIPv6 = net.ParseIP(ipv6)
if t.PrimaryIPv6 == nil || t.PrimaryIPv6.To16() == nil {
// didn't parse
return nil, fmt.Errorf("invalid non-empty IPv6 address \"%s\"", ipv6)
}
}
if len(acquireParameters.HostPrefixes) == 0 {
hosts = append(hosts, t)
} else if t.FQDN != "" {
// host prefix filtering only works on devices with a FQDN
firstComponent := strings.Split(t.FQDN, ".")[0]
for _, hp := range acquireParameters.HostPrefixes {
if strings.HasPrefix(firstComponent, hp) {
hosts = append(hosts, t)
}
}
}
}
if uint32(len(hosts)) < acquireParameters.MinNumberDevices {
return nil, fmt.Errorf("not enough hosts found in CSV file '%s', want %d, got %d",
acquireParameters.FileURI.Path,
acquireParameters.MinNumberDevices,
len(hosts),
)
}
ctx.Debugf("Found %d targets in %s", len(hosts), acquireParameters.FileURI.Path)
if acquireParameters.Shuffle {
ctx.Infof("Shuffling targets")
rand.Shuffle(len(hosts), func(i, j int) {
hosts[i], hosts[j] = hosts[j], hosts[i]
})
}
// feed all devices into new API call `TryLock`, with desired limit
lockedString, err := tl.TryLock(ctx, jobID, jobTargetManagerAcquireTimeout, hosts, uint(acquireParameters.MaxNumberDevices))
if err != nil {
return nil, fmt.Errorf("failed to lock targets: %w", err)
}
locked, err := target.FilterTargets(lockedString, hosts)
if err != nil {
return nil, fmt.Errorf("can not find locked targets in hosts")
}
// check if we got enough
if len(locked) >= int(acquireParameters.MinNumberDevices) {
// done, we got enough and they are locked
} else {
// not enough, unlock what we got and fail
if len(locked) > 0 {
err = tl.Unlock(ctx, jobID, locked)
if err != nil {
return nil, fmt.Errorf("can't unlock targets")
}
}
return nil, fmt.Errorf("can't lock enough targets")
}
tf.hosts = locked
return locked, nil
}