sub process_file()

in src/kudu/scripts/kudu-log-parser.pl [139:287]


sub process_file($$$) {
  defined(my $file = shift) or die;
  defined(my $opts = shift) or die;
  defined(my $records = shift) or die;

  # Support gzipped files.
  if ($file =~ $pat_gzip_ext) {
    open(IN, "gunzip -c $file |") or die "can't open gunzip pipe to $file: $!";
  } else {
    open(IN, "< $file") or die "can't open $file: $!";
  }

  print STDERR "processing file $file...\n";

  my $min_ts_micros = 0;
  my $max_ts_micros = 2**64 - 1;

  if (exists $opts->{before}) {
    $max_ts_micros = timestamp_string_to_micros($opts->{before});
  }
  if (exists $opts->{after}) {
    $min_ts_micros = timestamp_string_to_micros($opts->{after});
  }

  my $line_count = 0;
  while (defined(my $line = <IN>)) {
    $line_count++;
    # Skip non-glog lines.
    next unless $line =~ $pat_glog_prefix;
    my ($date, $ts) = ($1, $2);
    my $ts_micros = glog_timestamp_to_micros($date, $ts);
    next unless $ts_micros >= $min_ts_micros;
    next unless $ts_micros <= $max_ts_micros;

    chomp $line;

    if (exists $opts->{nosummary}) {
      if (exists $opts->{tablet}) {
        next unless (my @m = $line =~ $pat_tablet_prefix);
        my ($date, $ts, $tablet_id, $local_peer_uuid) = @m;
        next unless $tablet_id eq $opts->{tablet};
      }
      push @$records, {ts => $ts_micros, type => RAW_LOG_LINE(),
                       data => {line => $line}};
      next;
    }

    if (my @m = $line =~ $pat_raft_hb_timeout) {
      my ($date, $ts, $tablet_id, $local_peer_uuid, $remote_peer_uuid, $remote_peer_host, $timeout_sec) = @m;
      next if exists $opts->{tablet} && $tablet_id ne $opts->{tablet};
      push @$records, {ts => $ts_micros, type => RAFT_UPDT_TIMEOUT(),
                       data => {tablet_id => $tablet_id, local_peer_uuid => $local_peer_uuid,
                               remote_peer_uuid => $remote_peer_uuid}};
      next;
    }

    if (my @m = $line =~ $pat_raft_start_election) {
      my ($date, $ts, $tablet_id, $local_peer_uuid, $local_peer_term, $local_peer_role, $election_type, $reason) = @m;
      next if exists $opts->{tablet} && $tablet_id ne $opts->{tablet};
      push @$records, {ts => $ts_micros, type => ELECTION_STARTING(),
                       data => {tablet_id => $tablet_id, local_peer_uuid => $local_peer_uuid,
                               local_peer_term => $local_peer_term, local_peer_role => $local_peer_role,
                               election_type => $election_type, reason => $reason}};
      next;
    }

    if (my @m = $line =~ $pat_raft_election_result) {
      my ($date, $ts, $tablet_id, $local_peer_uuid, $local_peer_term, $election_type, $result) = @m;
      next if exists $opts->{tablet} && $tablet_id ne $opts->{tablet};
      push @$records, {ts => $ts_micros, type => ELECTION_RESULT(),
                       data => {tablet_id => $tablet_id, local_peer_uuid => $local_peer_uuid,
                               local_peer_term => $local_peer_term, election_type => $election_type,
                               result => $result}};
      next;
    }

    if (my @m = $line =~ $pat_raft_election_timeout) {
      my ($date, $ts, $tablet_id, $local_peer_uuid, $local_peer_term, $election_type, $remote_peer_uuid, $error) = @m;
      next if exists $opts->{tablet} && $tablet_id ne $opts->{tablet};
      push @$records, {ts => $ts_micros, type => VOTE_REQ_TIMEOUT(),
                       data => {tablet_id => $tablet_id, local_peer_uuid => $local_peer_uuid,
                               local_peer_term => $local_peer_term,  election_type => $election_type,
                               remote_peer_uuid => $remote_peer_uuid,
                               error => $error}};
      next;
    }

    if (my @m = $line =~ $pat_raft_becoming_leader) {
      my ($date, $ts, $tablet_id, $local_peer_uuid, $local_peer_term, $local_peer_role) = @m;
      next if exists $opts->{tablet} && $tablet_id ne $opts->{tablet};
      push @$records, {ts => $ts_micros, type => BECOMING_LEADER(),
                       data => {tablet_id => $tablet_id, local_peer_uuid => $local_peer_uuid,
                               local_peer_term => $local_peer_term, local_peer_role => $local_peer_role}};
      next;
    }

    if (my @m = $line =~ $pat_raft_deny_vote) {
      next; # We don't really care about denied votes since they are not very expensive
      my ($date, $ts, $tablet_id, $local_peer_uuid, $local_peer_term, $local_peer_role,
          $election_type, $remote_peer_uuid, $remote_peer_term, $reason) = @m;
      next if exists $opts->{tablet} && $tablet_id ne $opts->{tablet};
      push @$records, {ts => $ts_micros, type => VOTE(),
                       data => {tablet_id => $tablet_id, local_peer_uuid => $local_peer_uuid,
                               local_peer_term => $local_peer_term, local_role => $local_peer_role,
                               election_type => $election_type, remote_peer_uuid => $remote_peer_uuid,
                               remote_peer_term => $remote_peer_term, reason => $reason, result => "DENIED"}};
      next;
    }

    if (my @m = $line =~ $pat_raft_grant_vote) {
      my ($date, $ts, $tablet_id, $local_peer_uuid, $local_peer_term, $local_peer_role, $election_type,
          $remote_peer_uuid, $remote_peer_term) = @m;
      next if exists $opts->{tablet} && $tablet_id ne $opts->{tablet};
      push @$records, {ts => $ts_micros, type => VOTE(),
                       data => {tablet_id => $tablet_id, local_peer_uuid => $local_peer_uuid,
                               local_peer_term => $local_peer_term, local_role => $local_peer_role,
                               election_type => $election_type, remote_peer_uuid => $remote_peer_uuid,
                               remote_peer_term => $remote_peer_term, reason => "", result => "GRANTED"}};
      next;

    }

    if (my @m = $line =~ $pat_blocked_reactor) {
      my ($date, $ts, $rpc, $remote_addr, $duration_str) = @m;
      my $duration_micros = duration_to_micros($duration_str);
      push @$records, {ts => $ts_micros, type => BLOCKED_REACTOR(),
                       data => {rpc => $rpc, remote_addr => $remote_addr, duration_micros => $duration_micros}};
      next;
    }

    if (my @m = $line =~ $pat_kernel_stack_watchdog) {
      my ($date, $ts, $thread, $file, $line, $duration_str) = @m;
      my $duration_micros = duration_to_micros($duration_str);
      push @$records, {ts => $ts_micros, type => KERNEL_DELAY(),
                       data => {thread => $thread, file => $file, line => $line, duration_micros => $duration_micros}};
      next;
    }

    if (my @m = $line =~ $pat_slow_execution) {
      my ($date, $ts, $task, $duration_str) = @m;
      my $duration_micros = duration_to_micros($duration_str);
      push @$records, {ts => $ts_micros, type => SLOW_EXECUTION(),
                       data => {task => $task, duration_micros => $duration_micros}};
      next;
    }
  }

  return $line_count;
}