in agent/dockerclient/dockerapi/docker_client.go [371:472]
// pullImage pulls image through the Docker SDK client and blocks until the
// pull completes, fails, or does not begin within
// dockerclient.DockerPullBeginTimeout.
//
// The pull itself runs in a separate goroutine that decodes the JSON message
// stream returned by ImagePull. That goroutine reports its single result on
// pullFinished and signals pullBegan once the first message has been decoded.
//
// Returns:
//   - CannotGetDockerClientError if the SDK client cannot be obtained.
//   - the result of wrapPullErrorAsNamedError if auth data cannot be resolved.
//   - CannotPullECRContainerError if the auth config cannot be JSON-encoded.
//   - CannotPullContainerError for any error reported by the pull goroutine
//     (ImagePull failure, undecodable message, error message in the stream,
//     or inactivity timeout).
//   - *DockerTimeoutError if neither a message nor a result is seen before
//     the pull-begin timeout elapses.
//   - nil when the stream reaches EOF without an error.
func (dg *dockerGoClient) pullImage(ctx context.Context, image string,
	authData *apicontainer.RegistryAuthenticationData) apierrors.NamedError {
	seelog.Debugf("DockerGoClient: pulling image: %s", image)
	client, err := dg.sdkDockerClient()
	if err != nil {
		return CannotGetDockerClientError{version: dg.version, err: err}
	}

	sdkAuthConfig, err := dg.getAuthdata(image, authData)
	if err != nil {
		return wrapPullErrorAsNamedError(err)
	}
	// The registry auth is passed to the SDK as base64-encoded JSON.
	var buf bytes.Buffer
	if err := json.NewEncoder(&buf).Encode(sdkAuthConfig); err != nil {
		return CannotPullECRContainerError{err}
	}

	imagePullOpts := types.ImagePullOptions{
		All:          false,
		RegistryAuth: base64.URLEncoding.EncodeToString(buf.Bytes()),
	}
	repository := getRepository(image)

	timeout := dg.time().After(dockerclient.DockerPullBeginTimeout)
	// pullBegan is a channel indicating that we have decoded at least one message from the pull response stream.
	// It is here to guard against a bug wherein Docker never writes anything to that stream and hangs in pulling forever.
	pullBegan := make(chan bool, 1)
	// pullBeganOnce ensures we only indicate it began once (since our channel will only be read 0 or 1 times)
	pullBeganOnce := sync.Once{}
	// pullFinished carries the goroutine's single result. It has capacity 1
	// and is received from at most once below, so every path through the
	// goroutine must send on it exactly once and then return; that way the
	// send never blocks, even if this function already returned on timeout.
	pullFinished := make(chan error, 1)
	subCtx, cancelRequest := context.WithCancel(ctx)

	go func() {
		defer cancelRequest()
		reader, err := client.ImagePull(subCtx, repository, imagePullOpts)
		if err != nil {
			pullFinished <- err
			return
		}

		// handle inactivity timeout
		var canceled uint32
		var ch chan<- struct{}
		reader, ch = dg.inactivityTimeoutHandler(reader, dg.config.ImagePullInactivityTimeout, cancelRequest, &canceled)
		defer reader.Close()
		defer close(ch)

		decoder := json.NewDecoder(reader)
		data := new(ImagePullResponse)
		var statusDisplayed time.Time
		for err := decoder.Decode(data); err != io.EOF; err = decoder.Decode(data) {
			if err != nil {
				seelog.Warnf("DockerGoClient: Unable to decode pull event message for image %s: %v", image, err)
				pullFinished <- err
				return
			}
			if data.Error != "" {
				seelog.Warnf("DockerGoClient: Error while pulling image %s: %v", image, data.Error)
				pullFinished <- errors.New(data.Error)
				// Stop here: the pull has failed. Continuing would lead to a
				// second send on pullFinished, which can block forever and
				// leak this goroutine along with the open reader.
				return
			}
			if atomic.LoadUint32(&canceled) != 0 {
				seelog.Warnf("DockerGoClient: inactivity time exceeded timeout while pulling image %s", image)
				pullErr := errors.New("inactivity time exceeded timeout while pulling image")
				pullFinished <- pullErr
				return
			}

			pullBeganOnce.Do(func() {
				pullBegan <- true
			})

			statusDisplayed = dg.filterPullDebugOutput(data, image, statusDisplayed)

			// Decode each message into a fresh value so fields from the
			// previous message do not carry over.
			data = new(ImagePullResponse)
		}
		pullFinished <- nil
	}()

	// Phase 1: wait for the pull to begin, to finish outright, or for the
	// pull-begin timeout to fire.
	select {
	case <-pullBegan:
		break
	case pullErr := <-pullFinished:
		if pullErr != nil {
			return CannotPullContainerError{pullErr}
		}
		seelog.Debugf("DockerGoClient: pulling image complete: %s", image)
		return nil
	case <-timeout:
		return &DockerTimeoutError{dockerclient.DockerPullBeginTimeout, "pullBegin"}
	}

	// Phase 2: the pull has begun; wait for its final result.
	seelog.Debugf("DockerGoClient: pull began for image: %s", image)
	err = <-pullFinished
	if err != nil {
		return CannotPullContainerError{err}
	}

	seelog.Debugf("DockerGoClient: pulling image complete: %s", image)
	return nil
}