in rosbag_cloud_recorders/src/utils/recorder.cpp [469:555]
void Recorder::DoRecord() {
// Open bag file for writing
StartWriting();
// Schedule the disk space check
warn_next_ = ros::WallTime();
try
{
CheckDisk();
}
catch (const rosbag::BagException& ex)
{
ROS_ERROR_STREAM(ex.what());
exit_code_ = 1;
StopWriting();
return;
}
check_disk_next_ = ros::WallTime::now() + ros::WallDuration().fromSec(20.0);
// Technically the queue_mutex_ should be locked while checking empty.
// Except it should only get checked if the node is not ok, and thus
// it shouldn't be in contention.
ros::NodeHandle nh;
while (nh.ok() || !queue_->empty()) {
boost::unique_lock<boost::mutex> lock(queue_mutex_);
bool finished = false;
while (queue_->empty()) {
if (!nh.ok()) {
lock.release()->unlock();
finished = true;
break;
}
boost::xtime xt {};
#if BOOST_VERSION >= 105000
boost::xtime_get(&xt, boost::TIME_UTC_);
#else
boost::xtime_get(&xt, boost::TIME_UTC);
#endif
xt.nsec += 250000000;
queue_condition_.timed_wait(lock, xt);
if (CheckDuration(ros::Time::now()))
{
finished = true;
break;
}
}
if (finished)
{
break;
}
OutgoingMessage out = queue_->front();
queue_->pop();
queue_size_ -= out.msg->size();
lock.release()->unlock();
if (CheckSize())
{
break;
}
if (CheckDuration(out.time))
{
break;
}
try
{
if (ScheduledCheckDisk() && CheckLogging())
{
bag_.write(out.topic, out.time, *out.msg, out.connection_header);
}
}
catch (const rosbag::BagException& ex)
{
ROS_ERROR_STREAM(ex.what());
exit_code_ = 1;
break;
}
}
StopWriting();
}