in mysqloperator/sidecar_main.py [0:0]
def meb_do_pitr(datadir:str, session: 'ClassicSession', cluster: InnoDBCluster,
init_spec: MebInitDBSpec, pod: MySQLPod, logger: Logger):
"""Do point-in-time-recovery (PITR)
For doing PITR we have to do those things:
In restore_main donme during init:
1) Extract the backup image so we get access to the binlogs
2) move them in as relay logs
To do here:
3) set GTIDs to skip, if any
4) apply relay logs (binlogs from backup) up to target gtid
5) hold further processing till done
6) cleanup extracted backup file
"""
import time
def is_relay_log_fully_applied(session, logger):
res = session.run_sql("SHOW REPLICA STATUS")
replica_status = res.fetch_one_object()
if not replica_status:
raise Exception("This server is not configured as a MySQL replica.")
io_running = replica_status["Replica_IO_Running"]
sql_running = replica_status["Replica_SQL_Running"]
sql_running_state = replica_status["Replica_SQL_Running_State"]
# states by which we end may vary on sever version and exact reason for
# application of binlogs to end and we are tolerant here. In most cases
# with MySQL 9.0 and later we should have an empty string ("")
end_states = [
"",
"Slave has read all relay log; waiting for more updates",
"Replica has read all relay log; waiting for more updates",
"Replica has stopped"
]
logger.info((f"Applying PITR: IO Running: {io_running}, "
f"SQL Running: {sql_running}, SQL State: {sql_running_state}"))
return io_running == "No" and sql_running == "No" and sql_running_state in end_states
logger.info("Applying Binary Logs")
session.run_sql(f"CHANGE REPLICATION SOURCE TO RELAY_LOG_FILE='{cluster.name}-0-relay-bin-pitr.000001', RELAY_LOG_POS=1, SOURCE_HOST='pitr' FOR CHANNEL 'pitr'")
if (init_spec.pitr_end_term and init_spec.pitr_end_value):
logger.info(f"START REPLICA SQL_THREAD UNTIL {init_spec.pitr_end_term} = ? FOR CHANNEL 'pitr'",
[init_spec.pitr_end_value])
session.run_sql(f"START REPLICA SQL_THREAD UNTIL {init_spec.pitr_end_term} = ? FOR CHANNEL 'pitr'",
[init_spec.pitr_end_value])
else:
session.run_sql("START REPLICA SQL_THREAD FOR CHANNEL 'pitr'")
def growing_sleep():
"""When only short log is to be applied (especially in testcases likely
we don't want to wait long, however if there is a lot to do we don't
want to query too quickly (and in consequence spam the log)
The growth factor here is absolutely hand-wavy and not based on any
science
"""
s = 0
while s < 5:
s += 0.2
yield time.sleep(s)
while s < 15:
s += 1
yield time.sleep(s)
while True:
yield time.sleep(15)
for _ in growing_sleep():
if is_relay_log_fully_applied(session, logger):
break
session.run_sql("STOP REPLICA SQL_THREAD FOR CHANNEL 'pitr'").fetch_all()
session.run_sql("RESET REPLICA ALL").fetch_all()