protected void writeImpl()

in gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java [231:313]


  /**
   * Materializes {@code copyableFile} (or the split of it carried by {@code record}) at {@code writeAt}.
   *
   * <p>Three paths, tried in order:
   * <ol>
   *   <li>If the recovery helper finds a persisted copy whose replication and block size match the target settings,
   *       it is renamed to {@code writeAt}. If that rename reports failure, the method falls through to a fresh
   *       copy.</li>
   *   <li>If the source is a directory, {@code writeAt} is created with {@code mkdirs}.</li>
   *   <li>Otherwise {@code writeAt} is created (overwriting any existing file) and the source bytes are copied into
   *       it. The copy is throttled through the task broker's {@link StreamThrottler} when the broker provides one,
   *       and encrypted when an encryption config is set.</li>
   * </ol>
   *
   * <p>{@code inputStream} is closed before this method returns, on every path.
   *
   * @param inputStream source bytes for this file or split; closed by this method
   * @param writeAt destination path
   * @param copyableFile metadata of the source file (length, replication, block size, origin)
   * @param record the record being written; its optional split supplies the byte limit passed to the copier
   * @throws IOException if recovery, directory creation or the copy fails, or if the number of bytes copied differs
   *     from the expected count while size checking applies
   */
  protected void writeImpl(InputStream inputStream, Path writeAt, CopyableFile copyableFile,
      FileAwareInputStream record) throws IOException {

    // try-with-resources closes the source on every exit path (recovery, directory, copy, failure), including when a
    // close of the output stream throws; a null resource is simply skipped.
    try (InputStream sourceStream = inputStream) {
      final short replication = this.state.getPropAsShort(ConfigurationKeys.WRITER_FILE_REPLICATION_FACTOR,
          copyableFile.getReplication(this.fs));
      final long blockSize = copyableFile.getBlockSize(this.fs);
      final long fileSize = copyableFile.getFileStatus().getLen();

      // Byte count compared against what the copier reports; the whole file unless a split is present.
      long expectedBytes = fileSize;
      // Byte limit handed to StreamCopier; stays null for a whole-file copy.
      Long maxBytes = null;
      // Whether writer must write EXACTLY maxBytes.
      boolean mustMatchMaxBytes = false;

      if (record.getSplit().isPresent()) {
        long lowPosition = record.getSplit().get().getLowPosition();
        maxBytes = record.getSplit().get().getHighPosition() - lowPosition;
        if (record.getSplit().get().isLastSplit()) {
          // The last split runs from its start to EOF. The former "fileSize % blockSize" gave 0 for files whose
          // length is a multiple of the block size, and was also wrong when the last split spans several blocks.
          expectedBytes = fileSize - lowPosition;
        } else {
          expectedBytes = maxBytes;
          mustMatchMaxBytes = true;
        }
      }

      Predicate<FileStatus> fileStatusAttributesFilter = new Predicate<FileStatus>() {
        @Override
        public boolean apply(FileStatus input) {
          return input.getReplication() == replication && input.getBlockSize() == blockSize;
        }
      };
      Optional<FileStatus> persistedFile =
          this.recoveryHelper.findPersistedFile(this.state, copyableFile, fileStatusAttributesFilter);

      if (persistedFile.isPresent()) {
        Path recoveredPath = persistedFile.get().getPath();
        log.info("Recovering persisted file {} to {}.", recoveredPath, writeAt);
        if (this.fs.rename(recoveredPath, writeAt)) {
          return;
        }
        // FileSystem.rename reports most failures by returning false rather than throwing; copying from the source
        // is always a valid fallback and avoids leaving writeAt missing.
        log.warn("Failed to rename persisted file {} to {}. Copying from source instead.", recoveredPath, writeAt);
      }

      // Copy empty directories
      if (copyableFile.getFileStatus().isDirectory()) {
        this.fs.mkdirs(writeAt);
        return;
      }

      OutputStream os =
          this.fs.create(writeAt, true, this.fs.getConf().getInt("io.file.buffer.size", 4096), replication, blockSize);
      // The try starts right after create so the output is closed even if encryption wrapping fails.
      try {
        if (encryptionConfig != null) {
          os = EncryptionFactory.buildStreamCryptoProvider(encryptionConfig).encodeOutputStream(os);
        }

        StreamCopier copier;
        try {
          FileSystem defaultFS = FileSystem.get(new Configuration());
          StreamThrottler<GobblinScopeTypes> throttler =
              this.taskBroker.getSharedResource(new StreamThrottler.Factory<GobblinScopeTypes>(), new EmptyKey());
          ThrottledInputStream throttledInputStream = throttler.throttleInputStream().inputStream(sourceStream)
              .sourceURI(copyableFile.getOrigin().getPath()
                  .makeQualified(defaultFS.getUri(), defaultFS.getWorkingDirectory()).toUri())
              .targetURI(this.fs.makeQualified(writeAt).toUri()).build();
          copier = new StreamCopier(throttledInputStream, os, maxBytes).withBufferSize(this.bufferSize);
        } catch (NotConfiguredException nce) {
          // Without a throttler, copy unthrottled. Previously this catch skipped the copy entirely and left an empty
          // file at writeAt while reporting success.
          log.warn("Broker error. Some features of stream copier may not be available.", nce);
          copier = new StreamCopier(sourceStream, os, maxBytes).withBufferSize(this.bufferSize);
        }

        log.info("File {}: Starting copy", copyableFile.getOrigin().getPath());

        if (isInstrumentationEnabled()) {
          copier.withCopySpeedMeter(this.copySpeedMeter);
        }
        long numBytes = copier.copy();
        // Non-last splits must always match exactly; whole files and last splits only when checkFileSize is set.
        if ((this.checkFileSize || mustMatchMaxBytes) && numBytes != expectedBytes) {
          throw new IOException(String.format("Incomplete write: expected %d, wrote %d bytes.",
              expectedBytes, numBytes));
        }
        this.bytesWritten.addAndGet(numBytes);
        if (isInstrumentationEnabled()) {
          log.info("File {}: copied {} bytes, average rate: {} B/s", copyableFile.getOrigin().getPath(),
              this.copySpeedMeter.getCount(), this.copySpeedMeter.getMeanRate());
        } else {
          log.info("File {} copied.", copyableFile.getOrigin().getPath());
        }
      } finally {
        os.close();
        log.info("OutputStream for file {} is closed.", writeAt);
      }
    }
  }