in bigquery/bigquery_storage_quickstart/main.go [261:327]
// processStream reads row blocks from a single storage API read stream and
// forwards each non-empty response on ch. Streams may be long-running, so
// rather than one global retry budget this retries up to retryLimit
// consecutive failures and resets the counter whenever progress is made.
// When the server attaches an explicit RetryInfo delay to a
// ResourceExhausted status, it sleeps for that delay instead of consuming
// a retry. Returns nil on clean end-of-stream, or the last error once
// retries are exhausted.
func processStream(ctx context.Context, client *bqStorage.BigQueryReadClient, st string, ch chan<- *storagepb.ReadRowsResponse) error {
	var offset int64
	retryLimit := 3
	retries := 0
	for {
		// Send the initiating request to start streaming row blocks,
		// resuming from the last known good offset.
		rowStream, err := client.ReadRows(ctx, &storagepb.ReadRowsRequest{
			ReadStream: st,
			Offset:     offset,
		}, rpcOpts)
		if err != nil {
			return fmt.Errorf("couldn't invoke ReadRows: %w", err)
		}
		// Process the streamed responses.
		for {
			r, err := rowStream.Recv()
			if err == io.EOF {
				return nil
			}
			if err != nil {
				// Check whether this is a retryable error that carries an
				// explicit retry delay; if so, sleep for that delay instead
				// of consuming one of the limited retries.
				var retryDelayDuration time.Duration
				if errorStatus, ok := status.FromError(err); ok && errorStatus.Code() == codes.ResourceExhausted {
					for _, detail := range errorStatus.Details() {
						if retryInfo, ok := detail.(*errdetails.RetryInfo); ok {
							// AsDuration is nil-safe (uses getters), unlike
							// reading the Seconds/Nanos fields directly,
							// which would panic if no delay was set.
							retryDelayDuration = retryInfo.GetRetryDelay().AsDuration()
							break
						}
					}
				}
				if retryDelayDuration != 0 {
					log.Printf("processStream failed with a retryable error, retrying in %v", retryDelayDuration)
					time.Sleep(retryDelayDuration)
				} else {
					retries++
					if retries >= retryLimit {
						return fmt.Errorf("processStream retries exhausted: %w", err)
					}
				}
				// Break the inner loop and recover by starting a new
				// streaming ReadRows call at the last known good offset.
				break
			}
			// Successful response: reset the consecutive-failure count.
			retries = 0
			if rc := r.GetRowCount(); rc > 0 {
				// Bookmark our progress in case of retries and send the
				// row block on the channel.
				offset += rc
				ch <- r
			}
		}
	}
}