in rd-agent/src/misc/sideloader.py [0:0]
def process_job_dir(jobfiles, jobs, now):
    """Reconcile the on-disk job directory with in-memory job state.

    Scans ``args.jobdir`` for job files, keyed by inode so a rename of the
    same file is not treated as a new file. Determines which tracked files
    vanished and which appeared, then derives:

    * jobs to kill  — jobs whose backing job file is gone, and
    * jobs to start — jobs parsed from newly appeared, valid job files.

    Parameters:
        jobfiles: dict inode -> JobFile of files seen on the previous scan;
            mutated in place (gone entries removed, new valid ones added).
        jobs: dict jobid -> Job of currently known jobs; read only.
        now: current timestamp; unused here, kept for caller compatibility.

    Returns:
        (jobs_to_kill, jobs_to_start): two dicts mapping jobid -> Job.
    """
    global args
    job_dir_path = pathlib.Path(args.jobdir)
    input_jobfiles = {}
    # Open every entry in the job directory; skip anything that is not a
    # plain regular file (symlinks, fifos, directories, ...).
    for path in job_dir_path.glob("*"):
        fh = None
        try:
            if path.is_symlink() or not path.is_file():
                raise Exception("Invalid file type")
            fh = path.open("r", encoding="utf-8")
            ino = os.fstat(fh.fileno()).st_ino
            input_jobfiles[ino] = JobFile(ino, str(path), fh)
        except Exception as e:
            # Close the handle if we opened it but failed before handing it
            # to a JobFile — otherwise it would leak.
            if fh is not None:
                fh.close()
            warn(f"Failed to open {path} ({e})")
    # Diff the previous scan against the current one by inode.
    gone_jobfiles = [jf for jf in jobfiles.values() if jf.ino not in input_jobfiles]
    new_jobfiles = [jf for jf in input_jobfiles.values() if jf.ino not in jobfiles]
    if gone_jobfiles:
        ddbg(f"gone_jobfiles: {[jf.path for jf in gone_jobfiles]}")
    if new_jobfiles:
        ddbg(f"new_jobfiles: {[jf.path for jf in new_jobfiles]}")
    # NOTE(review): gone JobFile handles are left open here; presumably they
    # are closed during job teardown — confirm before closing them eagerly.
    for jf in gone_jobfiles:
        del jobfiles[jf.ino]
    # Jobs whose file still exists stay active; jobs whose file vanished
    # must be killed.
    jobids = set()
    jobs_to_kill = {}
    for job in jobs.values():
        if job.jobfile.ino in jobfiles:
            jobids.add(job.jobid)
        else:
            jobs_to_kill[job.jobid] = job
    if jobs_to_kill:
        ddbg(f"jobs_to_kill: {list(jobs_to_kill)}")
    # Parse newly appeared job files. A file is accepted atomically: if any
    # entry is malformed or duplicates an existing job id, the whole file is
    # rejected and nothing from it is tracked.
    jobs_to_start = {}
    for jf in new_jobfiles:
        jf_jobids = set()
        jf_jobs = {}
        try:
            parsed = json.load(jf.fh)
            for ent in parsed["sideloader_jobs"]:
                job = Job(ent, jf)
                if job.jobid in jobids or job.jobid in jf_jobids:
                    raise Exception(f"Duplicate job id {job.jobid}")
                jf_jobids.add(job.jobid)
                jf_jobs[job.jobid] = job
        except Exception as e:
            # BUGFIX: report jf.path, the file actually being parsed. The
            # original used `path`, a stale leftover from the scan loop above
            # (and a NameError if the directory was empty).
            warn(f"Failed to load {jf.path} ({e})")
            # The file is not tracked, so close its handle to avoid a leak.
            jf.fh.close()
        else:
            jobfiles[jf.ino] = jf
            jobids |= jf_jobids
            jobs_to_start.update(jf_jobs)
    if jobs_to_start:
        ddbg(f"jobs_to_start: {jobs_to_start}")
    return jobs_to_kill, jobs_to_start