in jetstream/artifacts.py [0:0]
def _image_for_date(self, date: datetime) -> str:
    """Return the sha256 hash of the image that was current on ``date``.

    The image with the most recent ``upload_time`` that is not later than
    ``date`` is selected. When every image was uploaded after ``date``
    (e.g. the table got created before any image got uploaded), the
    earliest uploaded image is used instead.

    Args:
        date: Point in time for which the matching image is looked up.

    Returns:
        The part of the selected image's ``name`` that follows ``sha256:``.

    Raises:
        ValueError: If ``self.images`` yields no (truthy) image at all.
    """
    # Falsy entries in `self.images` are ignored entirely.
    available = [candidate for candidate in self.images if candidate]
    if not available:
        raise ValueError(f"No `{self.image}` docker image available in {self.project}")

    # About the `# type: ignore` markers below: mypy and the DockerImage docs
    # both describe `upload_time` as a Timestamp, yet in the tests the values
    # appear to be DatetimeWithNanoseconds. Comparing them with datetime
    # objects has been working, so the conflicting mypy errors are silenced.
    uploaded_by_date = [
        candidate for candidate in available if candidate.upload_time <= date  # type: ignore
    ]

    if uploaded_by_date:
        # Image closest to (but not after) the provided date. `max` keeps the
        # first of several equally recent uploads.
        selected = max(
            uploaded_by_date,
            key=lambda candidate: candidate.upload_time,  # type: ignore
        )
    else:
        # Nothing had been uploaded by `date`: fall back to the earliest image
        # available. Scanning in reverse makes `min` keep the last of several
        # equally early uploads.
        selected = min(
            reversed(available),
            key=lambda candidate: candidate.upload_time,  # type: ignore
        )

    return selected.name.split("sha256:")[1]