in s3transfer/utils.py [0:0]
def release(self, tag, acquire_token):
    """Release one previously acquired slot for ``tag``.

    Slots of a tag are handed back to the shared counter strictly in
    sequence order. Releasing the lowest outstanding sequence number frees
    it immediately, together with any directly following sequence numbers
    that were released earlier and parked. Releasing a higher sequence
    number only parks it until the lower ones have been released.

    :param tag: Key into the per-tag sequence bookkeeping.
    :param acquire_token: Sequence number of the slot being released
        (presumably the value handed out when the slot was acquired; the
        acquiring side is not visible from this method).

    :raises ValueError: If ``tag`` is unknown, or if ``acquire_token`` is
        not an outstanding sequence number for ``tag``: already released,
        already parked for release, or never handed out.
    """
    sequence_number = acquire_token
    logger.debug("Releasing acquire %s/%s", tag, sequence_number)
    with self._condition:
        if tag not in self._tag_sequences:
            raise ValueError("Attempted to release unknown tag: %s" % tag)
        # Exclusive upper bound: valid tokens satisfy
        # lowest <= token < max_sequence.
        max_sequence = self._tag_sequences[tag]
        lowest = self._lowest_sequence[tag]
        # Kept sorted in descending order, so the smallest parked sequence
        # number is always the last element.
        pending = self._pending_release.get(tag, [])
        if lowest == sequence_number and sequence_number < max_sequence:
            # We can immediately process this request and free up
            # resources, plus every parked release that has now become
            # contiguous with the new lowest sequence number.
            lowest += 1
            freed = 1
            while pending and pending[-1] == lowest:
                pending.pop()
                lowest += 1
                freed += 1
            self._lowest_sequence[tag] = lowest
            self._count += freed
            # Wake one waiter per freed slot; a single notify() would
            # leave other waiters blocked although slots are available.
            self._condition.notify(freed)
        elif (
            lowest < sequence_number < max_sequence
            and sequence_number not in pending
        ):
            # We can't do anything right now because we're still waiting
            # for the min sequence for the tag to be released. We have
            # to queue this for pending release. A sequence number that
            # is already parked is rejected below instead: a duplicate
            # would get stuck at the tail of the queue and block every
            # later drain for this tag.
            self._pending_release.setdefault(tag, []).append(
                sequence_number
            )
            self._pending_release[tag].sort(reverse=True)
        else:
            raise ValueError(
                "Attempted to release unknown sequence number "
                "%s for tag: %s" % (sequence_number, tag)
            )