in cli_tools/common/utils/storage/buffered_writer.go [97:147]
// uploadWorker receives local file paths from b.upload and uploads each one to
// bucket b.bkt. The destination object name is b.obj joined with the file path
// after b.prefix has been trimmed from it. A file is removed from local disk
// once its upload succeeds.
//
// Failure handling per file:
//   - A 403 googleapi error whose message matches gcsPermissionErrorRegExp is
//     treated as unrecoverable: it is printed with b.errLogPrefix and exit(2)
//     is called.
//   - Any other error is retried after sleeping i seconds, where i is the
//     number of failed attempts so far. When i exceeds 16 the error is printed
//     and the process is terminated with log.Fatal.
//
// b.Done is called when the worker returns, i.e. after b.upload has been
// closed and drained.
func (b *BufferedWriter) uploadWorker() {
	defer b.Done()
	for in := range b.upload {
		// registered tracks whether this file's object name has already been
		// passed to b.addObj. The name is identical on every attempt, so it is
		// registered only once; previously each retry registered it again.
		// NOTE(review): assumes addObj records the name in a collection that
		// is consumed later, where duplicates would be harmful — confirm.
		registered := false
		for i := 1; ; i++ {
			// Each attempt runs in its own function so the deferred Close
			// calls fire at the end of the attempt rather than when the
			// worker exits.
			err := func() error {
				client, err := b.client(b.ctx, b.oauth)
				if err != nil {
					return err
				}
				defer client.Close()

				file, err := os.Open(in)
				if err != nil {
					return err
				}
				defer file.Close()

				tmpObj := path.Join(b.obj, strings.TrimPrefix(in, b.prefix))
				if !registered {
					b.addObj(tmpObj)
					registered = true
				}

				dst := client.GetObject(b.bkt, tmpObj).NewWriter()
				// An io.EOF from the copy is not treated as a failure.
				// NOTE(review): dst is not closed when the copy fails; closing
				// it here could finalize a partial object, so this is left
				// as-is — confirm the writer is released with the client.
				if _, err := io.Copy(dst, file); err != nil {
					if io.EOF != err {
						return err
					}
				}

				// Close reports whether the upload was actually committed.
				return dst.Close()
			}()
			if err != nil {
				// Don't retry if permission error as it's not recoverable.
				gAPIErr, isGAPIErr := err.(*googleapi.Error)
				if isGAPIErr && gAPIErr.Code == 403 && gcsPermissionErrorRegExp.MatchString(gAPIErr.Message) {
					fmt.Printf("%v: %v\n", b.errLogPrefix, err)
					exit(2)
					// Only reached if exit returns; stop retrying this file.
					break
				}

				fmt.Printf("Failed %v time(s) to upload '%v', error: %v\n", i, in, err)
				if i > 16 {
					fmt.Printf("%v: %v\n", b.errLogPrefix, err)
					log.Fatal(err)
				}

				// Linear back-off: wait i seconds after the i-th failure.
				fmt.Printf("Retrying upload '%v' after %v second(s)...\n", in, i)
				time.Sleep(time.Duration(1*i) * time.Second)
				continue
			}

			// The upload is committed; the removal error is ignored, so a
			// leftover local file does not fail the upload.
			os.Remove(in)
			break
		}
	}
}