integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalWriter.java [101:150]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  // Tracks whether the writer is currently in a flushed state: in the code visible here,
  // addElement() sets it to false and flush() sets it to true after its close/commit/reset
  // sequence. Declared volatile; finish() reads it without holding the monitor.
  private volatile boolean flushed;

  /**
   * Returns the value held in this writer's {@code writePath} field.
   *
   * @return the current {@code writePath}
   */
  @Override
  public String getPath() {
    return writePath;
  }

  /**
   * Writes one element through the writer selected by the factory and, once the
   * write count reaches the commit threshold, closes the writers, commits, and
   * resets the factory and the count.
   *
   * @param element the row to write; it is passed to the factory to select the
   *                writer and then to that writer
   * @throws IOException propagated from the write or from the close/commit sequence
   */
  @Override
  public void addElement(final Object[] element) throws IOException {
    this.writerFactory.getWriter(element).write(element);
    // Compare the value returned by the increment itself instead of issuing a
    // second get(): the check is then made against the count this call produced.
    final long written = this.writeCount.incrementAndGet();
    if (written >= this.writeCommitThreshold) {
      this.closeWriters();
      this.commit();
      this.writerFactory.reset();
      this.writeCount.set(0);
    }
    // Cleared unconditionally, including directly after a threshold commit.
    this.flushed = false;
  }

  /**
   * Closes the open writers, commits, and resets the factory and the write count,
   * unless this writer is already marked as flushed. The check and the work are
   * done while holding this object's monitor.
   *
   * @throws IOException propagated from the close/commit sequence
   */
  @Override
  public void flush() throws IOException {
    if (LOGGER.isDebugEnabled()) {
      LOGGER.debug("Flush writer. " + this);
    }
    synchronized (this) {
      if (this.flushed) {
        // Already flushed; skip the close/commit/reset sequence.
        return;
      }
      this.closeWriters();
      this.commit();
      this.writerFactory.reset();
      this.writeCount.set(0);
      this.flushed = true;
    }
  }

  /**
   * Finishes this writer by delegating to {@link #flush()} when the writer is not
   * already marked as flushed.
   *
   * @throws IOException propagated from {@link #flush()}
   */
  @Override
  public void finish() throws IOException {
    if (LOGGER.isDebugEnabled()) {
      LOGGER.debug("Finish writer. " + this);
    }
    if (this.flushed) {
      return;
    }
    this.flush();
  }

  @Override
  public void commit() throws IOException {
    if (LOGGER.isDebugEnabled()) {
      LOGGER.debug("Commit write. " + this.toString());
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3Writer.java [108:157]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  // Tracks whether the writer is currently in a flushed state: in the code visible here,
  // addElement() sets it to false and flush() sets it to true after its close/commit/reset
  // sequence. Declared volatile; finish() reads it without holding the monitor.
  private volatile boolean flushed;

  /**
   * Returns the value held in this writer's {@code writePath} field.
   *
   * @return the current {@code writePath}
   */
  @Override
  public String getPath() {
    return writePath;
  }

  /**
   * Writes one element through the writer selected by the factory and, once the
   * write count reaches the commit threshold, closes the writers, commits, and
   * resets the factory and the count.
   *
   * @param element the row to write; it is passed to the factory to select the
   *                writer and then to that writer
   * @throws IOException propagated from the write or from the close/commit sequence
   */
  @Override
  public void addElement(final Object[] element) throws IOException {
    this.writerFactory.getWriter(element).write(element);
    // Compare the value returned by the increment itself instead of issuing a
    // second get(): the check is then made against the count this call produced.
    final long written = this.writeCount.incrementAndGet();
    if (written >= this.writeCommitThreshold) {
      this.closeWriters();
      this.commit();
      this.writerFactory.reset();
      this.writeCount.set(0);
    }
    // Cleared unconditionally, including directly after a threshold commit.
    this.flushed = false;
  }

  /**
   * Closes the open writers, commits, and resets the factory and the write count,
   * unless this writer is already marked as flushed. The check and the work are
   * done while holding this object's monitor.
   *
   * @throws IOException propagated from the close/commit sequence
   */
  @Override
  public void flush() throws IOException {
    if (LOGGER.isDebugEnabled()) {
      LOGGER.debug("Flush writer. " + this);
    }
    synchronized (this) {
      if (this.flushed) {
        // Already flushed; skip the close/commit/reset sequence.
        return;
      }
      this.closeWriters();
      this.commit();
      this.writerFactory.reset();
      this.writeCount.set(0);
      this.flushed = true;
    }
  }

  /**
   * Finishes this writer by delegating to {@link #flush()} when the writer is not
   * already marked as flushed.
   *
   * @throws IOException propagated from {@link #flush()}
   */
  @Override
  public void finish() throws IOException {
    if (LOGGER.isDebugEnabled()) {
      LOGGER.debug("Finish writer. " + this);
    }
    if (this.flushed) {
      return;
    }
    this.flush();
  }

  @Override
  public void commit() throws IOException {
    if (LOGGER.isDebugEnabled()) {
      LOGGER.debug("Commit write. " + this.toString());
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



