in treeherder/etl/pushlog.py [0:0]
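
# Names used below are assumed to be imported or defined at module level in
# pushlog.py (not shown in this excerpt): cache (Django's cache framework),
# logger, newrelic.agent, traceback, Repository, store_push,
# CollectionNotStoredError, ONE_WEEK_IN_SECONDS.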
def run(self, source_url, repository_name, changeset=None, last_push_id=None):
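    """Ingest pushes for ``repository_name`` from the Mercurial pushlog at
    ``source_url``.

    Uses the cached ``last_push_id`` (unless one is passed in) so that only
    new pushes are fetched, or fetches the pushes for a specific
    ``changeset`` when one is given. Returns the top revision (the final
    changeset node of the newest push), or None if there is nothing new to
    ingest.
    """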
    cache_key = f"{repository_name}:last_push_id"
    if not last_push_id:
        # Get the last push ID seen from the cache; this reduces the number
        # of pushes processed on each run.
        last_push_id = cache.get(cache_key)

    if not changeset and last_push_id:
        startid_url = f"{source_url}&startID={last_push_id}"
        logger.debug(
            "Extracted last push for '%s', '%s', from cache, "
            "attempting to get changes only from that point at: %s",
            repository_name,
            last_push_id,
            startid_url,
        )
        # Use the cached ``last_push_id`` value (saved from the last time
        # this API was called) for this repo. Use that value as the
        # ``startID`` to get all new pushes from that point forward.
        extracted_content = self.extract(startid_url)

        if extracted_content["lastpushid"] < last_push_id:
            # Push IDs from Mercurial are incremental. If we cached a value
            # from one call to this API, and a subsequent call told us that
            # the ``lastpushid`` is LOWER than the one we have cached, then
            # the Mercurial IDs were reset.
            # In this circumstance, we can't rely on the cached ID, so we
            # must throw it out and get the latest 10 pushes.
            logger.warning(
                "Got a ``lastpushid`` value of %s lower than the cached value of %s "
                "due to Mercurial repo reset. Getting latest changes for '%s' instead",
                extracted_content["lastpushid"],
                last_push_id,
                repository_name,
            )
            cache.delete(cache_key)
            extracted_content = self.extract(source_url)
    else:
        if changeset:
            logger.info(
                f"Getting all pushes for '{repository_name}' corresponding to changeset '{changeset}'"
            )
            extracted_content = self.extract(f"{source_url}&changeset={changeset}")
        else:
            logger.warning(
                f"Unable to get last push from cache for '{repository_name}', getting all pushes",
            )
            extracted_content = self.extract(source_url)

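    # Illustrative shape of ``extracted_content`` (a sketch inferred from the
    # accesses below; a json-pushes response with version=2 looks roughly
    # like this, though the exact changeset fields depend on the query
    # parameters):
    #
    #   {
    #       "lastpushid": 130,
    #       "pushes": {
    #           "129": {"changesets": [{"node": "...", ...}, ...], ...},
    #           "130": {"changesets": [{"node": "...", ...}, ...], ...},
    #       },
    #   }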
    pushes = extracted_content["pushes"]

    # ``pushes`` could be empty if there are no new ones since we last fetched.
    if not pushes:
        return None

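    # The push with the highest ID is the most recent; its final changeset is
    # the tip revision that this method ultimately returns.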
    last_push_id = max(map(int, pushes.keys()))
    last_push = pushes[str(last_push_id)]
    top_revision = last_push["changesets"][-1]["node"]

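    # Failures are accumulated per push rather than raised immediately, so one
    # bad push doesn't block ingestion of the rest; anything collected here is
    # raised as a single CollectionNotStoredError after the loop.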
    errors = []
    repository = Repository.objects.get(name=repository_name)

    for push in pushes.values():
        if not push["changesets"]:
            # A push without commits means it was marked as obsolete (see bug
            # 1286426). Without changesets it's not possible to calculate the
            # push revision required for ingestion.
            continue

        try:
            store_push(repository, self.transform_push(push))
        except Exception:
            newrelic.agent.notice_error()
            errors.append(
                {
                    "project": repository,
                    "collection": "result_set",
                    "message": traceback.format_exc(),
                }
            )

    if errors:
        raise CollectionNotStoredError(errors)

    if not changeset:
        # Only cache the last push ID if we're not fetching a specific changeset.
        cache.set(cache_key, last_push_id, ONE_WEEK_IN_SECONDS)

    return top_revision
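
# Illustrative call (a sketch, not part of the file; the enclosing class name
# and URL are assumptions, since in Treeherder this ``run`` method lives on
# ``HgPushlogProcess`` and ``source_url`` points at a json-pushes endpoint):
#
#   HgPushlogProcess().run(
#       "https://hg.mozilla.org/mozilla-central/json-pushes/?full=1&version=2",
#       "mozilla-central",
#   )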