long insertAll()

in sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java [1083:1318]


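    /**
     * Inserts {@code rowList} into BigQuery via the streaming insertAll API. Rows are batched
     * under the request-size and row-count limits and the batches are submitted in parallel on a
     * bounded executor. Failed rows are retried with backoff when {@code retryPolicy} allows it;
     * otherwise they are routed to {@code failedInserts} through {@code errorContainer}. Returns
     * the total byte size of the batches submitted for insertion.
     */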
    <T> long insertAll(
        TableReference ref,
        List<FailsafeValueInSingleWindow<TableRow, TableRow>> rowList,
        @Nullable List<String> insertIdList,
        BackOff backoff,
        FluentBackoff rateLimitBackoffFactory,
        final Sleeper sleeper,
        InsertRetryPolicy retryPolicy,
        List<ValueInSingleWindow<T>> failedInserts,
        ErrorContainer<T> errorContainer,
        boolean skipInvalidRows,
        boolean ignoreUnknownValues,
        boolean ignoreInsertIds,
        List<ValueInSingleWindow<TableRow>> successfulRows)
        throws IOException, InterruptedException {
      checkNotNull(ref, "ref");
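      // Lazily create a bounded executor so the number of concurrent batch inserts is capped at
      // getInsertBundleParallelism().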
      if (executor == null) {
        this.executor =
            new BoundedExecutorService(
                MoreExecutors.listeningDecorator(options.as(GcsOptions.class).getExecutorService()),
                options.as(BigQueryOptions.class).getInsertBundleParallelism());
      }
      if (insertIdList != null && rowList.size() != insertIdList.size()) {
        throw new AssertionError(
            "If insertIdList is not null it needs to have at least "
                + "as many elements as rowList");
      }
      StreamingInsertsMetrics streamingInsertsResults =
          BigQuerySinkMetrics.streamingInsertsMetrics();
      int numFailedRows = 0;
      final Set<Integer> failedIndices = new HashSet<>();
      long retTotalDataSize = 0;
      List<TableDataInsertAllResponse.InsertErrors> allErrors = new ArrayList<>();
      // These lists contain the rows to publish. Initially they contain the entire list.
      // If there are failures, they will contain only the failed rows to be retried.
      List<FailsafeValueInSingleWindow<TableRow, TableRow>> rowsToPublish = rowList;
      List<String> idsToPublish = null;
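      // Insert IDs enable BigQuery's best-effort de-duplication of retried rows; when
      // ignoreInsertIds is set we drop them, trading that protection for higher throughput.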
      if (!ignoreInsertIds) {
        idsToPublish = insertIdList;
      }

      while (true) {
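        // One pass of the retry loop: publish rowsToPublish, collecting rows whose errors are
        // retryable into retryRows/retryIds to become the next pass's input.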
        List<FailsafeValueInSingleWindow<TableRow, TableRow>> retryRows = new ArrayList<>();
        List<String> retryIds = (idsToPublish != null) ? new ArrayList<>() : null;

        int strideIndex = 0;
        // Upload in batches.
        List<TableDataInsertAllRequest.Rows> rows = new ArrayList<>();
        long dataSize = 0L;

        List<Future<List<TableDataInsertAllResponse.InsertErrors>>> futures = new ArrayList<>();
        List<Integer> strideIndices = new ArrayList<>();
        // Store the longest throttled time across all parallel threads
        final AtomicLong maxThrottlingMsec = new AtomicLong();

        int rowIndex = 0;
        while (rowIndex < rowsToPublish.size()) {
          TableRow row = rowsToPublish.get(rowIndex).getValue();
          long nextRowSize = 0L;
          try {
            nextRowSize = TableRowJsonCoder.of().getEncodedElementByteSize(row);
          } catch (Exception ex) {
            throw new RuntimeException("Failed to convert the row to JSON", ex);
          }
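          // Size the row by its JSON encoding so the batch limits below reflect the bytes
          // actually sent to BigQuery.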

          // The following scenario must be *extremely* rare.
          // If this row's encoding by itself is larger than the maximum row payload, then it's
          // impossible to insert into BigQuery, and so we send it out through the dead-letter
          // queue.
          if (nextRowSize >= MAX_BQ_ROW_PAYLOAD) {
            InsertErrors error =
                new InsertErrors()
                    .setErrors(ImmutableList.of(new ErrorProto().setReason("row-too-large")));
            // Ask the retry policy whether this error should be retried; if not, the row goes
            // to the dead-letter output below.
            boolean isRetry = retryPolicy.shouldRetry(new InsertRetryPolicy.Context(error));
            if (isRetry) {
              throw new RuntimeException(
                  String.format(
                      "We have observed a row that is %s bytes in size and exceeded BigQueryIO"
                          + " limit of 9MB. While BigQuery supports request sizes up to 10MB,"
                          + " BigQueryIO sets the limit at 9MB to leave room for request"
                          + " overhead. You may change your retry strategy to unblock this"
                          + " pipeline, and the row will be output as a failed insert.",
                      nextRowSize));
            } else {
              numFailedRows += 1;
              errorContainer.add(failedInserts, error, ref, rowsToPublish.get(rowIndex));
              failedIndices.add(rowIndex);
              rowIndex++;
              continue;
            }
          }

          // If adding the next row will push the request above BQ row limits, or
          // if the current batch of elements is larger than the targeted request size,
          // we immediately go and issue the data insertion.
          if (dataSize + nextRowSize >= MAX_BQ_ROW_PAYLOAD
              || dataSize >= maxRowBatchSize
              || rows.size() + 1 > maxRowsPerBatch) {
            // If the row does not fit into the insert buffer, submit a future that inserts the
            // current batch into BQ and start a fresh buffer; the row itself is added to that
            // fresh buffer below.
            futures.add(
                executor.submit(
                    new InsertBatchofRowsCallable(
                        ref,
                        skipInvalidRows,
                        ignoreUnknownValues,
                        client,
                        rateLimitBackoffFactory,
                        rows,
                        maxThrottlingMsec,
                        sleeper,
                        streamingInsertsResults)));
            strideIndices.add(strideIndex);
            retTotalDataSize += dataSize;
            strideIndex = rowIndex;
            rows = new ArrayList<>();
            dataSize = 0L;
          }
          // If the row fits into the insert buffer, then we add it to the buffer to be inserted
          // later, and we move onto the next row.
          TableDataInsertAllRequest.Rows out = new TableDataInsertAllRequest.Rows();
          if (idsToPublish != null) {
            out.setInsertId(idsToPublish.get(rowIndex));
          }
          out.setJson(row.getUnknownKeys());
          rows.add(out);
          rowIndex++;
          dataSize += nextRowSize;
        }

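        // Flush the final partial batch, if any.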
        if (!rows.isEmpty()) {
          futures.add(
              executor.submit(
                  new InsertBatchofRowsCallable(
                      ref,
                      skipInvalidRows,
                      ignoreUnknownValues,
                      client,
                      rateLimitBackoffFactory,
                      rows,
                      maxThrottlingMsec,
                      sleeper,
                      streamingInsertsResults)));
          strideIndices.add(strideIndex);
          retTotalDataSize += dataSize;
          rows = new ArrayList<>();
        }

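        // Wait for all in-flight batch inserts, then map each returned error back to its
        // global row index (via the batch's stride index) and classify it with the retry policy.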
        try {
          for (int i = 0; i < futures.size(); i++) {
            List<TableDataInsertAllResponse.InsertErrors> errors = futures.get(i).get();
            if (errors == null) {
              continue;
            }

            for (TableDataInsertAllResponse.InsertErrors error : errors) {
              if (error.getIndex() == null) {
                throw new IOException("Insert failed: " + error + ", other errors: " + allErrors);
              }
              int errorIndex = error.getIndex().intValue() + strideIndices.get(i);
              failedIndices.add(errorIndex);
              if (retryPolicy.shouldRetry(new InsertRetryPolicy.Context(error))) {
                allErrors.add(error);
                retryRows.add(rowsToPublish.get(errorIndex));
                // TODO (https://github.com/apache/beam/issues/20891): Select the retry rows(using
                // errorIndex) from the batch of rows which attempted insertion in this call.
                // Not the entire set of rows in rowsToPublish.
                if (retryIds != null) {
                  retryIds.add(idsToPublish.get(errorIndex));
                }
              } else {
                numFailedRows += 1;
                errorContainer.add(failedInserts, error, ref, rowsToPublish.get(errorIndex));
              }
            }
          }
          // Accumulate the longest throttled time across all parallel threads
          throttlingMsecs.inc(maxThrottlingMsec.get());
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new IOException("Interrupted while inserting " + rowsToPublish);
        } catch (ExecutionException e) {
          streamingInsertsResults.updateStreamingInsertsMetrics(
              ref, rowList.size(), rowList.size());
          throw new RuntimeException(e.getCause());
        }

        if (allErrors.isEmpty()) {
          break;
        }
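        // Back off before the next retry pass; BackOff.STOP means the retry budget is
        // exhausted, and the remaining errors are surfaced after the loop.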
        long nextBackoffMillis = backoff.nextBackOffMillis();
        if (nextBackoffMillis == BackOff.STOP) {
          break;
        }
        try {
          sleeper.sleep(nextBackoffMillis);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new IOException("Interrupted while waiting before retrying insert of " + retryRows);
        }
        rowsToPublish = retryRows;
        idsToPublish = retryIds;
        streamingInsertsResults.updateRetriedRowsWithStatus(
            BigQuerySinkMetrics.INTERNAL, retryRows.size());
        // Log the first 5 failures.
        int numErrorToLog = Math.min(allErrors.size(), 5);
        LOG.info(
            "Retrying {} failed inserts to BigQuery. First {} failures: {}",
            rowsToPublish.size(),
            numErrorToLog,
            allErrors.subList(0, numErrorToLog));
        allErrors.clear();
      }
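      // Report every row of the final attempt whose index was never recorded as failed.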
      if (successfulRows != null) {
        for (int i = 0; i < rowsToPublish.size(); i++) {
          if (!failedIndices.contains(i)) {
            successfulRows.add(
                ValueInSingleWindow.of(
                    rowsToPublish.get(i).getValue(),
                    rowsToPublish.get(i).getTimestamp(),
                    rowsToPublish.get(i).getWindow(),
                    rowsToPublish.get(i).getPane()));
          }
        }
      }
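      // Any errors left at this point were retryable, but the backoff budget ran out before
      // they succeeded, so the corresponding rows count as failed.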
      numFailedRows += allErrors.size();
      streamingInsertsResults.updateStreamingInsertsMetrics(ref, rowList.size(), numFailedRows);
      if (!allErrors.isEmpty()) {
        throw new IOException("Insert failed: " + allErrors);
      } else {
        return retTotalDataSize;
      }
    }
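
Usage note: the InsertRetryPolicy consulted above is the one users attach to the sink. A minimal
sketch, assuming Beam's public BigQueryIO API (the table name is illustrative):

    BigQueryIO.writeTableRows()
        .to("my-project:my_dataset.my_table")
        .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
        .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
        .withExtendedErrorInfo();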