kafka-connect-bigtable-sink/sink/src/main/java/com/google/cloud/kafka/connect/bigtable/BigtableSinkTask.java [352:371]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    for (SinkRecord record : resourceCreationResult.getBigtableErrors()) {
      reportError(record, new ConnectException(errorMessage));
      okMutations.remove(record);
    }
    for (SinkRecord record : resourceCreationResult.getDataErrors()) {
      reportError(record, new InvalidBigtableSchemaModificationException(errorMessage));
      okMutations.remove(record);
    }
    return okMutations;
  }

  /**
   * Attempts to create Cloud Bigtable column families so that all the mutations can be applied and
   * handles errors.
   *
   * @param mutations Input records and corresponding mutations.
   * @return Subset of the input argument containing only those record for which the target Cloud
   *     Bigtable column families exist.
   */
  @VisibleForTesting
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



kafka-connect-bigtable-sink/sink/src/main/java/com/google/cloud/kafka/connect/bigtable/BigtableSinkTask.java [378:395]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    for (SinkRecord record : resourceCreationResult.getBigtableErrors()) {
      reportError(record, new ConnectException(errorMessage));
      okMutations.remove(record);
    }
    for (SinkRecord record : resourceCreationResult.getDataErrors()) {
      reportError(record, new InvalidBigtableSchemaModificationException(errorMessage));
      okMutations.remove(record);
    }
    return okMutations;
  }

  /**
   * Applies the mutations using upserts.
   *
   * @param mutations Mutations to be applied.
   * @param perRecordResults {@link Map} the per-record results will be written to.
   */
  @VisibleForTesting
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



