def process_job_dir()

in rd-agent/src/misc/sideloader.py [0:0]


def process_job_dir(jobfiles, jobs, now):
    global args

    job_dir_path = pathlib.Path(args.jobdir)
    input_jobfiles = {}

    # Open all job files.
    for path in job_dir_path.glob("*"):
        try:
            if path.is_symlink() or not path.is_file():
                raise Exception("Invalid file type")
            fh = path.open("r", encoding="utf-8")
            ino = os.fstat(fh.fileno()).st_ino
            input_jobfiles[ino] = JobFile(ino, str(path), fh)
        except Exception as e:
            warn(f"Failed to open {path} ({e})")

    # Let's find out which files are gone and which are new.
    gone_jobfiles = []
    new_jobfiles = []

    for _ino, jf in jobfiles.items():
        if jf.ino not in input_jobfiles:
            gone_jobfiles.append(jf)

    for _ino, jf in input_jobfiles.items():
        if jf.ino not in jobfiles:
            new_jobfiles.append(jf)

    if len(gone_jobfiles):
        ddbg(f"gone_jobfiles: {[jf.path for jf in gone_jobfiles]}")
    if len(new_jobfiles):
        ddbg(f"new_jobfiles: {[jf.path for jf in new_jobfiles]}")

    for jf in gone_jobfiles:
        del jobfiles[jf.ino]

    # Collect active jobids and determine jobs to kill.
    jobids = set()
    jobs_to_kill = {}

    for _jobid, job in jobs.items():
        if job.jobfile.ino in jobfiles:
            jobids.add(job.jobid)
        else:
            jobs_to_kill[job.jobid] = job

    if len(jobs_to_kill):
        ddbg(f"jobs_to_kill: {list(jobs_to_kill)}")

    # Load new job files
    jobs_to_start = {}

    for jf in new_jobfiles:
        jf_jobids = set()
        jf_jobs = {}
        try:
            parsed = json.load(jf.fh)
            for ent in parsed["sideloader_jobs"]:
                job = Job(ent, jf)
                if job.jobid in jobids or job.jobid in jf_jobids:
                    raise Exception(f"Duplicate job id {job.jobid}")
                jf_jobids.add(job.jobid)
                jf_jobs[job.jobid] = job
        except Exception as e:
            warn(f"Failed to load {path} ({e})")
        else:
            jobfiles[jf.ino] = jf
            jobids = jobids.union(jf_jobids)
            jobs_to_start.update(jf_jobs)

    if len(jobs_to_start):
        ddbg(f"jobs_to_start: {jobs_to_start}")

    return jobs_to_kill, jobs_to_start