Exit_status process_event()

in client/mysqlbinlog.cc [1090:1970]


Exit_status process_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev,
                          my_off_t pos, const char *logname)
{
  char ll_buff[21];
  Log_event_type ev_type= ev->get_type_code();
  my_bool destroy_evt= TRUE;
  DBUG_ENTER("process_event");
  print_event_info->short_form= short_form;
  Exit_status retval= OK_CONTINUE;
  IO_CACHE *const head= &print_event_info->head_cache;
  static Gtid cached_gtid;

  /*
    Format events are not concerned by --offset and such, we always need to
    read them to be able to process the wanted events.
  */
  if (((rec_count >= offset) &&
       ((my_time_t) (ev->when.tv_sec) >= start_datetime)) ||
      (ev_type == FORMAT_DESCRIPTION_EVENT))
  {
    if (ev_type != FORMAT_DESCRIPTION_EVENT)
    {
      /*
        We have found an event after start_datetime, from now on print
        everything (in case the binlog has timestamps increasing and
        decreasing, we do this to avoid cutting the middle).
      */
      start_datetime= 0;
      offset= 0; // print everything and protect against cycling rec_count
      /*
        Skip events according to the --server-id flag.  However, don't
        skip format_description or rotate events, because they they
        are really "global" events that are relevant for the entire
        binlog, even if they have a server_id.  Also, we have to read
        the format_description event so that we can parse subsequent
        events.
      */
      if (ev_type != ROTATE_EVENT &&
          filter_server_id && (filter_server_id != ev->server_id))
        goto end;
    }
    if (((my_time_t) (ev->when.tv_sec) >= stop_datetime)
        || (pos >= stop_position_mot) || shall_stop_gtids(ev))
    {
      /* end the program */
      retval= OK_STOP;
      goto end;
    }
    if (!short_form)
      my_b_printf(&print_event_info->head_cache,
                  "# at %s\n",llstr(pos,ll_buff));

    if (!opt_hexdump)
      print_event_info->hexdump_from= 0; /* Disabled */
    else
      print_event_info->hexdump_from= pos;

    print_event_info->base64_output_mode= opt_base64_output_mode;

    DBUG_PRINT("debug", ("event_type: %s", ev->get_type_str()));

    if (shall_skip_gtids(ev, &cached_gtid))
      goto end;

    switch (ev_type) {
    case QUERY_EVENT:
    {
      bool const skip_thread= filter_thread_id &&
          (filter_thread_id !=
           static_cast<Query_log_event const &>(*ev).thread_id);
      bool parent_query_skips=
          !((Query_log_event*) ev)->is_trans_keyword() &&
           shall_skip_database(((Query_log_event*) ev)->db);
      bool ends_group= ((Query_log_event*) ev)->ends_group();
      bool starts_group= ((Query_log_event*) ev)->starts_group();

      if (!starts_group &&
          ev_database_changed(string(((Query_log_event*) ev)->db),
                              print_event_info) != Check_database_decision::OK)
      {
        goto err;
      }

      for (uint i= 0; i < buff_ev.elements; i++) 
      {
        buff_event_info pop_event_array= *dynamic_element(&buff_ev, i, buff_event_info *);
        Log_event *temp_event= pop_event_array.event;
        my_off_t temp_log_pos= pop_event_array.event_pos;
        print_event_info->hexdump_from= (opt_hexdump ? temp_log_pos : 0); 
        if (!parent_query_skips && !skip_thread)
          temp_event->print(result_file, print_event_info);
        delete temp_event;
      }
      
      // dbug_case: rows_query event comes in SBR only when
      // binlog_trx_meta_data is enabled and it comes just before the real
      // query event (i.e. not a trx keyword like BEGIN, COMMIT etc.)
      DBUG_ASSERT(!last_rows_query_event.event ||
                  (static_cast<Rows_query_log_event*>(
                       last_rows_query_event.event)->has_trx_meta_data() &&
                   !((Query_log_event*) ev)->is_trans_keyword()));

      // when binlog_trx_meta_data is enabled we will get rows_query event
      // before query events
      handle_last_rows_query_event(!opt_skip_rows_query &&
                                   !parent_query_skips &&
                                   !skip_thread,
                                   print_event_info);

      print_event_info->hexdump_from= (opt_hexdump ? pos : 0);
      reset_dynamic(&buff_ev);

      if (skip_thread)
      {
        goto end;
      }

      if (parent_query_skips)
      {
        /*
          Even though there would be no need to set the flag here,
          since parent_query_skips is never true when handling "COMMIT"
          statements in the Query_log_event, we still need to handle DDL,
          which causes a commit itself.
        */

        if (seen_gtids && !in_transaction && !starts_group && !ends_group)
        {
          /*
            For DDLs, print the COMMIT right away. 
          */
          fprintf(result_file, "COMMIT /* added by mysqlbinlog */%s\n", print_event_info->delimiter);
          print_event_info->skipped_event_in_transaction= false;
          in_transaction= false;
        }
        else
          print_event_info->skipped_event_in_transaction= true;
        goto end;
      }

      if (ends_group)
      {
        in_transaction= false;
        print_event_info->skipped_event_in_transaction= false;
        if (print_event_info->is_gtid_next_set)
          print_event_info->is_gtid_next_valid= false;
        /*
         * Skip the COMMIT/ROLLBACK event of the extra databases
         * when the option --skip-empty-trans is enabled. We also clear the
         * cur_database at the end of currrent transaction
         */
        if (opt_skip_empty_trans)
        {
          bool skip= shall_skip_database(cur_database.c_str());
          // Turn off the flag at end of transaction
          empty_begin_query_ev = false;
          // Delete and reset the cache pointer
          if (begin_query_ev_cache)
          {
            delete begin_query_ev_cache;
            begin_query_ev_cache = nullptr;
          }
          // Reset the database tracking as well
          cur_database= "";
          if (skip)
            break;
        }

        if (opt_print_gtids && encounter_gtid(cached_gtid))
            goto err;
      }
      else if (starts_group)
      {
        in_transaction= true;

        if (opt_skip_empty_trans)
        {
          if (cur_database != "")
          {
            error("The database used from the previous transaction has not "
                  "been cleared. This probably means that the previous "
                  "transaction has not hit COMMIT/ROLLBACK yet.");
            goto err;
          }
          /*
           * cur_database is always assigned at the BEGIN of a transaction and
           * cleared at the COMMIT/ROLLBACK of a transaction.
           */
          cur_database= string(((Query_log_event*) ev)->db);

          if (cur_database.empty()) {
            // Mark the flag
            empty_begin_query_ev = true;
            // The cache should definitely be cleared
            assert(begin_query_ev_cache == nullptr);
            // Cache the new BEGIN event
            begin_query_ev_cache = ev;
            // Do not destroy, we will handle the detroy ourselves
            destroy_evt = FALSE;
            break;
          }
          /*
           * skip the BEGIN query of the extra databases when the option
           * --skip-empty-trans is enabled
           */
          if (shall_skip_database(cur_database.c_str()))
            goto end;
        }
      }
      else
      {
        /*
          We are not in a transaction and are not seeing a BEGIN or
          COMMIT. So this is an implicitly committing DDL.
         */
        if (print_event_info->is_gtid_next_set && !in_transaction)
          print_event_info->is_gtid_next_valid= false;
      }

      ev->print(result_file, print_event_info);
      if (head->error == -1)
        goto err;
      break;
      
      destroy_evt= TRUE;
    }
          
    case INTVAR_EVENT:
    {
      destroy_evt= FALSE;
      buff_event.event= ev;
      buff_event.event_pos= pos;
      insert_dynamic(&buff_ev, (uchar*) &buff_event);
      break;
    }
    	
    case RAND_EVENT:
    {
      destroy_evt= FALSE;
      buff_event.event= ev;
      buff_event.event_pos= pos;      
      insert_dynamic(&buff_ev, (uchar*) &buff_event);
      break;
    }
    
    case USER_VAR_EVENT:
    {
      destroy_evt= FALSE;
      buff_event.event= ev;
      buff_event.event_pos= pos;      
      insert_dynamic(&buff_ev, (uchar*) &buff_event);
      break; 
    }


    case CREATE_FILE_EVENT:
    {
      Create_file_log_event* ce= (Create_file_log_event*)ev;

      if (ev_database_changed(string(ce->db), print_event_info)
          != Check_database_decision::OK)
      {
        goto err;
      }

      /*
        We test if this event has to be ignored. If yes, we don't save
        this event; this will have the good side-effect of ignoring all
        related Append_block and Exec_load.
        Note that Load event from 3.23 is not tested.
      */
      if (shall_skip_database(ce->db))
      {
        print_event_info->skipped_event_in_transaction= true;
        goto end;                // Next event
      }
      /*
	We print the event, but with a leading '#': this is just to inform 
	the user of the original command; the command we want to execute 
	will be a derivation of this original command (we will change the 
	filename and use LOCAL), prepared in the 'case EXEC_LOAD_EVENT' 
	below.
      */
      {
      ce->print(result_file, print_event_info, TRUE);
        if (head->error == -1)
          goto err;
      }
      // If this binlog is not 3.23 ; why this test??
      if (glob_description_event->binlog_version >= 3)
      {
        /*
          transfer the responsibility for destroying the event to
          load_processor
        */
        ev= NULL;
        if ((retval= load_processor.process(ce)) != OK_CONTINUE)
          goto end;
      }
      break;
    }

    case APPEND_BLOCK_EVENT:
      /*
        Append_block_log_events can safely print themselves even if
        the subsequent call load_processor.process fails, because the
        output of Append_block_log_event::print is only a comment.
      */
      ev->print(result_file, print_event_info);
      if (head->error == -1)
        goto err;
      if ((retval= load_processor.process((Append_block_log_event*) ev)) !=
          OK_CONTINUE)
        goto end;
      break;

    case EXEC_LOAD_EVENT:
    {
      ev->print(result_file, print_event_info);
      if (head->error == -1)
        goto err;
      Execute_load_log_event *exv= (Execute_load_log_event*)ev;
      Create_file_log_event *ce= load_processor.grab_event(exv->file_id);
      /*
	if ce is 0, it probably means that we have not seen the Create_file
	event (a bad binlog, or most probably --start-position is after the
	Create_file event). Print a warning comment.
      */
      if (ce)
      {
        /*
          We must not convert earlier, since the file is used by
          my_open() in Load_log_processor::append().
        */
        convert_path_to_forward_slashes((char*) ce->fname);
	ce->print(result_file, print_event_info, TRUE);
	my_free((void*)ce->fname);
	delete ce;
        if (head->error == -1)
          goto err;
      }
      else
        warning("Ignoring Execute_load_log_event as there is no "
                "Create_file event for file_id: %u", exv->file_id);
      break;
    }
    case FORMAT_DESCRIPTION_EVENT:
      delete glob_description_event;
      glob_description_event= (Format_description_log_event*) ev;
      /*
        The first FD event in log is always generated
        from the local server. So if it is first FD event to be
        processed (i.e., if server_id_from_fd_event is 0),
        get server_id from the FD event and keep it in
        server_id_from_fd_event to differentiate between FDs
        (originated from local server vs another server).
       */
      if (print_event_info->server_id_from_fd_event == 0)
        print_event_info->server_id_from_fd_event= ev->server_id;

      print_event_info->common_header_len=
        glob_description_event->common_header_len;
      ev->print(result_file, print_event_info);
      /*
        At this point, if we are in transaction that means
        we are reading a relay log file (transaction cannot
        spawn across two binary log files, they are writen
        at once in binlog). When AUTO_POSITION is enabled
        and if IO thread stopped in between the GTID transaction,
        upon IO thread restart, Master will send the GTID events
        again from the begin of the transaction. Hence, we should
        rollback the old transaction.

        If you are reading FD event that came from Master
        (first FD event is from the server that owns the relaylog
        and second one is from Master) and if it's log_pos is > 0
        then it represents the begin of a master's binary log
        (any unfinished transaction will not be finished) or that
        auto_position is enabled (any partial transaction left will
        not be finished but will be fully retrieved again). On both
        cases, the next transaction in the relay log will start from the
        beginning and we must rollback any unfinished transaction
      */
      if (ev->server_id !=0 &&
          ev->server_id != print_event_info->server_id_from_fd_event &&
          ev->log_pos > 0)
      {
        if (in_transaction)
        {
          my_b_printf(&print_event_info->head_cache,
                      "ROLLBACK /* added by mysqlbinlog */ %s\n",
                      print_event_info->delimiter);
        }
        else if (print_event_info->is_gtid_next_set &&
                 print_event_info->is_gtid_next_valid)
        {
          /*
            If we are here, then we have seen only GTID_LOG_EVENT
            of a transaction and did not see even a BEGIN event
            (in_transaction flag is false). So generate BEGIN event
            also along with ROLLBACK event.
          */
          my_b_printf(&print_event_info->head_cache,
                      "BEGIN /*added by mysqlbinlog */ %s\n"
                      "ROLLBACK /* added by mysqlbinlog */ %s\n",
                      print_event_info->delimiter,
                      print_event_info->delimiter);
        }
      }
      if (head->error == -1)
        goto err;
      if (opt_remote_proto == BINLOG_LOCAL)
      {
        ev->free_temp_buf(); // free memory allocated in dump_local_log_entries
      }
      else
      {
        /*
          disassociate but not free dump_remote_log_entries time memory
        */
        ev->temp_buf= 0;
      }
      /*
        We don't want this event to be deleted now, so let's hide it (I
        (Guilhem) should later see if this triggers a non-serious Valgrind
        error). Not serious error, because we will free description_event
        later.
      */
      ev= 0;
      if (!force_if_open_opt &&
          (glob_description_event->flags & LOG_EVENT_BINLOG_IN_USE_F))
      {
        error("Attempting to dump binlog '%s', which was not closed properly. "
              "Most probably, mysqld is still writing it, or it crashed. "
              "Rerun with --force-if-open to ignore this problem.", logname);
        DBUG_RETURN(ERROR_STOP);
      }
      break;
    case BEGIN_LOAD_QUERY_EVENT:
      ev->print(result_file, print_event_info);
      if (head->error == -1)
        goto err;
      if ((retval= load_processor.process((Begin_load_query_log_event*) ev)) !=
          OK_CONTINUE)
        goto end;
      break;
    case EXECUTE_LOAD_QUERY_EVENT:
    {
      Execute_load_query_log_event *exlq= (Execute_load_query_log_event*)ev;
      char *fname= load_processor.grab_fname(exlq->file_id);

      if (ev_database_changed(string(exlq->db), print_event_info)
          != Check_database_decision::OK)
      {
        goto err;
      }

      if (shall_skip_database(exlq->db))
        print_event_info->skipped_event_in_transaction= true;
      else
      {
        if (fname)
        {
          convert_path_to_forward_slashes(fname);
          exlq->print(result_file, print_event_info, fname);
          if (head->error == -1)
          {
            if (fname)
              my_free(fname);
            goto err;
          }
        }
        else
          warning("Ignoring Execute_load_query since there is no "
                  "Begin_load_query event for file_id: %u", exlq->file_id);
      }

      if (fname)
	my_free(fname);
      break;
    }
    case ROWS_QUERY_LOG_EVENT:
    {
      if (last_rows_query_event.event || last_rows_query_event_temp_buf)
      {
        error("Found a buffered rows_query event while processing one. This "
              "might mean that the binlog contains consecutive rows_query "
              "events. We expect a table_map or query event after rows_query.");
        goto err;
      }
      // case: we can have rows_query event containing trx metadata in SBR, to
      // avoid unflushed event warning we'll skip setting this flag when the
      // rows_query event contains trx metadata
      auto rq= static_cast<Rows_query_log_event*>(ev);
      print_event_info->have_unflushed_events= !rq->has_trx_meta_data();
      destroy_evt= FALSE;
      last_rows_query_event.event= rq;
      last_rows_query_event.event_pos= pos;
      if (ev->temp_buf)
      {
        last_rows_query_event_temp_buf=
          static_cast<char*>(my_malloc(ev->data_written, MYF(MY_WME)));
        memcpy(last_rows_query_event_temp_buf, ev->temp_buf, ev->data_written);
      }
      else
      {
        last_rows_query_event_temp_buf= nullptr;
      }

      break;
    }
    case TABLE_MAP_EVENT:
    {
      Table_map_log_event *map= ((Table_map_log_event *)ev);

      if (ev_database_changed(string(map->get_db_name()), print_event_info)
          != Check_database_decision::OK)
      {
        goto err;
      }

      if (shall_skip_table(map->get_db_name(), map->get_table_name()))
      {
        print_event_info->skipped_event_in_transaction= true;
        print_event_info->m_table_map_ignored.set_table(map->get_table_id(), map);
        destroy_evt= FALSE;
        // case: this event is skipped so clean up the buffered rows_query
        handle_last_rows_query_event(false, print_event_info);
        goto end;
      }

      // case: this event was not skipped, so let's print the buffered
      // rows_query
      handle_last_rows_query_event(!opt_skip_rows_query, print_event_info);
      /* fall through */
    }
    case WRITE_ROWS_EVENT:
    case DELETE_ROWS_EVENT:
    case UPDATE_ROWS_EVENT:
    case WRITE_ROWS_EVENT_V1:
    case UPDATE_ROWS_EVENT_V1:
    case DELETE_ROWS_EVENT_V1:
    case PRE_GA_WRITE_ROWS_EVENT:
    case PRE_GA_DELETE_ROWS_EVENT:
    case PRE_GA_UPDATE_ROWS_EVENT:
    {
      // dbug_case: we should have handled the buffered rows_query now that
      // we're handling rows events
      DBUG_ASSERT(!last_rows_query_event.event);
      bool stmt_end= FALSE;
      Table_map_log_event *ignored_map= NULL;
      if (ev_type == WRITE_ROWS_EVENT ||
          ev_type == DELETE_ROWS_EVENT ||
          ev_type == UPDATE_ROWS_EVENT ||
          ev_type == WRITE_ROWS_EVENT_V1 ||
          ev_type == DELETE_ROWS_EVENT_V1 ||
          ev_type == UPDATE_ROWS_EVENT_V1)
      {
        Rows_log_event *new_ev= (Rows_log_event*) ev;
        if (new_ev->get_flags(Rows_log_event::STMT_END_F))
          stmt_end= TRUE;
        ignored_map= print_event_info->m_table_map_ignored.get_table(new_ev->get_table_id());
      }
      else if (ev_type == PRE_GA_WRITE_ROWS_EVENT ||
               ev_type == PRE_GA_DELETE_ROWS_EVENT ||
               ev_type == PRE_GA_UPDATE_ROWS_EVENT)
      {
        Old_rows_log_event *old_ev= (Old_rows_log_event*) ev;
        if (old_ev->get_flags(Rows_log_event::STMT_END_F))
          stmt_end= TRUE;
        ignored_map= print_event_info->m_table_map_ignored.get_table(old_ev->get_table_id());
      }

      bool skip_event= (ignored_map != NULL);
      /*
        end of statement check:
        i) destroy/free ignored maps
        ii) if skip event
              a) set the unflushed_events flag to false
              b) since we are skipping the last event,
                 append END-MARKER(') to body cache (if required)
              c) flush cache now
       */
      if (stmt_end)
      {
        /*
          Now is safe to clear ignored map (clear_tables will also
          delete original table map events stored in the map).
        */
        if (print_event_info->m_table_map_ignored.count() > 0)
          print_event_info->m_table_map_ignored.clear_tables();

        /*
           One needs to take into account an event that gets
           filtered but was last event in the statement. If this is
           the case, previous rows events that were written into
           IO_CACHEs still need to be copied from cache to
           result_file (as it would happen in ev->print(...) if
           event was not skipped).
        */
        if (skip_event)
        {
          // set the unflushed_events flag to false
          print_event_info->have_unflushed_events= FALSE;

          // append END-MARKER(') with delimiter
          IO_CACHE *const body_cache= &print_event_info->body_cache;
          if (my_b_tell(body_cache))
            my_b_printf(body_cache, "'%s\n", print_event_info->delimiter);

          // flush cache
          if ((copy_event_cache_to_file_and_reinit(&print_event_info->head_cache,
                                                   result_file, stop_never /* flush result_file */) ||
              copy_event_cache_to_file_and_reinit(&print_event_info->body_cache,
                                                  result_file, stop_never /* flush result_file */)))
            goto err;
        }
      }

      /* skip the event check */
      if (skip_event)
      {
        print_event_info->skipped_event_in_transaction= true;
        goto end;
      }

      /*
        These events must be printed in base64 format, if printed.
        base64 format requires a FD event to be safe, so if no FD
        event has been printed, we give an error.  Except if user
        passed --short-form, because --short-form disables printing
        row events.
      */
      if (!print_event_info->printed_fd_event && !short_form &&
          ev_type != TABLE_MAP_EVENT && ev_type != ROWS_QUERY_LOG_EVENT &&
          opt_base64_output_mode != BASE64_OUTPUT_DECODE_ROWS)
      {
        const char* type_str= ev->get_type_str();
        if (opt_base64_output_mode == BASE64_OUTPUT_NEVER)
          error("--base64-output=never specified, but binlog contains a "
                "%s event which must be printed in base64.",
                type_str);
        else
          error("malformed binlog: it does not contain any "
                "Format_description_log_event. I now found a %s event, which "
                "is not safe to process without a "
                "Format_description_log_event.",
                type_str);
        goto err;
      }

      if (ev_type == TABLE_MAP_EVENT && opt_rewrite_table)
      {
        Table_map_log_event *t_ev = (Table_map_log_event *) ev;

        assert(opt_filter_table &&
               !strcmp(opt_filter_table, t_ev->get_table_name()));

        size_t old_len = strlen(t_ev->get_table_name());
        size_t new_len = strlen(opt_rewrite_table);

        // We need to modify the underlying buffer so the raw event has
        // modified table name. First build a new buffer with the new size.
        size_t new_data_written = ev->data_written - old_len + new_len;
        char *new_buf = (char*) my_malloc(new_data_written,
                                          MYF(MY_WME));

        if (!new_buf)
        {
          error("Got fatal error allocating memory.");
          goto err;
        }

        // The first part of the buffer will remain the same except the length.
        ulong tbl_offset = LOG_EVENT_HEADER_LEN + // Common header length
                           TABLE_MAP_HEADER_LEN + // Table map header length
                           1 + // 1 for db name size
                           strlen(t_ev->get_db_name()) + // length of db name
                           1; // 1 for null termination.

        memcpy(new_buf, ev->temp_buf, tbl_offset);
        int4store(new_buf + EVENT_LEN_OFFSET, new_data_written);
        char *ptr = new_buf + tbl_offset;

        // Set the new length.
        *ptr++ = (char) new_len;
        // Copy new table name.
        memcpy(ptr, opt_rewrite_table, new_len);
        ptr += new_len;
        *ptr++ = 0; // null termination

        // Copy remaining buffer contents.
        ulong offset = tbl_offset +
                       1 + // 1 for table name size
                       old_len + // length of table name
                       1; // 1 for null termination.

        memcpy(ptr, ev->temp_buf + offset, ev->data_written - offset);

        // Change the event's table name. This affects comment output only.
        t_ev->set_table(opt_rewrite_table);

        // Use the new buffer.
        char *buf_old = ev->temp_buf;
        ev->register_temp_buf(new_buf);

        ev->print(result_file, print_event_info);

        // Switch to the old buffer.
        ev->register_temp_buf(buf_old);

        my_free(new_buf);
        new_buf = nullptr;
      }
      else
        ev->print(result_file, print_event_info);

      print_event_info->have_unflushed_events= TRUE;

      /* Flush head and body cache to result_file */
      if (stmt_end)
      {
        print_event_info->have_unflushed_events= FALSE;
        if (copy_event_cache_to_file_and_reinit(&print_event_info->head_cache,
                                                result_file, stop_never /* flush result file */) ||
            copy_event_cache_to_file_and_reinit(&print_event_info->body_cache,
                                                result_file, stop_never /* flush result file */))
          goto err;
        goto end;
      }
      break;
    }
    case ANONYMOUS_GTID_LOG_EVENT:
    case GTID_LOG_EVENT:
    {
      seen_gtids= true;
      print_event_info->is_gtid_next_set= true;
      print_event_info->is_gtid_next_valid= true;
      if (print_event_info->skipped_event_in_transaction == true)
        fprintf(result_file, "COMMIT /* added by mysqlbinlog */%s\n", print_event_info->delimiter);
      print_event_info->skipped_event_in_transaction= false;

      ev->print(result_file, print_event_info);
      if (head->error == -1)
        goto err;

      break;
    }
    case XID_EVENT:
    {
      in_transaction= false;
      print_event_info->skipped_event_in_transaction= false;
      if (print_event_info->is_gtid_next_set)
        print_event_info->is_gtid_next_valid= false;
      /*
       * Skip the extra XID event (COMMIT) from other databases when the
       * option --skip-empty-trans is enabled. We also clear the
       * cur_database at the end of currrent transaction
       */
      if (opt_skip_empty_trans)
      {
        bool skip= shall_skip_database(cur_database.c_str());
        // Reset current transaction's database name tracking
        cur_database= "";
        // Reset the flag
        empty_begin_query_ev = false;
        // Delete and reset the cached BEGIN event
        if (begin_query_ev_cache)
        {
          delete begin_query_ev_cache;
          begin_query_ev_cache = nullptr;
        }
        if (skip)
          break;
      }
      ev->print(result_file, print_event_info);

      if (opt_print_gtids && encounter_gtid(cached_gtid))
          goto err;

      if (head->error == -1)
        goto err;
      break;
    }
    case ROTATE_EVENT:
    {
      Rotate_log_event *rev= (Rotate_log_event *) ev;
      /* no transaction context, gtids seen and not a fake rotate */
      if (seen_gtids)
      {
        /*   
          Fake rotate events have 'when' set to zero. @c fake_rotate_event(...).
        */
        bool is_fake= (rev->when.tv_sec == 0);
        /*
          'in_transaction' flag is not set to true even after GTID_LOG_EVENT
          of a transaction is seen. ('mysqlbinlog' tool assumes that there
          is only one event per DDL transaction other than BEGIN and COMMIT
          events. Using 'in_transaction' flag and 'starts_group', 'ends_group'
          flags, DDL transaction generation is handled. Hence 'in_transaction'
          cannot be set to true after seeing GTID_LOG_EVENT). So in order to
          see if we are out of a transaction or not, we should check that
          'in_transaction' is false and we have not seen GTID_LOG_EVENT.
          To see if a GTID_LOG_EVENT of a transaction is seen or not,
          we should check is_gtid_next_valid flag is false.
        */
        if (!is_fake && !in_transaction &&
            print_event_info->is_gtid_next_set &&
            !print_event_info->is_gtid_next_valid)
        {
          /*
            If processing multiple files, we must reset this flag,
            since there may be no gtids on the next one.
          */
          seen_gtids= false;
          fprintf(result_file, "%sAUTOMATIC' /* added by mysqlbinlog */ %s\n",
                  Gtid_log_event::SET_STRING_PREFIX,
                  print_event_info->delimiter);
          print_event_info->is_gtid_next_set= false;
          print_event_info->is_gtid_next_valid= true;
        }
      }
      ev->print(result_file, print_event_info);
      if (head->error == -1)
        goto err;
      break;
    }
    case PREVIOUS_GTIDS_LOG_EVENT:
      if (one_database && !opt_skip_gtids)
        warning("The option --database has been used. It may filter "
                "parts of transactions, but will include the GTIDs in "
                "any case. If you want to exclude or include transactions, "
                "you should use the options --exclude-gtids or "
                "--include-gtids, respectively, instead.");
    case METADATA_EVENT:
    {
      ev->print(result_file, print_event_info);
      if (head->error == -1)
        goto err;

      /* Copy and flush head cache and body cache */
      if (copy_event_cache_to_file_and_reinit(&print_event_info->head_cache,
                                              result_file, stop_never) ||
          copy_event_cache_to_file_and_reinit(&print_event_info->body_cache,
                                              result_file, stop_never))
        goto err;

      goto end;
    }

      /* fall through */
    default:
      ev->print(result_file, print_event_info);
      if (head->error == -1)
        goto err;
    }
    /* Flush head cache to result_file for every event */
    if (copy_event_cache_to_file_and_reinit(&print_event_info->head_cache,
                                            result_file, stop_never /* flush result_file */))
      goto err;
  }

  goto end;

err:
  retval= ERROR_STOP;
end:
  rec_count++;
  /*
    Destroy the log_event object. If reading from a remote host,
    set the temp_buf to NULL so that memory isn't freed twice.
  */
  if (ev)
  {
    if (opt_remote_proto != BINLOG_LOCAL)
      ev->temp_buf= 0;
    if (destroy_evt) /* destroy it later if not set (ignored table map) */
      delete ev;
  }
  DBUG_RETURN(retval);
}