in awswrangler/s3/_fs.py [0:0]
def close(self) -> None:
"""Clean up the cache."""
if self.closed:
return None
if self.writable():
_logger.debug("Closing: %s parts", self._parts_count)
if self._parts_count > 0:
self.flush(force=True)
parts: list[dict[str, str | int]] = self._upload_proxy.close()
part_info: dict[str, list[dict[str, Any]]] = {"Parts": parts}
_logger.debug("Running complete_multipart_upload...")
_utils.try_it(
f=self._client.complete_multipart_upload,
ex=_S3_RETRYABLE_ERRORS,
base=0.5,
max_num_tries=6,
Bucket=self._bucket,
Key=self._key,
UploadId=self._mpu["UploadId"],
MultipartUpload=part_info,
**get_botocore_valid_kwargs(
function_name="complete_multipart_upload", s3_additional_kwargs=self._s3_additional_kwargs
),
)
_logger.debug("complete_multipart_upload done!")
elif self._buffer.tell() > 0:
_logger.debug("put_object")
_utils.try_it(
f=self._client.put_object,
ex=_S3_RETRYABLE_ERRORS,
base=0.5,
max_num_tries=6,
Bucket=self._bucket,
Key=self._key,
Body=self._buffer.getvalue(),
**get_botocore_valid_kwargs(
function_name="put_object", s3_additional_kwargs=self._s3_additional_kwargs
),
)
self._parts_count = 0
self._upload_proxy.close()
self._buffer.seek(0)
self._buffer.truncate(0)
self._buffer.close()
elif self.readable():
self._cache = b""
else:
raise RuntimeError(f"Invalid mode: {self._mode}")
super().close()
return None