in redis/client.py [0:0]
def _execute_transaction(self, connection, commands, raise_on_error) -> List:
    """
    Send ``commands`` wrapped in MULTI ... EXEC over ``connection`` and
    return the processed replies.

    :param connection: object used for all I/O here; must provide
        ``pack_commands``, ``send_packed_command`` and ``disconnect``.
    :param commands: sequence of ``(args, options)`` pairs. It is iterated
        several times and passed to ``len``, so it must not be a one-shot
        iterator. A command whose ``options`` contain ``EMPTY_RESPONSE`` is
        not sent; the value stored under that key is used as its reply.
        NOTE: the ``"keys"`` entry is popped from each ``options`` dict
        whose reply is not an exception, i.e. the input is mutated.
    :param raise_on_error: when truthy, ``self.raise_first_error`` is
        called with the commands and the assembled replies before any
        response callback runs.
    :return: list with one entry per command, in command order. A reply is
        passed through ``self.response_callbacks[args[0]]`` (with the
        command's options as keyword arguments) when such a callback
        exists; replies that are exceptions are returned as-is.
    :raises WatchError: the EXEC reply was ``None``.
    :raises ExecAbortError: reading the EXEC reply raised it and no earlier
        error had been recorded; otherwise the first recorded error is
        raised in its place.
    :raises ResponseError: the number of replies does not match the number
        of commands (``connection`` is disconnected first).
    """
    cmds = chain([(("MULTI",), {})], commands, [(("EXEC",), {})])
    all_cmds = connection.pack_commands(
        [args for args, options in cmds if EMPTY_RESPONSE not in options]
    )
    connection.send_packed_command(all_cmds)
    errors = []

    # parse off the response for MULTI
    # NOTE: we need to handle ResponseErrors here and continue
    # so that we read all the additional command messages from
    # the socket
    try:
        self.parse_response(connection, "_")
    except ResponseError as e:
        errors.append((0, e))

    # and all the other commands
    for i, command in enumerate(commands):
        if EMPTY_RESPONSE in command[1]:
            # Never sent, so there is nothing to read for it; remember the
            # canned reply so it can be spliced into the EXEC result below.
            errors.append((i, command[1][EMPTY_RESPONSE]))
        else:
            try:
                self.parse_response(connection, "_")
            except ResponseError as e:
                # i + 1: command positions are reported 1-based.
                self.annotate_exception(e, i + 1, command[0])
                errors.append((i, e))

    # parse the EXEC.
    try:
        response = self.parse_response(connection, "_")
    except ExecAbortError:
        # Prefer the first error recorded while reading the earlier replies
        # over the generic abort.
        if errors:
            raise errors[0][1]
        raise

    # EXEC clears any watched keys
    self.watching = False

    if response is None:
        raise WatchError("Watched variable changed.")

    # put any parse errors into the response
    for i, e in errors:
        response.insert(i, e)

    if len(response) != len(commands):
        # Drop the connection the replies were actually read from. The
        # ``connection`` argument is used (rather than ``self.connection``)
        # because it is the object all I/O above went through; nothing in
        # this function guarantees ``self.connection`` is set or identical.
        connection.disconnect()
        raise ResponseError(
            "Wrong number of response items from pipeline execution"
        )

    # find any errors in the response and raise if necessary
    if raise_on_error:
        self.raise_first_error(commands, response)

    # We have to run response callbacks manually
    data = []
    for r, cmd in zip(response, commands):
        if not isinstance(r, Exception):
            args, options = cmd
            # Remove the "keys" entry; it is only needed for the cache and
            # must not be forwarded to the callback as a keyword argument.
            options.pop("keys", None)
            command_name = args[0]
            if command_name in self.response_callbacks:
                r = self.response_callbacks[command_name](r, **options)
        data.append(r)
    return data