in migration/src/download_jira.py [0:0]
def download_attachments(num: int, dump_dir: Path, att_data_dir: Path):
dump_file = jira_dump_file(dump_dir, num)
assert dump_file.exists()
attachments_dir = jira_attachments_dir(att_data_dir, num)
if not attachments_dir.exists():
attachments_dir.mkdir()
files: dict[str, Attachment] = {}
with open(dump_file) as fp:
o = json.load(fp)
attachments = o.get("fields").get("attachment")
if not attachments:
return
for a in attachments:
filename = a.get("filename")
created = a.get("created")
content = a.get("content")
mime_type = a.get("mimeType")
if not (filename and created and content and mime_type):
continue
if filename not in files or created > files[filename].created:
files[filename] = Attachment(filename=filename, created=created, content=content, mime_type=mime_type)
for (_, a) in files.items():
logger.debug(f"Downloading attachment {a.filename}")
res = requests.get(a.content, headers={"Accept": a.mime_type})
if res.status_code != 200:
logger.error(f"Failed to download attachment {a.filename} in issue {jira_issue_id(num)}")
continue
attachment_file = attachments_dir.joinpath(a.filename)
with open(attachment_file, "wb") as fp:
fp.write(res.content)
time.sleep(DOWNLOAD_INTERVAL_SEC)