in rosbag_cloud_recorders/src/utils/recorder.cpp [110:201]
int Recorder::Run() {
if (options_.trigger) {
DoTrigger();
return 0;
}
if (options_.topics.empty()) {
// Make sure limit is not specified with automatic topic subscription
if (options_.limit > 0) {
fprintf(stderr, "Specifing a count is not valid with automatic topic subscription.\n");
return 1;
}
// Make sure topics are specified
if (!options_.record_all && (options_.node == std::string(""))) {
fprintf(stderr, "No topics specified.\n");
return 1;
}
}
if (!nh_.ok())
{
return 0;
}
if (options_.publish)
{
pub_begin_write_ = nh_.advertise<std_msgs::String>("begin_write", 1, true);
}
last_buffer_warn_ = Time();
queue_ = std::make_shared<std::queue<OutgoingMessage>>();
// Subscribe to each topic
if (!options_.regex) {
for (string const& topic : options_.topics) {
shared_ptr<ros::Subscriber> sub = Subscribe(nh_, topic);
currently_recording_.insert(topic);
subscribers_.push_back(sub);
}
}
if (!ros::Time::waitForValid(ros::WallDuration(2.0)))
{
ROS_WARN("/use_sim_time set to true and no clock published. Still waiting for valid time...");
}
ros::Time::waitForValid();
start_time_ = ros::Time::now();
// Don't bother doing anything if we never got a valid time
if (!nh_.ok())
{
return 0;
}
ros::Subscriber trigger_sub;
// Spin up a thread for writing to the file
boost::thread record_thread;
if (options_.snapshot)
{
record_thread = boost::thread(boost::bind(&Recorder::DoRecordSnapshotter, this));
// Subscribe to the snapshot trigger
trigger_sub = nh_.subscribe<std_msgs::Empty>("snapshot_trigger", 100, boost::bind(&Recorder::SnapshotTrigger, this, _1));
}
else
{
record_thread = boost::thread(boost::bind(&Recorder::DoRecord, this));
}
ros::Timer check_master_timer;
if (options_.record_all || options_.regex || (options_.node != std::string("")))
{
// check for master first
DoCheckMaster(ros::TimerEvent(), nh_);
check_master_timer = nh_.createTimer(ros::Duration(1.0), boost::bind(&Recorder::DoCheckMaster, this, _1, boost::ref(nh_)));
}
ros::AsyncSpinner s(10);
s.start();
record_thread.join();
queue_condition_.notify_all();
check_master_timer.stop();
subscribers_.clear();
currently_recording_.clear();
queue_.reset();
return exit_code_;
}