manage.py
import atexit
import base64
import csv
import datetime
import os
import warnings

from apscheduler.schedulers.background import BackgroundScheduler
from dotenv import load_dotenv
from flask_migrate import MigrateCommand
from flask_script import Manager
from sqlalchemy import func

from backend import create_app, initialise_counters
from backend.models.postgis.task import Task, TaskHistory
from backend.models.postgis.utils import NotFound
from backend.services.interests_service import InterestService
from backend.services.stats_service import StatsService
from backend.services.users.authentication_service import AuthenticationService
from backend.services.users.user_service import UserService

# Load configuration from file into environment
load_dotenv(os.path.join(os.path.dirname(__file__), "tasking-manager.env"))
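# The env file is expected to contain KEY=value pairs for the settings checked
# below, e.g. (illustrative values only):
#   TM_APP_BASE_URL=http://127.0.0.1:5000
#   POSTGRES_DB=tasking-manager
#   TM_SECRET=change-me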
# Check that required environment variables are set
for key in [
"TM_APP_BASE_URL",
"POSTGRES_DB",
"POSTGRES_USER",
"POSTGRES_PASSWORD",
"TM_SECRET",
"TM_CONSUMER_KEY",
"TM_CONSUMER_SECRET",
"TM_DEFAULT_CHANGESET_COMMENT",
]:
if not os.getenv(key):
warnings.warn("%s environmental variable not set." % (key,))
# Initialise the flask app object
application = create_app()
# Initialize homepage counters
try:
initialise_counters(application)
except Exception:
warnings.warn("Homepage counters not initialized.")
# Add management commands
manager = Manager(application)
# Enable db migrations to be run via the command line
manager.add_command("db", MigrateCommand)
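
# Commands registered on the manager are run from the command line; for example,
# database migrations use the usual Flask-Migrate subcommands (a sketch, assuming
# the standard workflow):
#   python manage.py db migrate
#   python manage.py db upgrade
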
# Job runs once every 2 hours (scheduled below via BackgroundScheduler)
@manager.command
def auto_unlock_tasks():
with application.app_context():
        # Identify distinct project IDs with task activity in roughly the last 2 hours
        # (a 130-minute window gives a small overlap between scheduler runs)
query = (
TaskHistory.query.with_entities(TaskHistory.project_id)
.filter(
func.DATE(TaskHistory.action_date)
> datetime.datetime.utcnow() - datetime.timedelta(minutes=130)
)
.distinct()
)
projects = query.all()
        # For each project, update the task history for tasks that were not manually unlocked
for project in projects:
project_id = project[0]
Task.auto_unlock_tasks(project_id)
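
# The job can also be triggered manually, e.g.: python manage.py auto_unlock_tasks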
# Set up a background scheduler for the auto-unlock job
cron = BackgroundScheduler(daemon=True)
# Schedule the job to run every 2 hours on a background thread
cron.add_job(auto_unlock_tasks, "interval", hours=2)
cron.start()
application.logger.debug("Initiated background thread to auto unlock tasks")
# Shut down the scheduler when the application stops
atexit.register(lambda: cron.shutdown(wait=False))
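
# Usage example (user id 12345 is an arbitrary illustrative value):
#   python manage.py gen_token -u 12345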
@manager.option("-u", "--user_id", help="Test User ID")
def gen_token(user_id):
""" Helper method for generating valid base64 encoded session tokens """
token = AuthenticationService.generate_session_token_for_user(user_id)
print(f"Raw token is: {token}")
b64_token = base64.b64encode(token.encode())
print(f"Your base64 encoded session token: {b64_token}")
@manager.command
def refresh_levels():
print("Started updating mapper levels...")
users_updated = UserService.refresh_mapper_level()
print(f"Updated {users_updated} user mapper levels")
@manager.command
def refresh_project_stats():
print("Started updating project stats...")
StatsService.update_all_project_stats()
print("Project stats updated")
@manager.command
def update_project_categories(filename):
with open(filename, "r", encoding="ISO-8859-1", newline="") as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
project_id = int(row.get("projectId"))
primary_category = row.get("primaryCat")
interest_ids = []
            # Only the primary category is mapped to the project as an interest
if primary_category:
try:
interest = InterestService.get_by_name(primary_category)
except NotFound:
interest = InterestService.create(primary_category)
interest_ids.append(interest.id)
try:
InterestService.create_or_update_project_interests(
project_id, interest_ids
)
except Exception as e:
print(f"Problem updating {project_id}: {type(e)}")
if __name__ == "__main__":
manager.run()