in redis/connection.py [0:0]
def send_command(self, *args, **kwargs):
    """Send a command through the wrapped connection, consulting the cache.

    Pending invalidations are processed first. Commands the cache reports
    as not cachable are forwarded to the wrapped connection unchanged.
    For cachable commands a cache key is built from the command name and
    the ``keys`` keyword argument; if an entry already exists for that key
    nothing is sent, otherwise a placeholder entry is stored and the
    command is sent.

    Args:
        *args: Command name followed by its arguments; ``args[0]`` is used
            as the command name for the cache key.
        **kwargs: Forwarded to the wrapped connection. ``keys`` must be
            present (and not ``None``) for cachable commands.

    Raises:
        ValueError: If the command is cachable but ``keys`` was not given.
    """
    self._process_pending_invalidations()

    with self._cache_lock:
        # Write commands, or commands not allowed to be cached, bypass
        # the cache entirely.
        if not self._cache.is_cachable(CacheKey(command=args[0], redis_keys=())):
            self._current_command_cache_key = None
            self._conn.send_command(*args, **kwargs)
            return

    # Fetch once and reuse for both the validation and the key itself.
    redis_keys = kwargs.get("keys")
    if redis_keys is None:
        raise ValueError("Cannot create cache key.")

    cache_key = CacheKey(command=args[0], redis_keys=tuple(redis_keys))
    self._current_command_cache_key = cache_key

    with self._cache_lock:
        # Single lookup: the fetched entry is reused below instead of
        # querying the cache a second time for the same key.
        entry = self._cache.get(cache_key)
        if entry:
            # The entry was cached by another connection: trigger its
            # invalidation processing so invalidations do not queue up
            # in a stale connection.
            owner = entry.connection_ref
            if owner != self._conn:
                with self._pool_lock:
                    while owner.can_read():
                        owner.read_response(push_request=True)
            # Already cached (or in progress): nothing goes over the socket.
            return

        # Store a temporary placeholder to prevent a race condition
        # with another connection issuing the same command.
        self._cache.set(
            CacheEntry(
                cache_key=cache_key,
                cache_value=self.DUMMY_CACHE_VALUE,
                status=CacheEntryStatus.IN_PROGRESS,
                connection_ref=self._conn,
            )
        )

    # Only an allowed read-only command that is not yet cached
    # reaches the socket.
    self._conn.send_command(*args, **kwargs)