func()

in plugins/teststeps/waitport/waitport.go [52:143]


// Run parses the step parameters and hands a per-target callback to
// teststeps.ForEachTargetWithResume. For each target the callback:
//
//  1. expands the parameters for that target and emits EventCmdStart with the
//     expanded parameters as JSON payload (best effort: failures are logged);
//  2. builds the list of addresses to probe: the explicit Address parameter if
//     set, otherwise the target's FQDN, primary IPv4 and primary IPv6 (each only
//     if non-empty), all joined with the configured port;
//  3. repeatedly dials every address until one connection succeeds or the
//     timeout / parent context ends, sleeping CheckInterval between rounds;
//  4. emits EventCmdEnd (best effort) and returns the outcome.
//
// The callback returns nil once any address accepts a connection, otherwise
// the error of the timeout context (or the parameter expansion error).
func (ts *WaitPort) Run(ctx xcontext.Context, ch test.TestStepChannels, inputParams test.TestStepParameters, ev testevent.Emitter, resumeState json.RawMessage) (json.RawMessage, error) {
	params, err := parseParameters(inputParams)
	if err != nil {
		return nil, err
	}

	f := func(ctx xcontext.Context, targetWithData *teststeps.TargetWithData) error {
		target := targetWithData.Target
		targetParams, err := expandParameters(target, params)
		if err != nil {
			return err
		}

		// Emit EventCmdStart
		// Can emit duplicate events on server restart / job resumption
		payload, err := json.Marshal(targetParams)
		if err != nil {
			// Report the type of the value that actually failed to encode.
			ctx.Warnf("Cannot encode payload for %T: %v", targetParams, err)
		} else {
			rm := json.RawMessage(payload)
			evData := testevent.Data{
				EventName: EventCmdStart,
				Target:    target,
				Payload:   &rm,
			}
			if err := ev.Emit(ctx, evData); err != nil {
				ctx.Warnf("Cannot emit event EventCmdStart: %v", err)
			}
		}

		// An explicit address overrides everything derived from the target.
		var resultAddresses []string
		portStr := strconv.Itoa(targetParams.Port)
		if len(targetParams.Address) > 0 {
			resultAddresses = append(resultAddresses, net.JoinHostPort(targetParams.Address, portStr))
		} else {
			if len(target.FQDN) > 0 {
				resultAddresses = append(resultAddresses, net.JoinHostPort(target.FQDN, portStr))
			}
			if len(target.PrimaryIPv4) > 0 {
				resultAddresses = append(resultAddresses, net.JoinHostPort(target.PrimaryIPv4.String(), portStr))
			}
			if len(target.PrimaryIPv6) > 0 {
				resultAddresses = append(resultAddresses, net.JoinHostPort(target.PrimaryIPv6.String(), portStr))
			}
		}
		if len(resultAddresses) == 0 {
			// Nothing can ever succeed; the loop below will only wait for the
			// timeout. Say so explicitly to make the eventual failure diagnosable.
			ctx.Warnf("no address to probe for target %v, waiting until the timeout expires", target)
		}

		// The timeout restarts after a server restart/resume
		finishedContext, cancel := context.WithTimeout(ctx, targetParams.Timeout)
		defer cancel()

		resultErr := func() error {
			for {
				for _, addr := range resultAddresses {
					var d net.Dialer
					conn, err := d.DialContext(finishedContext, targetParams.Protocol, addr)
					if err == nil {
						// Success is the expected outcome, not a warning.
						ctx.Infof("successfully connected via %s", addr)
						if err := conn.Close(); err != nil {
							ctx.Warnf("failed to close opened connection: %v", err)
						}
						return nil
					}
					ctx.Warnf("failed to connect to '%s', err: '%v'", addr, err)
					// Stop probing the remaining addresses once the deadline
					// has passed or the parent context has ended.
					if finishedContext.Err() != nil {
						return finishedContext.Err()
					}
				}

				ctx.Infof("wait for the next iteration")
				// Use an explicit timer so it can be released immediately when
				// the context ends instead of lingering for CheckInterval.
				retryTimer := time.NewTimer(targetParams.CheckInterval)
				select {
				case <-finishedContext.Done():
					retryTimer.Stop()
					return finishedContext.Err()
				case <-retryTimer.C:
				}
			}
		}()

		// Emit EventCmdEnd
		evData := testevent.Data{
			EventName: EventCmdEnd,
			Target:    target,
			Payload:   nil,
		}
		if err := ev.Emit(ctx, evData); err != nil {
			ctx.Warnf("Cannot emit event EventCmdEnd: %v", err)
		}

		ctx.Infof("wait port plugin finished, err: '%v'", resultErr)
		return resultErr
	}
	return teststeps.ForEachTargetWithResume(ctx, ch, resumeState, 0, f)
}