func()

in cli_tools/common/utils/storage/buffered_writer.go [97:147]


// uploadWorker receives local file names from b.upload and copies each file
// to the object path.Join(b.obj, <name with b.prefix trimmed>) in bucket
// b.bkt, using a client obtained from b.client for every attempt. A file is
// removed from local disk once its upload has succeeded. b.Done is called
// when the worker returns, i.e. after b.upload has been closed and drained.
//
// A failed attempt is retried after a linearly growing delay (i seconds after
// the i-th failure). The worker stops retrying in two cases:
//   - the error is a *googleapi.Error with code 403 whose message matches
//     gcsPermissionErrorRegExp: the error is printed and exit(2) is called;
//   - more than maxRetries attempts have failed: the error is printed and
//     log.Fatal is called.
func (b *BufferedWriter) uploadWorker() {
	// Number of failed attempts tolerated before giving up on a file.
	const maxRetries = 16

	defer b.Done()
	for in := range b.upload {
		// Becomes true once this file's temporary object name has been handed
		// to b.addObj, so that retries of the same file do not register the
		// same name again.
		// NOTE(review): assumes registering a name more than once is never
		// desired (e.g. addObj appends to a list) - confirm against addObj.
		registered := false

		for i := 1; ; i++ {
			// One upload attempt. Wrapped in a closure so the deferred Close
			// calls run at the end of every attempt rather than at worker exit.
			err := func() error {
				client, err := b.client(b.ctx, b.oauth)
				if err != nil {
					return err
				}
				defer client.Close()

				file, err := os.Open(in)
				if err != nil {
					return err
				}
				defer file.Close()

				tmpObj := path.Join(b.obj, strings.TrimPrefix(in, b.prefix))
				if !registered {
					b.addObj(tmpObj)
					registered = true
				}

				dst := client.GetObject(b.bkt, tmpObj).NewWriter()
				// io.Copy reports a normal end of input as a nil error; an
				// io.EOF result is deliberately tolerated and falls through to
				// Close.
				// NOTE(review): dst is not closed when the copy fails. Whether
				// Close would abort or commit a partial object depends on the
				// writer implementation - confirm before changing.
				if _, err := io.Copy(dst, file); err != nil && err != io.EOF {
					return err
				}
				return dst.Close()
			}()
			if err == nil {
				// Cleanup is best effort: the upload itself succeeded, so a
				// failure to delete the local file is reported, not retried.
				if rmErr := os.Remove(in); rmErr != nil {
					fmt.Printf("Failed to remove '%v' after upload, error: %v\n", in, rmErr)
				}
				break
			}

			// Don't retry if permission error as it's not recoverable.
			// NOTE(review): a plain type assertion does not see through
			// wrapped errors; errors.As would, but needs the errors import.
			gAPIErr, isGAPIErr := err.(*googleapi.Error)
			if isGAPIErr && gAPIErr.Code == 403 && gcsPermissionErrorRegExp.MatchString(gAPIErr.Message) {
				fmt.Printf("%v: %v\n", b.errLogPrefix, err)
				exit(2)
				// Only reached if exit returns; stop working on this file.
				break
			}

			fmt.Printf("Failed %v time(s) to upload '%v', error: %v\n", i, in, err)
			if i > maxRetries {
				fmt.Printf("%v: %v\n", b.errLogPrefix, err)
				log.Fatal(err)
			}

			// Linear backoff: wait i seconds after the i-th failure.
			fmt.Printf("Retrying upload '%v' after %v second(s)...\n", in, i)
			time.Sleep(time.Duration(i) * time.Second)
		}
	}
}