in awswrangler/s3/_fs.py [0:0]
def flush(self, force: bool = False) -> None:
"""Write buffered data to S3."""
if self.closed:
raise RuntimeError("I/O operation on closed file.")
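    # Only flush objects opened for writing while the internal buffer is still usable.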
if self.writable() and self._buffer.closed is False:
total_size: int = self._buffer.tell()
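        # Nothing to do for an empty buffer; otherwise hold data back until at least
        # _MIN_WRITE_BLOCK bytes are buffered (S3 multipart uploads require a minimum
        # size for every part except the last), unless the caller forces the flush.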
if total_size < _MIN_WRITE_BLOCK and force is False:
return None
if total_size == 0:
return None
try:
_logger.debug("Flushing: %s bytes", total_size)
self._mpu = self._mpu or _utils.try_it(
f=self._client.create_multipart_upload, # type: ignore[arg-type]
ex=_S3_RETRYABLE_ERRORS,
base=0.5,
max_num_tries=6,
Bucket=self._bucket,
Key=self._key,
**get_botocore_valid_kwargs(
function_name="create_multipart_upload", s3_additional_kwargs=self._s3_additional_kwargs
),
)
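            # Rewind the buffer and upload its contents as one or more parts of roughly
            # _MIN_WRITE_BLOCK bytes each.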
self._buffer.seek(0)
for chunk_size in _utils.get_even_chunks_sizes(
total_size=total_size, chunk_size=_MIN_WRITE_BLOCK, upper_bound=False
):
_logger.debug("chunk_size: %s bytes", chunk_size)
self._parts_count += 1
self._upload_proxy.upload(
bucket=self._bucket,
key=self._key,
part=self._parts_count,
upload_id=self._mpu["UploadId"],
data=self._buffer.read(chunk_size),
s3_client=self._client,
boto3_kwargs=get_botocore_valid_kwargs(
function_name="upload_part", s3_additional_kwargs=self._s3_additional_kwargs
),
)
finally:
# Ensure that the buffer is cleared (even in the event of an exception) so that
# any partial data doesn't get written when close() is called.
self._buffer.seek(0)
self._buffer.truncate(0)
self._buffer.close()
self._buffer = io.BytesIO()
return None
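
For context, the same buffer-and-flush pattern can be sketched with plain boto3. This is a minimal illustration, not awswrangler's implementation: the names BufferedS3Writer and MIN_PART_SIZE are hypothetical, and retries, extra S3 kwargs, chunk splitting, and concurrent part uploads are omitted.

import io
from typing import Any, Dict, List, Optional

import boto3

MIN_PART_SIZE = 5 * 1024 * 1024  # S3 requires >= 5 MiB for every part except the last


class BufferedS3Writer:
    """Accumulate bytes in memory and flush them to S3 as multipart upload parts."""

    def __init__(self, bucket: str, key: str) -> None:
        self._client = boto3.client("s3")
        self._bucket = bucket
        self._key = key
        self._buffer = io.BytesIO()
        self._upload_id: Optional[str] = None
        self._parts: List[Dict[str, Any]] = []

    def write(self, data: bytes) -> None:
        self._buffer.write(data)
        self.flush(force=False)

    def flush(self, force: bool = False) -> None:
        size = self._buffer.tell()
        if size == 0 or (size < MIN_PART_SIZE and not force):
            return  # hold small buffers back until a forced flush (e.g. from close())
        if self._upload_id is None:
            # Lazily start the multipart upload on the first real flush.
            self._upload_id = self._client.create_multipart_upload(
                Bucket=self._bucket, Key=self._key
            )["UploadId"]
        self._buffer.seek(0)
        part_number = len(self._parts) + 1
        response = self._client.upload_part(
            Bucket=self._bucket,
            Key=self._key,
            PartNumber=part_number,
            UploadId=self._upload_id,
            Body=self._buffer.read(),
        )
        self._parts.append({"ETag": response["ETag"], "PartNumber": part_number})
        self._buffer = io.BytesIO()  # always start the next flush from an empty buffer

    def close(self) -> None:
        # Force out any remaining data, then finish the multipart upload.
        self.flush(force=True)
        if self._upload_id is not None:
            self._client.complete_multipart_upload(
                Bucket=self._bucket,
                Key=self._key,
                UploadId=self._upload_id,
                MultipartUpload={"Parts": self._parts},
            )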