public StatementResult call()

in src/main/java/com/google/cloud/spanner/pgadapter/utils/MutationWriter.java [237:372]


  /**
   * Reads COPY records from the piped payload stream, converts each record into a {@link
   * Mutation} and writes the mutations to Spanner.
   *
   * <p>The handling of the mutations depends on the transaction mode:
   *
   * <ul>
   *   <li>{@code ImplicitNonAtomic}: each mutation is handed to {@code
   *       addMutationAndMaybeFlushTransaction}, which may flush the buffer while records are still
   *       being read.
   *   <li>Any other mode: all mutations are buffered, and the operation fails with {@code
   *       FAILED_PRECONDITION} as soon as the buffer exceeds {@code maxAtomicBatchSize} records or
   *       {@code copySettings.getMaxAtomicCommitSize()} bytes.
   * </ul>
   *
   * <p>Any exception that is thrown inside the main try block is converted to a PGException,
   * stored in {@code this.exception} (while holding {@code lock}) and rethrown. The executor, the
   * payload stream and the parser are always released afterwards; each clean-up step runs even if
   * a previous clean-up step failed.
   *
   * @return an {@link UpdateCount} holding the number of records that were converted to mutations
   * @throws Exception the registered PGException if reading, parsing or writing failed, or any
   *     exception that was thrown while setting up the pipe/parser or while releasing resources
   */
  public StatementResult call() throws Exception {
    // NOTE(review): the two statements that create the pipe and the parser execute before the try
    // block below. An exception thrown by either of them is therefore neither stored in
    // this.exception nor followed by the clean-up in the finally block.
    PipedInputStream inputStream = new PipedInputStream(payload, copySettings.getPipeBufferSize());
    pipeCreatedLatch.countDown();
    final CopyInParser parser =
        CopyInParser.create(
            copySettings.getSessionState(), copyFormat, csvFormat, inputStream, hasHeader);
    // This LinkedBlockingDeque holds a reference to all transactions that are currently active. The
    // max capacity of this deque is what ensures that we never have more than maxParallelism
    // transactions running at the same time. We could also achieve that by using a thread pool with
    // a fixed number of threads. The problem with that is however that Java does not have a thread
    // pool implementation that will block if a new task is offered and all threads are currently in
    // use. The only options are 'fail or add to queue'. We want to block our worker thread in this
    // case when the max parallel transactions has been reached, as that automatically creates back-
    // pressure in our entire pipeline that consists of:
    // Client app (psql) -> CopyData message -> CSVParser -> Transaction.
    LinkedBlockingDeque<ApiFuture<Void>> activeCommitFutures =
        new LinkedBlockingDeque<>(copySettings.getMaxParallelism());
    // This list holds all transactions that we have started. We will wait on this entire list
    // before finishing, to ensure that all data has been written before we signal that we are done.
    List<ApiFuture<Void>> allCommitFutures = new ArrayList<>();
    try {
      // Wait until we know whether we actually will receive any data. It could be that it is an
      // empty copy operation, and we should then end early.
      dataReceivedLatch.await();

      Iterator<CopyRecord> iterator = parser.iterator();
      List<Mutation> mutations = new ArrayList<>();
      long currentBufferByteSize = 0L;
      // Note: iterator.hasNext() blocks if there is not enough data in the pipeline to construct a
      // complete record. It returns false if the stream has been closed and all records have been
      // returned.
      while (bytesReceived.get() > 0L && !rollback.get() && iterator.hasNext()) {
        CopyRecord record = iterator.next();
        if (record.isEndRecord()) {
          break;
        }
        // Every record must contain exactly one value per table column.
        if (record.numColumns() != this.tableColumns.keySet().size()) {
          throw PGExceptionFactory.newPGException(
              String.format(
                  "Invalid COPY data: Row length mismatch. Expected %d values, but got %d.",
                  this.tableColumns.keySet().size(), record.numColumns()),
              SQLState.DataException);
        }

        Mutation mutation = buildMutation(record);
        int mutationSize = calculateSize(mutation);
        // The row count is increased as soon as the record has been converted to a mutation, i.e.
        // before the mutation has been written.
        this.rowCount++;

        if (transactionMode == CopyTransactionMode.ImplicitNonAtomic) {
          currentBufferByteSize =
              addMutationAndMaybeFlushTransaction(
                  activeCommitFutures,
                  allCommitFutures,
                  mutations,
                  mutation,
                  currentBufferByteSize,
                  mutationSize);
        } else {
          // Atomic modes buffer everything and fail as soon as one of the limits is exceeded.
          mutations.add(mutation);
          currentBufferByteSize += mutationSize;
          if (mutations.size() > maxAtomicBatchSize) {
            throw SpannerExceptionFactory.newSpannerException(
                ErrorCode.FAILED_PRECONDITION,
                "Record count: "
                    + mutations.size()
                    + " has exceeded the limit: "
                    + maxAtomicBatchSize
                    + ".\n\nThe number of mutations per record is equal to the number of columns in the record "
                    + "plus the number of indexed columns in the record. The maximum number of mutations "
                    + "in one transaction is "
                    + copySettings.getMaxAtomicMutationsLimit()
                    + ".\n\nExecute `SET SPANNER.AUTOCOMMIT_DML_MODE='PARTITIONED_NON_ATOMIC'` before executing a large COPY operation "
                    + "to instruct PGAdapter to automatically break large transactions into multiple smaller. "
                    + "This will make the COPY operation non-atomic.\n\n");
          }
          if (currentBufferByteSize > copySettings.getMaxAtomicCommitSize()) {
            throw SpannerExceptionFactory.newSpannerException(
                ErrorCode.FAILED_PRECONDITION,
                "Commit size: "
                    + currentBufferByteSize
                    + " has exceeded the limit: "
                    + copySettings.getMaxAtomicCommitSize()
                    + ".\n\nExecute `SET SPANNER.AUTOCOMMIT_DML_MODE='PARTITIONED_NON_ATOMIC'` before executing a large COPY operation "
                    + "to instruct PGAdapter to automatically break large transactions into multiple smaller. "
                    + "This will make the COPY operation non-atomic.\n\n");
          }
        }
      } // end of iterator.hasNext()

      // There are no more CSVRecords in the pipeline.
      // Write any remaining mutations in the buffer.
      if (!rollback.get() && !mutations.isEmpty()) {
        if (transactionMode == CopyTransactionMode.Explicit) {
          connection.write(mutations);
        } else {
          // Wait until we have received a CopyDone message before writing the remaining data to
          // Spanner. If we are in a non-atomic transaction, there might already be data that have
          // been written to Spanner.
          closedLatch.await();
          if (commit.get()) {
            allCommitFutures.add(writeToSpannerAsync(activeCommitFutures, mutations));
          }
        }
      }
      // Wait for all commits to finish. We do this even if something went wrong, as it ensures two
      // things:
      // 1. All commits that were in flight when something went wrong will finish before we return
      //    an error to the client application. This prevents commits still being added to the
      //    database after we have returned an error, which could cause confusion.
      // 2. This will throw the underlying exception, so we can catch and register it.
      // Note that this statement is skipped if an exception was thrown earlier in this try block.
      ApiFutures.allAsList(allCommitFutures).get();
    } catch (SpannerException e) {
      synchronized (lock) {
        this.exception = PGExceptionFactory.toPGException(e);
        throw this.exception;
      }
    } catch (ExecutionException e) {
      // Register the cause of the failed future rather than the ExecutionException wrapper.
      synchronized (lock) {
        this.exception = PGExceptionFactory.toPGException(e.getCause());
        throw this.exception;
      }
    } catch (Exception e) {
      synchronized (lock) {
        this.exception = PGExceptionFactory.toPGException(e);
        throw this.exception;
      }
    } finally {
      // The clean-up steps are nested in try/finally blocks so that a failure in one step (for
      // example an InterruptedException from awaitTermination or an IOException from closing the
      // payload) does not prevent the remaining resources from being released.
      try {
        this.executorService.shutdown();
        if (!this.executorService.awaitTermination(60L, TimeUnit.SECONDS)) {
          logger.log(
              Level.WARNING, "Timeout while waiting for MutationWriter executor to shutdown.");
        }
      } finally {
        try {
          this.payload.close();
        } finally {
          parser.close();
        }
      }
    }
    return new UpdateCount(rowCount);
  }