in azure/datalake/store/lib.py [0:0]
def call(self, op, path='', is_extended=False, expected_error_code=None, retry_policy=None, headers=None, **kwargs):
    """ Execute a REST call

    Parameters
    ----------
    op: str
        webHDFS operation to perform, one of `DatalakeRESTInterface.ends`
    path: str
        filepath on the remote system
    is_extended: bool (False)
        Indicates if the API call comes from the webhdfs extensions path or the basic webhdfs path.
        By default, all requests target the official webhdfs path. A small subset of custom convenience
        methods specific to Azure Data Lake Store target the extension path (such as SETEXPIRY).
    expected_error_code: int
        Optionally indicates a specific, expected error code, if any. In the event that this error
        is returned, the exception will be logged to DEBUG instead of ERROR stream. The exception
        will still be raised, however, as it is expected that the caller will expect to handle it
        and do something different if it is raised.
    retry_policy: object (None)
        Object exposing ``should_retry(response, exception, retry_count)``;
        defaults to a fresh ``ExponentialRetryPolicy`` when not supplied.
    headers: dict (None)
        Extra HTTP headers forwarded to the underlying request; defaults to
        an empty dict.
    kwargs: dict
        other parameters, as defined by the webHDFS standard and
        https://msdn.microsoft.com/en-us/library/mt710547.aspx
    """
    # Use a None sentinel rather than a mutable {} default argument, which
    # would be shared across all calls of this method.
    headers = {} if headers is None else headers
    retry_policy = ExponentialRetryPolicy() if retry_policy is None else retry_policy
    if op not in self.ends:
        raise ValueError("No such op: %s" % op)
    method, required, allowed = self.ends[op]
    # Copy before adding 'api-version' so the shared registry entry in
    # self.ends is never mutated as a side effect of calling.
    allowed = set(allowed)
    allowed.add('api-version')
    data = kwargs.pop('data', b'')
    stream = kwargs.pop('stream', False)
    keys = set(kwargs)
    if required > keys:
        raise ValueError("Required parameters missing: %s"
                         % (required - keys))
    if keys - allowed:
        raise ValueError("Extra parameters given: %s"
                         % (keys - allowed))
    params = {'OP': op}
    if self.api_version:
        params['api-version'] = self.api_version
    params.update(kwargs)
    if is_extended:
        url = self.url + self.extended_operations
    else:
        url = self.url + self.webhdfs
    url += urllib.quote(path)
    retry_count = -1
    # One request id spans all retries of this logical call, so the service
    # can correlate the attempts.
    request_id = str(uuid.uuid1())
    while True:
        retry_count += 1
        last_exception = None
        try:
            response = self.__call_once(method=method,
                                        url=url,
                                        params=params,
                                        data=data,
                                        stream=stream,
                                        request_id=request_id,
                                        retry_count=retry_count,
                                        op=op,
                                        path=path,
                                        headers=headers,
                                        **kwargs)
            # Trigger download here so any errors can be retried. response.content is cached for future use.
            temp_download = response.content
        except requests.exceptions.RequestException as e:
            last_exception = e
            response = None
        request_successful = self.is_successful_response(response, last_exception)
        if request_successful or not retry_policy.should_retry(response, last_exception, retry_count):
            break

    # Retries exhausted with a transport-level failure (no HTTP response at
    # all): surface the underlying exception.
    if not request_successful and last_exception is not None:
        raise DatalakeRESTException('HTTP error: ' + repr(last_exception))

    # Map HTTP failures onto the library's exception types. An expected
    # error code is logged at DEBUG instead of ERROR, but still raised.
    exception_log_level = logging.ERROR
    if expected_error_code and response.status_code == expected_error_code:
        logger.log(logging.DEBUG, 'Error code: {} was an expected potential error from the caller. Logging the exception to the debug stream'.format(response.status_code))
        exception_log_level = logging.DEBUG

    if response.status_code == 403:
        self.log_response_and_raise(response, PermissionError(path), level=exception_log_level)
    elif response.status_code == 404:
        self.log_response_and_raise(response, FileNotFoundError(path), level=exception_log_level)
    elif response.status_code >= 400:
        err = DatalakeRESTException(
            'Data-lake REST exception: %s, %s' % (op, path))
        if self._is_json_response(response):
            out = response.json()
            if 'RemoteException' in out:
                exception = out['RemoteException']['exception']
                if exception == 'BadOffsetException':
                    err = DatalakeBadOffsetException(path)
                    self.log_response_and_raise(response, err, level=logging.DEBUG)
        self.log_response_and_raise(response, err, level=exception_log_level)
    else:
        self._log_response(response)

    # Success path: JSON bodies are decoded and checked for the webHDFS
    # {'boolean': false} failure convention; otherwise the raw response is
    # returned for the caller to stream/inspect.
    if self._is_json_response(response):
        out = response.json()
        if out.get('boolean', True) is False:
            err = DatalakeRESTException(
                'Operation failed: %s, %s' % (op, path))
            self.log_response_and_raise(response, err)
        return out
    return response