void TFileTransport::writerThread()

in lib/cpp/src/thrift/transport/TFileTransport.cpp [300:516]


/**
 * Main loop of the writer thread: drains queued events to the log file.
 *
 * Steps, in order:
 *   1. Open the log file if fd_ is 0, seek to the end and truncate the file
 *      at offset_ + readState_.lastDispatchPtr_ to discard a partial event.
 *   2. Loop: swap the event buffers, write each dequeued event (padding with
 *      zeros so that no event straddles a chunk boundary when chunkSize_ is
 *      non-zero), then fsync when a flush was forced, when more than
 *      flushMaxBytes_ bytes are unflushed, or when the flush deadline passed
 *      with unflushed data.
 *   3. When closing_ is set: return immediately if an IO error is pending,
 *      otherwise fsync and close the file once both buffers are empty.
 *
 * IO errors are reported but not propagated. An event whose write fails is
 * dropped; before the next event is written the thread sleeps for
 * writerThreadIOErrorSleepTime_ microseconds and retries reopening the file
 * until it succeeds or closing_ is set.
 */
void TFileTransport::writerThread() {
  bool hasIOError = false;

  // fd_ == 0 is treated as "no file open" throughout this function.
  if (!fd_) {
    try {
      openLogFile();
    } catch (...) {
      int errno_copy = THRIFT_ERRNO;
      GlobalOutput.perror("TFileTransport: writerThread() openLogFile() ", errno_copy);
      fd_ = 0;
      hasIOError = true;
    }
  }

  // set the offset to the correct value (EOF)
  if (!hasIOError) {
    try {
      seekToEnd();
      // throw away any partial events
      offset_ += readState_.lastDispatchPtr_;
      if (0 == THRIFT_FTRUNCATE(fd_, offset_)) {
        readState_.resetAllValues();
      } else {
        int errno_copy = THRIFT_ERRNO;
        GlobalOutput.perror("TFileTransport: writerThread() truncate ", errno_copy);
        hasIOError = true;
      }
    } catch (...) {
      int errno_copy = THRIFT_ERRNO;
      GlobalOutput.perror("TFileTransport: writerThread() initialization ", errno_copy);
      hasIOError = true;
    }
  }

  // Figure out the next time by which a flush must take place
  auto ts_next_flush = getNextFlushTime();
  // Bytes written since the last fsync.
  uint32_t unflushed = 0;

  while (true) {
    // this will only be true when the destructor is being invoked
    if (closing_) {
      // With a pending IO error there is nothing usable to flush or close.
      if (hasIOError) {
        return;
      }

      // Try to empty buffers before exit
      if (enqueueBuffer_->isEmpty() && dequeueBuffer_->isEmpty()) {
        ::THRIFT_FSYNC(fd_);
        if (-1 == ::THRIFT_CLOSE(fd_)) {
          int errno_copy = THRIFT_ERRNO;
          GlobalOutput.perror("TFileTransport: writerThread() ::close() ", errno_copy);
        } else {
          // fd successfully closed
          fd_ = 0;
        }
        return;
      }
    }

    if (swapEventBuffers(&ts_next_flush)) {
      eventInfo* outEvent;
      while (nullptr != (outEvent = dequeueBuffer_->getNext())) {
        // Remove an event from the buffer and write it out to disk. If there is
        // any IO error, for instance, the output file is unmounted or deleted,
        // then this event is dropped. However, the writer thread will:
        // (1) sleep for a short while; (2) try to reopen the file; (3) if
        // successful then start writing from the end.

        while (hasIOError) {
          T_ERROR(
              "TFileTransport: writer thread going to sleep for %u microseconds due to IO errors",
              writerThreadIOErrorSleepTime_);
          THRIFT_SLEEP_USEC(writerThreadIOErrorSleepTime_);
          if (closing_) {
            return;
          }
          // Release a descriptor left over from before the failure so that
          // reopening does not leak it. The guard must test for an OPEN
          // descriptor: testing `!fd_` would close descriptor 0 and never
          // release a stale one. `> 0` also skips a negative value, in case a
          // failed open left one behind (not visible from this function).
          if (fd_ > 0) {
            ::THRIFT_CLOSE(fd_);
            fd_ = 0;
          }
          try {
            openLogFile();
            seekToEnd();
            unflushed = 0;
            hasIOError = false;
            T_LOG_OPER(
                "TFileTransport: log file %s reopened by writer thread during error recovery",
                filename_.c_str());
          } catch (...) {
            T_ERROR("TFileTransport: unable to reopen log file %s during error recovery",
                    filename_.c_str());
          }
        }

        // sanity check on event (maxEventSize_ == 0 disables the check)
        if ((maxEventSize_ > 0) && (outEvent->eventSize_ > maxEventSize_)) {
          T_ERROR("msg size is greater than max event size: %u > %u\n",
                  outEvent->eventSize_,
                  maxEventSize_);
          continue;
        }

        // If chunking is required, then make sure that msg does not cross chunk boundary
        if ((outEvent->eventSize_ > 0) && (chunkSize_ != 0)) {
          // event size must be less than chunk size
          if (outEvent->eventSize_ > chunkSize_) {
            T_ERROR("TFileTransport: event size(%u) > chunk size(%u): skipping event",
                    outEvent->eventSize_,
                    chunkSize_);
            continue;
          }

          // Chunk index of the event's first and last byte.
          int64_t chunk1 = offset_ / chunkSize_;
          int64_t chunk2 = (offset_ + outEvent->eventSize_ - 1) / chunkSize_;

          // if adding this event will cross a chunk boundary, pad the chunk with zeros
          if (chunk1 != chunk2) {
            // refetch the offset to keep in sync
            offset_ = THRIFT_LSEEK(fd_, 0, SEEK_CUR);
            auto padding = static_cast<int32_t>((offset_ / chunkSize_ + 1) * chunkSize_ - offset_);

            // Value-initialised, so the buffer is all zero bytes; it is owned
            // by the unique_ptr from the moment it is allocated.
            std::unique_ptr<uint8_t[]> zeros(new uint8_t[padding]());
            if (-1 == ::THRIFT_WRITE(fd_, zeros.get(), padding)) {
              int errno_copy = THRIFT_ERRNO;
              GlobalOutput.perror("TFileTransport: writerThread() error while padding zeros ",
                                  errno_copy);
              hasIOError = true;
              continue;
            }
            unflushed += padding;
            offset_ += padding;
          }
        }

        // write the dequeued event to the file
        if (outEvent->eventSize_ > 0) {
          if (-1 == ::THRIFT_WRITE(fd_, outEvent->eventBuff_, outEvent->eventSize_)) {
            int errno_copy = THRIFT_ERRNO;
            GlobalOutput.perror("TFileTransport: error while writing event ", errno_copy);
            hasIOError = true;
            continue;
          }
          unflushed += outEvent->eventSize_;
          offset_ += outEvent->eventSize_;
        }
      }
      dequeueBuffer_->reset();
    }

    // Skip the flush logic while the file is unusable; the next pass either
    // exits (closing_) or attempts recovery when an event is dequeued.
    if (hasIOError) {
      continue;
    }

    // Local variable to cache the state of forceFlush_.
    //
    // We only want to check the value of forceFlush_ once each time around the
    // loop.  If we check it more than once without holding the lock the entire
    // time, it could have changed state in between.  This will result in us
    // making inconsistent decisions.
    bool forced_flush = false;
    {
      Guard g(mutex_);
      if (forceFlush_) {
        if (!enqueueBuffer_->isEmpty()) {
          // If forceFlush_ is true, we need to flush all available data.
          // If enqueueBuffer_ is not empty, go back to the start of the loop to
          // write it out.
          //
          // We know the main thread is waiting on forceFlush_ to be cleared,
          // so no new events will be added to enqueueBuffer_ until we clear
          // forceFlush_.  Therefore the next time around the loop enqueueBuffer_
          // is guaranteed to be empty.  (I.e., we're guaranteed to make progress
          // and clear forceFlush_ the next time around the loop.)
          continue;
        }
        forced_flush = true;
      }
    }

    // determine if we need to perform an fsync
    bool flush = false;
    if (forced_flush || unflushed > flushMaxBytes_) {
      flush = true;
    } else {
      if (std::chrono::steady_clock::now() > ts_next_flush) {
        if (unflushed > 0) {
          flush = true;
        } else {
          // If there is no new data since the last fsync,
          // don't perform the fsync, but do reset the timer.
          ts_next_flush = getNextFlushTime();
        }
      }
    }

    if (flush) {
      // sync (force flush) file to disk
      THRIFT_FSYNC(fd_);
      unflushed = 0;
      ts_next_flush = getNextFlushTime();

      // notify anybody waiting for flush completion
      if (forced_flush) {
        Guard g(mutex_);
        forceFlush_ = false;
        assert(enqueueBuffer_->isEmpty());
        assert(dequeueBuffer_->isEmpty());
        flushed_.notifyAll();
      }
    }
  }
}