public void processElement()

in v1/src/main/java/com/google/cloud/teleport/spanner/SplitIntoRangesFn.java [85:267]


  /**
   * Splits one input file into {@link FileShard}s and emits them.
   *
   * <p>The table name for the file is looked up in the {@code filenamesToTableNamesMapView} side
   * input. Three cases are handled:
   *
   * <ul>
   *   <li>If the file is not efficiently seekable, a single shard covering {@code [0, sizeBytes)}
   *       is emitted with a record count of {@code Long.MAX_VALUE}.
   *   <li>If {@code handleNewLine} is false, {@code [0, sizeBytes)} is split by {@code
   *       desiredBundleSize} without parsing, again with a record count of {@code Long.MAX_VALUE}.
   *   <li>Otherwise the file is scanned character by character, tracking quoting and escaping, so
   *       that shards are only cut right after a record-terminating newline. Each shard carries
   *       the number of records it contains.
   * </ul>
   *
   * @param c the process context supplying the file and the side input, and receiving the shards
   * @throws FileNotFoundException if the file has no entry in the filename-to-table-name map
   * @throws RuntimeException if the file cannot be read, or if a non-space character follows a
   *     closing quote before the next delimiter or newline
   */
  public void processElement(ProcessContext c) throws FileNotFoundException {
    Map<String, String> filenamesToTableNamesMap = c.sideInput(filenamesToTableNamesMapView);
    Metadata metadata = c.element().getMetadata();
    String filename = metadata.resourceId().toString();
    String tableName = filenamesToTableNamesMap.get(filename);

    if (tableName == null) {
      throw new FileNotFoundException(
          "Unknown table name for file:" + filename + " in map " + filenamesToTableNamesMap);
    }
    if (!metadata.isReadSeekEfficient()) {
      // Do not shard the file.
      c.output(
          FileShard.create(
              tableName, c.element(), new OffsetRange(0, metadata.sizeBytes()), Long.MAX_VALUE));
      return;
    }
    if (!handleNewLine.get()) {
      // Create shards without parsing.
      for (OffsetRange range :
          new OffsetRange(0, metadata.sizeBytes()).split(desiredBundleSize, 0)) {
        // We fill in MAX_VALUE in the record count as it is not needed when not in handleNewLine
        // mode. This mode uses bounded reader to read the file instead of the CSVParser.
        c.output(FileShard.create(tableName, c.element(), range, Long.MAX_VALUE));
      }
      return;
    }
    // NOTE(review): bytesRead below counts chars decoded with the platform default charset, yet it
    // is used to build offsets for OffsetRange. Chars and bytes only agree when every char decodes
    // from exactly one byte (e.g. ASCII-only content) — confirm against the shard reader.
    // try-with-resources closes the reader, stream and channel even if parsing or output fails.
    try (ReadableByteChannel channel = FileSystems.open(metadata.resourceId());
        InputStream stream = Channels.newInputStream(channel);
        Reader reader = new InputStreamReader(stream)) {
      char escape =
          (escapeChar == null || escapeChar.get() == null) ? ((char) 0) : escapeChar.get();

      int data = 0, prevData = 0;
      // We need to store the record count to know when to stop reading the shard as the CSVParser
      // library does not provide a way to use the end of shard position(in bytes) directly.
      long bytesRead = 0, prevMarker = 0, maxShardSize = desiredBundleSize, recordCount = 0;
      // Value of bytesRead (relative to prevMarker) right after the most recently counted
      // record-terminating newline. Used after the loop to detect a final record that has no
      // trailing newline.
      long lastRecordEnd = 0;
      boolean quoted = false;
      while (data != -1) {
        data = reader.read();
        bytesRead++;
        // This always reads the first char of the column, which can either be the start of the row
        // or the character after the delimiter.
        // If the first char is a quote, we assume the whole value is going to be quoted to keep
        // the behaviour consistent with the CSV parser.
        quoted = ((char) data == quoteChar.get());
        if (!quoted) {
          // If unquoted, we need to reach the next unescaped delimiter or unescaped newline while
          // skipping all characters. Any quotes found will be treated as part of the data.
          prevData = data;
          if ((char) data == columnDelimiter.get()) {
            // The first character of the column was a delimiter.
            continue;
          }
          if ((char) data == '\n') {
            // The first character of the column was a newline.
            recordCount++;
            if (bytesRead > maxShardSize) {
              c.output(
                  FileShard.create(
                      tableName,
                      c.element(),
                      new OffsetRange(prevMarker, prevMarker + bytesRead),
                      recordCount));
              prevMarker += bytesRead;
              bytesRead = 0;
              recordCount = 0;
            }
            lastRecordEnd = bytesRead;
            continue;
          }
          while (data != -1) {
            data = reader.read();
            bytesRead++;
            // If prev char is escaped, do nothing.
            if ((char) prevData == escape) {
              prevData = data;
              continue;
            }
            prevData = data;
            // We reached end of the column, we go back to the outer loop and move to the next
            // column.
            if ((char) data == columnDelimiter.get()) {
              break;
            }
            // We reached an unescaped EOL, hence the record ends here.
            if ((char) data == '\n') {
              recordCount++;
              if (bytesRead > maxShardSize) {
                c.output(
                    FileShard.create(
                        tableName,
                        c.element(),
                        new OffsetRange(prevMarker, prevMarker + bytesRead),
                        recordCount));
                prevMarker += bytesRead;
                bytesRead = 0;
                recordCount = 0;
              }
              lastRecordEnd = bytesRead;
              break;
            }
          }
          // This means we reached a column delimiter, newline or end of file, go back to outer
          // loop.
          continue;
        } else {
          // If the first column character is a quote, we assume the whole data value is quoted. If
          // the quote is followed by another quote, the value is still assumed to be quoted
          // (""abc" is quoted "abc, ""abc would throw an error as it would treat it as character
          // after a closed quote).
          // In the quoted case, we know our value ends when we reach an unescaped quote,
          // after which only whitespaces are allowed.  We throw an error if we find any character
          // other  than a whitespace. We ignore any newlines or delimiters inside quotes.
          while (data != -1) {
            prevData = data;
            data = reader.read();
            bytesRead++;
            // CSV quotes can be escaped via the escape character or the quote itself.
            if ((char) data == escape || (char) data == quoteChar.get()) {
              prevData = data;
              data = reader.read();
              bytesRead++;
              if ((char) data == quoteChar.get()) {
                // Character is quote hence ignore since prev was an escape.
                continue;
              }
              // Next character is not a quote. If prev char was a quote, that denotes the value
              // ended.
              if ((char) prevData == quoteChar.get()) {
                // We now skip to the next delimiter or EOL allowing only
                // whitespaces as valid chars from here.
                while (data != -1) {
                  if ((char) data == columnDelimiter.get()) {
                    prevData = data;
                    break;
                  }
                  if ((char) data == '\n') {
                    recordCount++;
                    if (bytesRead > maxShardSize) {
                      c.output(
                          FileShard.create(
                              tableName,
                              c.element(),
                              new OffsetRange(prevMarker, prevMarker + bytesRead),
                              recordCount));
                      prevMarker += bytesRead;
                      bytesRead = 0;
                      recordCount = 0;
                    }
                    lastRecordEnd = bytesRead;
                    break;
                  }
                  if ((char) data != ' ') {
                    throw new RuntimeException("Found char '" + (char) data + "' outside quote");
                  }
                  prevData = data;
                  data = reader.read();
                  bytesRead++;
                }
                break;
              }
            }
          }
        }
      }
      // If the last record does not have a newline at the end, we need to account for that record
      // as well. Every path counts the read that returned EOF (-1) exactly once in bytesRead, so
      // bytesRead - 1 chars were consumed since prevMarker; any consumed after the last counted
      // newline form an unterminated final record. (prevData cannot be used here: it is -1 after
      // an EOF read on the unquoted path, which made the old check always true.)
      if (bytesRead - 1 > lastRecordEnd) {
        recordCount++;
      }
      // Add last shard to output collection only if it is non-empty.
      if (recordCount > 0) {
        // We subtract one in the final offset to account for EOF read where data becomes -1.
        c.output(
            FileShard.create(
                tableName,
                c.element(),
                new OffsetRange(prevMarker, prevMarker + bytesRead - 1),
                recordCount));
      }
    } catch (IOException e) {
      // Keep the IOException as the cause so the underlying failure is not lost.
      throw new RuntimeException("Unable to readFile: " + metadata.resourceId().toString(), e);
    }
  }