in cronjobs/src/main.py [0:0]
def run(command, event=None, context=None):
    """Resolve ``command`` and call it with ``event`` and ``context``.

    Args:
        command: Either a callable, which is used as-is, or a string. A
            string is treated as the name of a module under ``commands``;
            the attribute of that module carrying the same name is then
            used as the callable.
        event: Mapping handed to the command. When ``None`` it defaults to
            ``{"server": SERVER_URL}``.
        context: Second argument handed to the command. When ``None`` it
            defaults to ``{"sentry_sdk": sentry_sdk}``.

    Returns:
        Whatever the resolved command returns.

    Raises:
        Exception: If ``event`` holds a truthy ``"force_fail"`` entry or the
            ``FORCE_FAIL`` environment variable is set to a non-empty value.
    """
    event = {"server": SERVER_URL} if event is None else event
    context = {"sentry_sdk": sentry_sdk} if context is None else context

    handler = command
    if isinstance(handler, str):
        # A name was given: load ``commands.<name>`` and pick the function
        # that shares the module's name.
        module = importlib.import_module(f"commands.{handler}")
        handler = getattr(module, handler)

    # Note: if sentry_sdk was initialized with the platform integration, it is
    # ready at this point to automatically capture any unexpected exception.
    # See https://docs.sentry.io/platforms/python/guides/aws-lambda/
    # See https://docs.sentry.io/platforms/python/guides/gcp-functions/

    # Deliberate failure switch, used to check the Sentry integration.
    forced = event.get("force_fail") or os.getenv("FORCE_FAIL")
    if forced:
        raise Exception("Found forced failure flag")

    return handler(event, context)