in lib/cpp/src/thrift/transport/TFileTransport.cpp [300:516]
void TFileTransport::writerThread() {
bool hasIOError = false;
// open file if it is not open
if (!fd_) {
try {
openLogFile();
} catch (...) {
int errno_copy = THRIFT_ERRNO;
GlobalOutput.perror("TFileTransport: writerThread() openLogFile() ", errno_copy);
fd_ = 0;
hasIOError = true;
}
}
// set the offset to the correct value (EOF)
if (!hasIOError) {
try {
seekToEnd();
// throw away any partial events
offset_ += readState_.lastDispatchPtr_;
if (0 == THRIFT_FTRUNCATE(fd_, offset_)) {
readState_.resetAllValues();
} else {
int errno_copy = THRIFT_ERRNO;
GlobalOutput.perror("TFileTransport: writerThread() truncate ", errno_copy);
hasIOError = true;
}
} catch (...) {
int errno_copy = THRIFT_ERRNO;
GlobalOutput.perror("TFileTransport: writerThread() initialization ", errno_copy);
hasIOError = true;
}
}
// Figure out the next time by which a flush must take place
auto ts_next_flush = getNextFlushTime();
uint32_t unflushed = 0;
while (1) {
// this will only be true when the destructor is being invoked
if (closing_) {
if (hasIOError) {
return;
}
// Try to empty buffers before exit
if (enqueueBuffer_->isEmpty() && dequeueBuffer_->isEmpty()) {
::THRIFT_FSYNC(fd_);
if (-1 == ::THRIFT_CLOSE(fd_)) {
int errno_copy = THRIFT_ERRNO;
GlobalOutput.perror("TFileTransport: writerThread() ::close() ", errno_copy);
} else {
// fd successfully closed
fd_ = 0;
}
return;
}
}
if (swapEventBuffers(&ts_next_flush)) {
eventInfo* outEvent;
while (nullptr != (outEvent = dequeueBuffer_->getNext())) {
// Remove an event from the buffer and write it out to disk. If there is any IO error, for
// instance,
// the output file is unmounted or deleted, then this event is dropped. However, the writer
// thread
// will: (1) sleep for a short while; (2) try to reopen the file; (3) if successful then
// start writing
// from the end.
while (hasIOError) {
T_ERROR(
"TFileTransport: writer thread going to sleep for %u microseconds due to IO errors",
writerThreadIOErrorSleepTime_);
THRIFT_SLEEP_USEC(writerThreadIOErrorSleepTime_);
if (closing_) {
return;
}
if (!fd_) {
::THRIFT_CLOSE(fd_);
fd_ = 0;
}
try {
openLogFile();
seekToEnd();
unflushed = 0;
hasIOError = false;
T_LOG_OPER(
"TFileTransport: log file %s reopened by writer thread during error recovery",
filename_.c_str());
} catch (...) {
T_ERROR("TFileTransport: unable to reopen log file %s during error recovery",
filename_.c_str());
}
}
// sanity check on event
if ((maxEventSize_ > 0) && (outEvent->eventSize_ > maxEventSize_)) {
T_ERROR("msg size is greater than max event size: %u > %u\n",
outEvent->eventSize_,
maxEventSize_);
continue;
}
// If chunking is required, then make sure that msg does not cross chunk boundary
if ((outEvent->eventSize_ > 0) && (chunkSize_ != 0)) {
// event size must be less than chunk size
if (outEvent->eventSize_ > chunkSize_) {
T_ERROR("TFileTransport: event size(%u) > chunk size(%u): skipping event",
outEvent->eventSize_,
chunkSize_);
continue;
}
int64_t chunk1 = offset_ / chunkSize_;
int64_t chunk2 = (offset_ + outEvent->eventSize_ - 1) / chunkSize_;
// if adding this event will cross a chunk boundary, pad the chunk with zeros
if (chunk1 != chunk2) {
// refetch the offset to keep in sync
offset_ = THRIFT_LSEEK(fd_, 0, SEEK_CUR);
auto padding = (int32_t)((offset_ / chunkSize_ + 1) * chunkSize_ - offset_);
auto* zeros = new uint8_t[padding];
memset(zeros, '\0', padding);
std::unique_ptr<uint8_t[]> array(zeros);
if (-1 == ::THRIFT_WRITE(fd_, zeros, padding)) {
int errno_copy = THRIFT_ERRNO;
GlobalOutput.perror("TFileTransport: writerThread() error while padding zeros ",
errno_copy);
hasIOError = true;
continue;
}
unflushed += padding;
offset_ += padding;
}
}
// write the dequeued event to the file
if (outEvent->eventSize_ > 0) {
if (-1 == ::THRIFT_WRITE(fd_, outEvent->eventBuff_, outEvent->eventSize_)) {
int errno_copy = THRIFT_ERRNO;
GlobalOutput.perror("TFileTransport: error while writing event ", errno_copy);
hasIOError = true;
continue;
}
unflushed += outEvent->eventSize_;
offset_ += outEvent->eventSize_;
}
}
dequeueBuffer_->reset();
}
if (hasIOError) {
continue;
}
// Local variable to cache the state of forceFlush_.
//
// We only want to check the value of forceFlush_ once each time around the
// loop. If we check it more than once without holding the lock the entire
// time, it could have changed state in between. This will result in us
// making inconsistent decisions.
bool forced_flush = false;
{
Guard g(mutex_);
if (forceFlush_) {
if (!enqueueBuffer_->isEmpty()) {
// If forceFlush_ is true, we need to flush all available data.
// If enqueueBuffer_ is not empty, go back to the start of the loop to
// write it out.
//
// We know the main thread is waiting on forceFlush_ to be cleared,
// so no new events will be added to enqueueBuffer_ until we clear
// forceFlush_. Therefore the next time around the loop enqueueBuffer_
// is guaranteed to be empty. (I.e., we're guaranteed to make progress
// and clear forceFlush_ the next time around the loop.)
continue;
}
forced_flush = true;
}
}
// determine if we need to perform an fsync
bool flush = false;
if (forced_flush || unflushed > flushMaxBytes_) {
flush = true;
} else {
if (std::chrono::steady_clock::now() > ts_next_flush) {
if (unflushed > 0) {
flush = true;
} else {
// If there is no new data since the last fsync,
// don't perform the fsync, but do reset the timer.
ts_next_flush = getNextFlushTime();
}
}
}
if (flush) {
// sync (force flush) file to disk
THRIFT_FSYNC(fd_);
unflushed = 0;
ts_next_flush = getNextFlushTime();
// notify anybody waiting for flush completion
if (forced_flush) {
Guard g(mutex_);
forceFlush_ = false;
assert(enqueueBuffer_->isEmpty());
assert(dequeueBuffer_->isEmpty());
flushed_.notifyAll();
}
}
}
}