opbeans/tasks.py (37 lines of code) (raw):

import random from django.core.cache import cache from django.db import models as m import elasticapm from elasticsearch import TransportError from elasticsearch.helpers import bulk from elasticsearch_dsl import Search from elasticsearch_dsl.connections import connections from opbeans import utils, models, documents from opbeans.celery import app @app.task() def update_stats(): if random.random() > 0.8: dict_for_truncation = {k: k for k in range(500)} assert False, "Bad luck!" elasticapm.label(a="x", b="y") elasticapm.set_custom_context({"a": "x", "b": "y"}) cache.set(utils.stats.cache_key, utils.stats(), 60) @app.task() def sync_customers(): customer_docs = [] for customer in models.Customer.objects.annotate(total_orders=m.Count('orders')).order_by('pk')[50:]: customer_docs.append(documents.Customer(**customer.to_search()).to_dict(include_meta=True)) bulk(connections.get_connection(), customer_docs) @app.task() def sync_orders(): highest_id = None try: r = Search(index='py-orders').sort('-_id')[0].execute() highest_id = int(r.hits[0].meta.id) except TransportError as e: if e.status_code == 404: highest_id = 0 order_docs = [] for order in models.Order.objects.filter(id__gt=highest_id).prefetch_related('customer'): order_docs.append(documents.Order(**order.to_search()).to_dict(include_meta=True)) bulk(connections.get_connection(), order_docs)