actions/devel/redis/command/redis.py (31 lines of code) (raw):
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import common.util as ut
import redis
from common.command_data import CommandData
class Redis():
"""
Implementation of a Redis Command executor. It will require
a user_data dictionary linked to a specific user.
"""
def __init__(self, user_data):
self._user_data = user_data
self._redis_prefix = ut.get_env_value(user_data,"REDIS_PREFIX")
self._redis_url = ut.get_env_value(user_data,"REDIS_URL")
self.validate()
def validate(self):
"""
Validate that the provided user_data contains the appropriate
metadata for being able to submit a redis command.
"""
if not self._redis_prefix or not self._redis_url:
raise Exception("user does not have valid REDIS environment set")
def execute(self, input:CommandData):
print(f"**** Redis command to execute {input.command()}")
try:
r = redis.from_url(self._redis_url)
result = r.execute_command(input.command())
if result:
input.status(200)
# Ensure result is always a string
if isinstance(result, bytes):
result = result.decode("utf-8")
else:
result = str(result)
input.result(result)
else:
input.status(204)
input.result(None)
except Exception as e:
input.result(f"could not execute redis command {e}")
input.status(400)
return input