func()

in agent/dockerclient/dockerapi/docker_client.go [371:472]


// pullImage pulls image through the Docker SDK client and blocks until the
// pull finishes, fails, or fails to produce any output within
// dockerclient.DockerPullBeginTimeout.
//
// authData is handed to dg.getAuthdata to resolve the registry credentials,
// which are JSON-encoded and base64 (URL alphabet) encoded into the pull
// options.
//
// Returns nil on success, otherwise one of:
//   - CannotGetDockerClientError when the SDK client cannot be obtained;
//   - the result of wrapPullErrorAsNamedError when credentials cannot be resolved;
//   - CannotPullECRContainerError when the credentials cannot be JSON-encoded;
//   - *DockerTimeoutError when no pull output is seen before the begin timeout;
//   - CannotPullContainerError for any error reported while pulling.
func (dg *dockerGoClient) pullImage(ctx context.Context, image string,
	authData *apicontainer.RegistryAuthenticationData) apierrors.NamedError {
	seelog.Debugf("DockerGoClient: pulling image: %s", image)
	client, err := dg.sdkDockerClient()
	if err != nil {
		return CannotGetDockerClientError{version: dg.version, err: err}
	}

	sdkAuthConfig, err := dg.getAuthdata(image, authData)
	if err != nil {
		return wrapPullErrorAsNamedError(err)
	}
	// encode auth data
	var buf bytes.Buffer
	if err := json.NewEncoder(&buf).Encode(sdkAuthConfig); err != nil {
		return CannotPullECRContainerError{err}
	}

	imagePullOpts := types.ImagePullOptions{
		All:          false,
		RegistryAuth: base64.URLEncoding.EncodeToString(buf.Bytes()),
	}

	repository := getRepository(image)

	timeout := dg.time().After(dockerclient.DockerPullBeginTimeout)
	// pullBegan signals that at least one message has been decoded from the
	// pull response stream. It guards against a bug wherein Docker never
	// writes anything to that stream and the pull hangs forever.
	pullBegan := make(chan bool, 1)
	// pullBeganOnce ensures we only indicate it began once (since our channel will only be read 0 or 1 times)
	pullBeganOnce := sync.Once{}

	// pullFinished carries the single terminal result of the pull. It is
	// buffered so the goroutine can deliver that result and exit even if this
	// function has already returned; the goroutine must therefore send on it
	// exactly once.
	pullFinished := make(chan error, 1)
	subCtx, cancelRequest := context.WithCancel(ctx)

	go func() {
		defer cancelRequest()
		reader, err := client.ImagePull(subCtx, repository, imagePullOpts)
		if err != nil {
			pullFinished <- err
			return
		}

		// handle inactivity timeout
		// NOTE(review): presumably the handler sets canceled and invokes
		// cancelRequest when the stream stays idle for longer than
		// ImagePullInactivityTimeout, and stops watching once ch is closed;
		// confirm against inactivityTimeoutHandler.
		var canceled uint32
		var ch chan<- struct{}
		reader, ch = dg.inactivityTimeoutHandler(reader, dg.config.ImagePullInactivityTimeout, cancelRequest, &canceled)
		defer reader.Close()
		defer close(ch)
		decoder := json.NewDecoder(reader)
		data := new(ImagePullResponse)
		var statusDisplayed time.Time
		for err := decoder.Decode(data); err != io.EOF; err = decoder.Decode(data) {
			if err != nil {
				seelog.Warnf("DockerGoClient: Unable to decode pull event message for image %s: %v", image, err)
				pullFinished <- err
				return
			}
			if data.Error != "" {
				seelog.Warnf("DockerGoClient: Error while pulling image %s: %v", image, data.Error)
				pullFinished <- errors.New(data.Error)
				// The error is terminal: stop here so that no further send
				// on pullFinished (buffer of 1, read once) can block this
				// goroutine forever and leak the open reader.
				return
			}
			if atomic.LoadUint32(&canceled) != 0 {
				seelog.Warnf("DockerGoClient: inactivity time exceeded timeout while pulling image %s", image)
				pullErr := errors.New("inactivity time exceeded timeout while pulling image")
				pullFinished <- pullErr
				return
			}

			pullBeganOnce.Do(func() {
				pullBegan <- true
			})

			statusDisplayed = dg.filterPullDebugOutput(data, image, statusDisplayed)

			// Decode into a fresh value so fields from the previous message
			// do not linger when the next message omits them.
			data = new(ImagePullResponse)
		}
		pullFinished <- nil
	}()

	// Phase one: wait for the first sign of life, an early result, or the
	// pull-begin timeout, whichever comes first.
	select {
	case <-pullBegan:
		// Output is flowing; fall through and wait for the final result.
	case pullErr := <-pullFinished:
		if pullErr != nil {
			return CannotPullContainerError{pullErr}
		}
		seelog.Debugf("DockerGoClient: pulling image complete: %s", image)
		return nil
	case <-timeout:
		// NOTE(review): the request is not cancelled on this path, so the
		// goroutine keeps running until the pull ends or subCtx is cancelled
		// by something else; confirm this is intended.
		return &DockerTimeoutError{dockerclient.DockerPullBeginTimeout, "pullBegin"}
	}
	seelog.Debugf("DockerGoClient: pull began for image: %s", image)

	// Phase two: the pull has started, so block until it reports its result.
	err = <-pullFinished
	if err != nil {
		return CannotPullContainerError{err}
	}

	seelog.Debugf("DockerGoClient: pulling image complete: %s", image)
	return nil
}