in azure/datalake/store/core.py [0:0]
def flush(self, syncFlag='METADATA', force=False):
"""
Write buffered data to ADL.
Without delimiter: Uploads the current buffer.
With delimiter: writes an amount of data less than or equal to the
block-size, which ends on the delimiter, until buffer is smaller than
the blocksize. If there is no delimiter in a block uploads whole block.
If force=True, flushes all data in the buffer, even if it doesn't end
with a delimiter; appropriate when closing the file.
"""
if not self.writable() or self.closed:
return
if syncFlag not in ('METADATA', 'DATA', 'CLOSE'):
    raise ValueError('syncFlag must be one of: METADATA, DATA or CLOSE')
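# Arguments shared by every APPEND request issued below.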
common_args_append = {
'rest': self.azure.azure,
'op': 'APPEND',
'path': self.path.as_posix(),
'append': 'true',
'leaseid': self.leaseid,
'filesessionid': self.filesessionid
}
self.buffer.seek(0) # Go to start of buffer
data = self.buffer.read()
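# Drain the buffer one block at a time; with a delimiter configured,
# cut each chunk at the last delimiter so no record is split across uploads.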
while len(data) > self.blocksize:
data_to_write_limit = self.blocksize
if self.delimiter:
delimiter_index = data.rfind(self.delimiter, 0, self.blocksize)
if delimiter_index != -1: # delimiter found
data_to_write_limit = delimiter_index + len(self.delimiter)
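# This chunk starts at the file's logical position minus the bytes not yet uploaded.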
offset = self.tell() - len(data)
_put_data_with_retry(syncFlag='DATA', data=data[:data_to_write_limit], offset=offset, **common_args_append)
logger.debug('Wrote %d bytes to %s', data_to_write_limit, self)
data = data[data_to_write_limit:]
if force:
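# Upload whatever remains (possibly zero bytes) so the requested syncFlag,
# e.g. CLOSE, is still delivered to the service.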
offset = self.tell() - len(data)
_put_data_with_retry(syncFlag=syncFlag, data=data, offset=offset, **common_args_append)
logger.debug('Wrote %d bytes to %s', len(data), self)
data = b''
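# Keep only the un-uploaded remainder in the buffer (empty when forced).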
self.buffer = io.BytesIO(data)
self.buffer.seek(0, 2) # position at end so subsequent writes append
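
A minimal usage sketch of the behaviour above (illustrative, not part of
core.py): the lib.auth() call, the AzureDLFileSystem constructor, the
blocksize/delimiter parameters of open(), and the claim that close()
triggers a forced flush are assumptions about this package's public API,
and the tiny blocksize is artificial, chosen only to make the chunking
loop fire.

    from azure.datalake.store import core, lib

    token = lib.auth()  # assumed interactive/env-driven login helper
    adl = core.AzureDLFileSystem(token, store_name='mystore')  # hypothetical store
    with adl.open('/tmp/rows.csv', mode='wb', blocksize=8, delimiter=b'\n') as f:
        f.write(b'a,1\nb,2\nc,')  # 11 bytes buffered, above the 8-byte blocksize
        f.flush()                 # uploads b'a,1\nb,2\n', cut at the last delimiter
        f.write(b'3\n')           # b'c,3\n' now buffered
    # On exit, close() is expected to call flush(syncFlag='CLOSE', force=True),
    # pushing the remaining bytes even though they need not end on a delimiter.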