in plugins/teststeps/waitport/waitport.go [52:143]
func (ts *WaitPort) Run(ctx xcontext.Context, ch test.TestStepChannels, inputParams test.TestStepParameters, ev testevent.Emitter, resumeState json.RawMessage) (json.RawMessage, error) {
params, err := parseParameters(inputParams)
if err != nil {
return nil, err
}
f := func(ctx xcontext.Context, targetWithData *teststeps.TargetWithData) error {
target := targetWithData.Target
targetParams, err := expandParameters(target, params)
if err != nil {
return err
}
// Emit EventCmdStart
// Can emit duplicate events on server restart / job resumption
payload, err := json.Marshal(targetParams)
if err != nil {
ctx.Warnf("Cannot encode payload for %T: %v", params, err)
} else {
rm := json.RawMessage(payload)
evData := testevent.Data{
EventName: EventCmdStart,
Target: target,
Payload: &rm,
}
if err := ev.Emit(ctx, evData); err != nil {
ctx.Warnf("Cannot emit event EventCmdStart: %v", err)
}
}
var resultAddresses []string
portStr := strconv.Itoa(targetParams.Port)
if len(targetParams.Address) > 0 {
resultAddresses = append(resultAddresses, net.JoinHostPort(targetParams.Address, portStr))
} else {
if len(target.FQDN) > 0 {
resultAddresses = append(resultAddresses, net.JoinHostPort(target.FQDN, portStr))
}
if len(target.PrimaryIPv4) > 0 {
resultAddresses = append(resultAddresses, net.JoinHostPort(target.PrimaryIPv4.String(), portStr))
}
if len(target.PrimaryIPv6) > 0 {
resultAddresses = append(resultAddresses, net.JoinHostPort(target.PrimaryIPv6.String(), portStr))
}
}
// The timeout restarts after a server restart/resume
finishedContext, cancel := context.WithTimeout(ctx, targetParams.Timeout)
defer cancel()
resultErr := func() error {
for {
for _, addr := range resultAddresses {
var d net.Dialer
conn, err := d.DialContext(finishedContext, targetParams.Protocol, addr)
if err == nil {
ctx.Warnf("successfully connected via %s", addr)
if err := conn.Close(); err != nil {
ctx.Warnf("failed to close opened connection: %v", err)
}
return nil
}
ctx.Warnf("failed to connect to '%s', err: '%v'", addr, err)
if finishedContext.Err() != nil {
return finishedContext.Err()
}
}
ctx.Infof("wait for the next iteration")
select {
case <-finishedContext.Done():
return finishedContext.Err()
case <-time.After(targetParams.CheckInterval):
}
}
}()
// Emit EventCmdEnd
evData := testevent.Data{
EventName: EventCmdEnd,
Target: target,
Payload: nil,
}
if err := ev.Emit(ctx, evData); err != nil {
ctx.Warnf("Cannot emit event EventCmdEnd: %v", err)
}
ctx.Infof("wait port plugin finished, err: '%v'", resultErr)
return resultErr
}
return teststeps.ForEachTargetWithResume(ctx, ch, resumeState, 0, f)
}