in redshift_connector/core.py [0:0]
def execute(self: "Connection", cursor: Cursor, operation: str, vals) -> None:
    """
    Run one SQL operation on the server on behalf of ``cursor``.

    The first time a given ``(operation, params)`` pair is seen by the
    calling process, the statement is prepared on the server (Parse +
    Describe) and the resulting record is cached. Every call then binds
    ``vals`` to that prepared statement and executes it (Bind + Execute).

    Parameters
    ----------
    cursor : :class:`Cursor`
        Cursor issuing the statement. Its ``paramstyle`` selects how the
        placeholders in ``operation`` are interpreted. Its ``ps``,
        ``_cached_rows`` and ``_row_count`` attributes are overwritten here.
    operation : str
        The SQL statement to execute.
    vals :
        Bind parameters. If `redshift_connector.paramstyle` is `qmark`,
        `numeric`, or `format` this should be an array of parameters. If it
        is `named` this should be a `dict` mapping of parameters. If it is
        `pyformat` either an array or a mapping is accepted. ``None`` is
        treated as "no parameters".

    Returns
    -------
    None

    Raises
    ------
    InterfaceError
        If flushing the prepare request raises ``AttributeError`` while
        ``self._sock`` is ``None`` (reported as "connection is closed").
    """
    if vals is None:
        vals = ()

    # Caches are partitioned first by paramstyle, then by the id of the
    # calling process:
    #   self._caches[paramstyle][pid] == {"statement": {...}, "ps": {...}}
    # "statement" maps the raw query text to the (statement, make_args) pair
    # returned by convert_paramstyle; "ps" maps (operation, params) to the
    # prepared-statement record assembled further down.
    pid: int = getpid()
    paramstyle = cursor.paramstyle
    cache = self._caches.setdefault(paramstyle, {}).setdefault(pid, {"statement": {}, "ps": {}})

    converted = cache["statement"]
    if operation not in converted:
        converted[operation] = convert_paramstyle(paramstyle, operation)
    statement, make_args = converted[operation]

    args = make_args(vals)
    # make_params turns the python-side arguments into the server-side
    # description; each entry is consumed below as a 3-item
    # (type oid, format code, send function) tuple.
    params = self.make_params(args)
    key = operation, params

    ps = cache["ps"].get(key)
    if ps is not None:
        # Already prepared by this process: reuse the cached record.
        cursor.ps = ps
    else:
        # Pick the next free statement number for this process by scanning
        # every paramstyle's cache; numbering starts at 1.
        highest_num = 0
        for style_cache in self._caches.values():
            try:
                for known in style_cache[pid]["ps"].values():
                    highest_num = max(highest_num, known["statement_num"])
            except KeyError:
                pass
        statement_num: int = highest_num + 1

        # Name layout: redshift_connector_statement_<pid>_<statement_num>,
        # e.g. redshift_connector_statement_11432_2
        statement_name: str = f"redshift_connector_statement_{pid}_{statement_num}"
        statement_name_bin: bytes = statement_name.encode("ascii") + NULL_BYTE

        # "row_desc" starts empty and is read back after the Describe
        # round trip; "param_funcs" holds the per-parameter send functions.
        ps = {
            "statement_name_bin": statement_name_bin,
            "pid": pid,
            "statement_num": statement_num,
            "row_desc": [],
            "param_funcs": tuple(p[2] for p in params),
        }
        cursor.ps = ps
        param_fcs = tuple(p[1] for p in params)

        # Parse message body:
        #   String - prepared statement name
        #   String - the query string
        #   Int16  - number of parameter data types (may be zero)
        #   Int32  - one type OID per parameter
        param_count_bin = h_pack(len(params))
        parse_body = bytearray(statement_name_bin)
        parse_body.extend(statement.encode(_client_encoding) + NULL_BYTE)
        parse_body.extend(param_count_bin)
        for oid, _fc, _send in params:
            # Parse does not seem to accept the -1 type oid that other
            # messages use for NULL values, so the PG "unknown" type (705)
            # is sent in its place.
            parse_body.extend(i_pack(705 if oid == -1 else oid))

        # Ask the server to create the prepared statement ...
        self._send_message(PARSE, parse_body)
        # ... then to describe it: 'S' (prepared statement) followed by the
        # statement name. The reply describes the expected parameters and the
        # rows that will come back (or reports that there are none).
        self._send_message(DESCRIBE, STATEMENT + statement_name_bin)
        # Close the request with a Sync and push it onto the wire.
        self._write(SYNC_MSG)
        try:
            self._flush()
        except AttributeError as err:
            if self._sock is not None:
                raise err
            raise InterfaceError("connection is closed")

        self.handle_messages(cursor)

        # ps["row_desc"] is expected to have been filled in while handling
        # the reply (presumably through cursor.ps - confirm in
        # handle_messages); it says what each result column looks like.
        row_desc = ps["row_desc"]
        output_fc = tuple(self.pg_types[col["type_oid"]][0] for col in row_desc)
        ps["input_funcs"] = tuple(col["func"] for col in row_desc)

        # The Bind message is pre-assembled in two halves around the
        # per-call parameter values:
        #   bind_1: portal name (empty), statement name,
        #           Int16 count + Int16 format code per parameter,
        #           Int16 count of parameter values
        #   bind_2: Int16 count + Int16 format code per result column
        ps["bind_1"] = (
            NULL_BYTE
            + statement_name_bin
            + param_count_bin
            + pack("!" + "h" * len(param_fcs), *param_fcs)
            + param_count_bin
        )
        ps["bind_2"] = h_pack(len(output_fc)) + pack("!" + "h" * len(output_fc), *output_fc)

        # Once the per-process cache has grown past the configured limit,
        # close every statement it holds and start over with this one.
        prepared = cache["ps"]
        if len(prepared) > self.max_prepared_statements:
            for stale in prepared.values():
                self.close_prepared_statement(stale["statement_name_bin"])
            prepared.clear()
        prepared[key] = ps

    cursor._cached_rows.clear()
    cursor._row_count = -1

    # Complete the Bind message with the values for this call. Each value is
    # sent as an Int32 length followed by that many bytes; a None value
    # contributes only the NULL constant (presumably the pre-packed -1
    # length, after which no value bytes follow - confirm its definition).
    bind_body = bytearray(ps["bind_1"])
    for value, send in zip(args, ps["param_funcs"]):
        if value is not None:
            payload = send(value)
            bind_body.extend(i_pack(len(payload)))
            bind_body.extend(payload)
        else:
            bind_body.extend(NULL)
    bind_body.extend(ps["bind_2"])

    # Bind the values, execute, and finish the request with a Sync.
    self._send_message(BIND, bind_body)
    self.send_EXECUTE(cursor)
    self._write(SYNC_MSG)
    self._flush()

    # Consume the server's reply (BIND_COMPLETE, DATA_ROW, COMMAND_COMPLETE,
    # READY_FOR_QUERY) with whichever reader the connection is configured for.
    read_reply = self.handle_messages_merge_socket_read if self.merge_socket_read else self.handle_messages
    read_reply(cursor)