in src/rpdk/core/contract/resource_client.py [0:0]
def call(self, action, current_model, previous_model=None, **kwargs):
request = self._make_payload(
action,
current_model,
previous_model,
TypeConfiguration.get_type_configuration(),
**kwargs,
)
start_time = time.time()
response = self._call(request)
self.assert_time(start_time, time.time(), action)
# this throws a KeyError if status isn't present, or if it isn't a valid status
status = OperationStatus[response["status"]]
if action in (Action.READ, Action.LIST):
assert status != OperationStatus.IN_PROGRESS
return status, response
while status == OperationStatus.IN_PROGRESS:
callback_delay_seconds = self.assert_in_progress(status, response)
self.assert_primary_identifier(
self.primary_identifier_paths, response.get("resourceModel")
)
sleep(callback_delay_seconds)
request["requestData"]["resourceProperties"] = response.get("resourceModel")
request["callbackContext"] = response.get("callbackContext")
# refresh credential for every handler invocation
request["requestData"]["callerCredentials"] = get_temporary_credentials(
self._session, LOWER_CAMEL_CRED_KEYS, self._role_arn
)
response = self._call(request)
status = OperationStatus[response["status"]]
# ensure writeOnlyProperties are not returned on final responses
if "resourceModel" in response.keys() and status == OperationStatus.SUCCESS:
self.assert_write_only_property_does_not_exist(response["resourceModel"])
return status, response