in azure/datalake/store/transfer.py
def submit(self, src, dst, length):
"""
Split a given file into chunks.
All submitted files/chunks start in the `pending` state until `run()`
is called.
"""
cstates = StateManager(
'pending', 'running', 'finished', 'cancelled', 'errored')
    # Create a unique temporary directory for each file; chunk files are
    # staged there and merged into the destination once all chunks finish.
if self._chunked:
if self._unique_temporary:
filename = "{}.segments.{}".format(dst.name, self._unique_str)
else:
filename = "{}.segments".format(dst.name)
        tmpdir = dst.parent / filename
else:
tmpdir = None
# TODO: might need xrange support for py2
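    # Chunks begin at byte offsets 0, chunksize, 2 * chunksize, ...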
offsets = range(0, length, self._chunksize)
    # For empty files, range() yields no offsets, so explicitly add the
    # initial offset of 0 to submit a single zero-length chunk. A non-zero
    # length with no offsets should be impossible and is treated as an error.
    if not offsets:
        if not length:
            offsets = [0]
        else:
            raise DatalakeIncompleteTransferException(
                'Could not compute offsets for source: {}, with destination: '
                '{} and expected length: {}.'.format(src, dst, length))
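    # Staging in tmpdir is only needed when a chunked transfer actually
    # produces more than one chunk; otherwise write straight to dst.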
tmpdir_and_offsets = tmpdir and len(offsets) > 1
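    # Register each chunk under its (name, offset) key; multi-chunk
    # transfers stage each chunk in its own '<dst.name>_<offset>' file.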
for offset in offsets:
if tmpdir_and_offsets:
name = tmpdir / "{}_{}".format(dst.name, offset)
else:
name = dst
cstates[(name, offset)] = 'pending'
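        # Bookkeeping for this chunk: its parent file, the bytes it should
        # carry (the remainder for the final chunk), bytes moved so far,
        # and any exception raised while transferring it.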
self._chunks[(name, offset)] = {
"parent": (src, dst),
"expected": min(length - offset, self._chunksize),
"actual": 0,
"exception": None}
logger.debug("Submitted %s, byte offset %d", name, offset)
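    # File-level bookkeeping: total length, the per-chunk state manager,
    # and any exception; _fstates tracks the file's own lifecycle.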
self._fstates[(src, dst)] = 'pending'
self._files[(src, dst)] = {
"length": length,
"cstates": cstates,
"exception": None}
self._transfer_total_bytes += length
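
To make the chunking arithmetic concrete, here is a small self-contained sketch (toy sizes; the values and variable names are illustrative only, and the library's real default chunksize is far larger) that mirrors how submit() derives offsets and per-chunk expected byte counts, including the empty-file fallback to a single offset of 0:

length, chunksize = 10, 4   # toy values for illustration only

# Chunks begin at multiples of chunksize; an empty file still gets offset 0.
offsets = list(range(0, length, chunksize)) or [0]
# Each chunk is expected to carry chunksize bytes, except the final remainder.
expected = {off: min(length - off, chunksize) for off in offsets}
print(expected)   # {0: 4, 4: 4, 8: 2}

For length == 0 this yields expected == {0: 0}: the single empty chunk the method records so that even zero-byte files flow through the same pending/running/finished state machine.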