# src/olympia/amo/models.py
import contextlib
import os
import time
from urllib.parse import urljoin
from django.conf import settings
from django.core.files.storage import default_storage as storage
from django.db import models
from django.db.models import Lookup
from django.db.models.expressions import Func
from django.db.models.fields import CharField, Field
from django.db.models.fields.related_descriptors import ManyToManyDescriptor
from django.db.models.query import ModelIterable
from django.urls import resolve, reverse
from django.urls.exceptions import Resolver404
from django.utils import timezone, translation
from django.utils.functional import cached_property
import multidb.pinning
import olympia.core.logger
from olympia.translations.hold import save_translations
log = olympia.core.logger.getLogger('z.addons')
@Field.register_lookup
class Like(Lookup):
lookup_name = 'like'
def as_sql(self, compiler, connection):
lhs_sql, params = self.process_lhs(compiler, connection)
rhs_sql, rhs_params = self.process_rhs(compiler, connection)
params.extend(rhs_params)
        # This looks scarier than it is: rhs_sql should resolve to '%s',
# lhs_sql to the query before this part. The params are isolated and
# will be passed to the database client code separately, ensuring
# everything is escaped correctly.
return '%s LIKE %s' % (lhs_sql, rhs_sql), params
@contextlib.contextmanager
def use_primary_db():
"""Within this context, all queries go to the master."""
old = getattr(multidb.pinning._locals, 'pinned', False)
multidb.pinning.pin_this_thread()
try:
yield
finally:
multidb.pinning._locals.pinned = old
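# Illustrative usage sketch: pin reads inside the block to the primary
# database, typically right after a write, to avoid stale replica reads
# (`SomeModel` and `some_pk` are hypothetical):
#
#     with use_primary_db():
#         fresh = SomeModel.objects.get(pk=some_pk)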
class BaseQuerySet(models.QuerySet):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._transform_fns = []
def _fetch_all(self):
if self._result_cache is None:
super()._fetch_all()
# At this point, _result_cache should have been filled up. If we
# are dealing with a "regular" queryset (not values() etc) then we
# call the transformers.
if issubclass(self._iterable_class, ModelIterable):
for func in self._transform_fns:
func(self._result_cache)
def _clone(self, **kwargs):
clone = super()._clone(**kwargs)
clone._transform_fns = self._transform_fns[:]
return clone
def transform(self, fn):
clone = self._clone()
clone._transform_fns.append(fn)
return clone
def pop_transforms(self):
qs = self._clone()
transforms = qs._transform_fns
qs._transform_fns = []
return transforms, qs
def no_transforms(self):
return self.pop_transforms()[1]
def only_translations(self):
"""Remove all transforms except translations."""
from olympia.translations import transformer
# Add an extra select so these are cached separately.
qs = self.no_transforms()
if hasattr(self.model._meta, 'translated_fields'):
qs = qs.transform(transformer.get_trans)
return qs
def optimized_count(self):
"""
Slightly optimized count() for cases where there is a DISTINCT in the
queryset.
When a count() call is made on a queryset that has a distinct, that
causes django to run the full SELECT (including all fields, distinct,
ordering etc) in a subquery and then COUNT() on the result of that
        subquery, which is costly/inefficient. That's tracked in
https://code.djangoproject.com/ticket/30685.
We can't easily fix the fact that there is a subquery, but we can
avoid selecting all fields and ordering in that subquery needlessly.
"""
return self.values('pk').order_by().count()
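# Illustrative transform() usage sketch, assuming a model whose manager is a
# ManagerBase (names are hypothetical): a transform function is called once
# with the full list of fetched instances after the queryset is evaluated,
# which makes it a good place to attach related data in bulk.
#
#     def attach_tags(objs):
#         # one extra query covering all `objs`, then set attributes on them
#         ...
#
#     qs = SomeModel.objects.all().transform(attach_tags)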
class RawQuerySet(models.query.RawQuerySet):
"""A RawQuerySet with __len__."""
def __init__(self, raw_query, *args, **kw):
super().__init__(raw_query, *args, **kw)
self._result_cache = None
def __iter__(self):
if self._result_cache is None:
self._result_cache = list(super().__iter__())
return iter(self._result_cache)
def __len__(self):
return len(list(self.__iter__()))
class ManagerBase(models.Manager):
"""
Base for all managers in AMO.
Returns BaseQuerySets.
If a model has translated fields, they'll be attached through a transform
function.
"""
_queryset_class = BaseQuerySet
def get_queryset(self):
qs = self._queryset_class(self.model, using=self._db)
return self._with_translations(qs)
def _with_translations(self, qs):
from django.db.models import Value
from olympia.translations import transformer
if hasattr(self.model._meta, 'translated_fields'):
qs = qs.transform(transformer.get_trans)
        # Annotate the queryset with the current language so that cached
        # query results are never shared across languages.
qs = qs.annotate(
__lang=Value(translation.get_language() or '', output_field=CharField())
)
return qs
def transform(self, fn):
return self.all().transform(fn)
def raw(self, raw_query, params=(), translations=None, using=None):
return RawQuerySet(
raw_query,
model=self.model,
params=params,
translations=translations,
using=using or self._db,
)
class _NoChangeInstance:
"""A proxy for object instances to make safe operations within an
OnChangeMixin.on_change() callback.
"""
def __init__(self, instance):
self.__instance = instance
def __repr__(self):
return f'<{self.__class__.__name__} for {self.__instance!r}>'
def __getattr__(self, attr):
return getattr(self.__instance, attr)
def __setattr__(self, attr, val):
if attr.endswith('__instance'):
# _NoChangeInstance__instance
self.__dict__[attr] = val
else:
setattr(self.__instance, attr, val)
def save(self, *args, **kw):
kw['_signal'] = False
return self.__instance.save(*args, **kw)
def update(self, *args, **kw):
kw['_signal'] = False
return self.__instance.update(*args, **kw)
_on_change_callbacks = {}
class OnChangeMixin:
"""Mixin for a Model that allows you to observe attribute changes.
Register change observers with::
class YourModel(amo.models.OnChangeMixin,
amo.models.ModelBase):
# ...
pass
YourModel.on_change(callback)
"""
def __init__(self, *args, **kw):
super().__init__(*args, **kw)
self._reset_initial_attrs()
def _reset_initial_attrs(self, attrs=None):
if attrs is None:
self._initial_attrs = {
k: v
for k, v in self.__dict__.items()
if k not in ('_state', '_initial_attrs')
}
else:
self._initial_attrs.update(attrs)
@classmethod
def on_change(cls, callback):
"""Register a function to call on save or update to respond to changes.
For example::
def watch_status(old_attr=None, new_attr=None,
instance=None, sender=None, **kwargs):
if old_attr is None:
old_attr = {}
if new_attr is None:
new_attr = {}
if old_attr.get('status') != new_attr.get('status'):
# ...
                    instance.save(_signal=False)
TheModel.on_change(watch_status)
        ``old_attr`` will be a dict of the old instance attributes.
        ``new_attr`` will be a dict of the new instance attributes, including
        any that had not been changed by the operation that triggered the
        callback (such as an update of only one field).
.. note::
Any call to instance.save() or instance.update() within a callback
will not trigger any change handlers.
.. note::
Duplicates based on function.__name__ are ignored for a given
class.
"""
existing = _on_change_callbacks.get(cls, [])
if callback.__name__ in [e.__name__ for e in existing]:
return callback
_on_change_callbacks.setdefault(cls, []).append(callback)
return callback
def _send_changes(self, old_attr, new_attr_kw):
new_attr = old_attr.copy()
new_attr.update(new_attr_kw)
for cb in _on_change_callbacks[self.__class__]:
cb(
old_attr=old_attr,
new_attr=new_attr,
instance=_NoChangeInstance(self),
sender=self.__class__,
)
def save(self, *args, **kwargs):
"""
Save changes to the model instance.
        If _signal=False is in ``kwargs`` the on_change() callbacks won't be
        called.
"""
# When saving an existing instance, if the caller didn't specify
# an explicit update_fields and _dynamic_update_fields is absent or
# True, we attempt to find out which fields were changed and only
# save those. This allows for slightly better performance as we don't
# keep re-saving the same data over and over again, but also avoids
# overwriting data that has changed in the meantime.
# Fields with auto_now=True will be included all the time.
#
# Note that deferred fields will be included in the list of changed
# fields if they are loaded afterwards, even if their value does not
# change.
if (
self.pk
# Just having self.pk is not enough, we only really want to catch
# UPDATE calls and the caller could be doing Model(pk=1).save().
# Django save() implementation uses the special _state attribute
# for this.
and self._state.adding is False
and kwargs.get('update_fields') is None
and kwargs.pop('_dynamic_update_fields', True)
):
fields = [f.attname for f in self._meta.concrete_fields]
concrete_initial_attrs = [
(k, v) for k, v in self._initial_attrs.items() if k in fields
]
current_attrs = [(k, self.__dict__[k]) for k, v in concrete_initial_attrs]
changed_attrs = (
set(current_attrs)
- set(concrete_initial_attrs)
# Never include primary key field - it might be set to None
# initially in _initial_attrs right after a call to create()
# even though self.pk is set.
- {(self._meta.pk.name, self.pk)}
)
auto_now_fields = [
f.name for f in self._meta.fields if getattr(f, 'auto_now', False)
]
kwargs['update_fields'] = [k for k, v in changed_attrs] + auto_now_fields
signal = kwargs.pop('_signal', True)
result = super().save(*args, **kwargs)
if signal and self.__class__ in _on_change_callbacks:
self._send_changes(self._initial_attrs.copy(), dict(self.__dict__))
# Reset initial_attr to be ready for the next save.
updated_fields = kwargs.get('update_fields')
self._reset_initial_attrs(
attrs={k: self.__dict__[k] for k in updated_fields}
if updated_fields
else None
)
return result
def update(self, **kwargs):
"""
Shortcut for doing an UPDATE on this object.
If _signal=False is in ``kwargs`` the post_save signal won't be sent.
"""
signal = kwargs.pop('_signal', True)
old_attr = dict(self.__dict__)
result = super().update(_signal=signal, **kwargs)
if signal and self.__class__ in _on_change_callbacks:
self._send_changes(old_attr, kwargs)
# Reset initial_attr to be ready for the next save. We only reset the
# fields we changed however, because the rest hasn't been saved yet.
# Otherwise doing obj.foo = 'bar' followed by obj.update(plop=42) and
# then obj.save() wouldn't save `foo`, because we'd reset the attrs
# used to compare in the .update() call.
self._reset_initial_attrs(attrs=kwargs)
return result
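# Illustrative sketch of the save() behaviour above, assuming a hypothetical
# model that uses OnChangeMixin together with ModelBase: only the attributes
# that actually changed (plus auto_now fields such as `modified`) are written.
#
#     obj = SomeModel.objects.get(pk=42)
#     obj.slug = 'new-slug'
#     obj.save()                              # UPDATE ... SET slug, modified
#     obj.save(_dynamic_update_fields=False)  # writes every concrete field
#     obj.save(_signal=False)                 # skips on_change() callbacks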
class SaveUpdateMixin:
def reload(self):
"""Reloads the instance from the database."""
from_db = self.__class__.get_unfiltered_manager().get(pk=self.pk)
for field in self.__class__._meta.fields:
try:
setattr(self, field.name, getattr(from_db, field.name))
except models.ObjectDoesNotExist:
                # reload() can be called before stale related fields have
                # been cleaned up on an object, for instance when we do
                # soft-deletion. Avoid failing because of that.
pass
return self
@classmethod
def get_unfiltered_manager(cls):
"""Return the unfiltered manager from the given class."""
return getattr(cls, 'unfiltered', cls.objects) # Fallback on objects.
def update(self, **kw):
"""
Shortcut for doing an UPDATE on this object.
If _signal=False is in ``kw`` the post_save signal won't be sent.
"""
signal = kw.pop('_signal', True)
cls = self.__class__
for k, v in kw.items():
setattr(self, k, v)
if signal:
# Detect any attribute changes during pre_save and add those to the
# update kwargs.
attrs = dict(self.__dict__)
models.signals.pre_save.send(sender=cls, instance=self)
for k, v in self.__dict__.items():
if attrs[k] != v:
kw[k] = v
setattr(self, k, v)
        # We want this to not fail mysteriously for filtered-out objects
        # (e.g. deleted or unlisted).
objects = cls.get_unfiltered_manager()
objects.filter(pk=self.pk).update(**kw)
if signal:
models.signals.post_save.send(sender=cls, instance=self, created=False)
def save(self, **kwargs):
# Unfortunately we have to save our translations before we call `save`
        # since Django verifies m2m relations with unsaved parent relations
# and throws an error.
# https://docs.djangoproject.com/en/1.9/topics/db/examples/one_to_one/
if hasattr(self._meta, 'translated_fields'):
save_translations(self)
return super().save(**kwargs)
class ModelBase(SaveUpdateMixin, models.Model):
"""
Base class for AMO models to abstract some common features.
* Adds automatic created and modified fields to the model.
* Fetches all translations in one subsequent query during initialization.
"""
created = models.DateTimeField(default=timezone.now, editable=False, blank=True)
modified = models.DateTimeField(auto_now=True)
objects = ManagerBase()
class Meta:
abstract = True
get_latest_by = 'created'
# This is important: Setting this to `objects` makes sure
# that Django is using the manager set as `objects` on this
        # instance rather than the `_default_manager` or even
# `_base_manager` that are by default configured by Django.
# That's the only way currently to reliably tell Django to resolve
# translation objects / call transformers.
# This also ensures we don't ignore soft-deleted items when traversing
# relations, if they are hidden by the objects manager, like we
# do with `addons.models:Addon`
base_manager_name = 'objects'
def get_absolute_url(self, *args, **kwargs):
relative_url = self.get_url_path(*args, **kwargs)
try:
func = resolve(relative_url).func
is_frontend = getattr(func, 'is_frontend_view', False)
except Resolver404:
is_frontend = False
site = settings.EXTERNAL_SITE_URL if is_frontend else settings.SITE_URL
return urljoin(site, relative_url)
def get_admin_url_path(self):
"""
Return the relative URL pointing to the instance admin change page.
"""
urlname = f'admin:{self._meta.app_label}_{self._meta.model_name}_change'
return reverse(urlname, args=(self.pk,))
def get_admin_absolute_url(self):
"""
Return the absolute URL pointing to the instance admin change page.
"""
return urljoin(settings.SITE_URL, self.get_admin_url_path())
def serializable_reference(self):
"""Return a tuple with app label, model name and pk to be used when we
need to pass a serializable reference to this instance without having
to serialize the whole object."""
return self._meta.app_label, self._meta.model_name, self.pk
def manual_order(qs, pks, pk_name='id'):
"""
Given a query set and a list of primary keys, return a set of objects from
the query set in that exact order.
"""
if not pks:
return qs.none()
return qs.filter(id__in=pks).extra(
select={'_manual': 'FIELD({}, {})'.format(pk_name, ','.join(map(str, pks)))},
order_by=['_manual'],
)
class SlugField(models.SlugField):
"""
Django 1.6's SlugField rejects non-ASCII slugs. This field just
keeps the old behaviour of not checking contents.
"""
default_validators = []
class FakeEmail(ModelBase):
message = models.TextField()
class Meta:
db_table = 'fake_email'
class BasePreview:
media_folder = 'previews'
def _image_url(self, folder, file_ext):
modified = int(time.mktime(self.modified.timetuple())) if self.modified else 0
url = '/'.join(
(
folder,
str(self.id // 1000),
f'{self.id}.{file_ext}?modified={modified}',
)
)
return f'{settings.MEDIA_URL}{self.media_folder}/{url}'
def _image_path(self, folder, file_ext):
url = os.path.join(
settings.MEDIA_ROOT,
self.media_folder,
folder,
str(self.id // 1000),
f'{self.id}.{file_ext}',
)
return url
@property
def thumbnail_url(self):
return self._image_url('thumbs', self.get_format('thumbnail'))
@property
def image_url(self):
return self._image_url('full', self.get_format('image'))
@property
def thumbnail_path(self):
return self._image_path('thumbs', self.get_format('thumbnail'))
@property
def image_path(self):
return self._image_path('full', self.get_format('image'))
@property
def original_path(self):
return self._image_path('original', self.get_format('original'))
@property
def thumbnail_dimensions(self):
return self.sizes.get('thumbnail', []) if self.sizes else []
@property
def image_dimensions(self):
return self.sizes.get('image', []) if self.sizes else []
def get_format(self, for_size):
return self.sizes.get(f'{for_size}_format', 'png')
@classmethod
def delete_preview_files(cls, sender, instance, **kw):
"""On delete of the Preview object from the database, unlink the image
and thumb on the file system"""
image_paths = [
instance.image_path,
instance.thumbnail_path,
instance.original_path,
]
for filename in image_paths:
try:
log.info(f'Removing filename: {filename} for preview: {instance.pk}')
storage.delete(filename)
except Exception as e:
log.error(f'Error deleting preview file ({filename}): {e}')
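# Sketch of the on-disk/URL layout BasePreview derives from the primary key
# (the concrete values below are illustrative): for a preview with id=12345
# using the default 'png' format,
#
#     image_path:     <MEDIA_ROOT>/previews/full/12/12345.png
#     thumbnail_path: <MEDIA_ROOT>/previews/thumbs/12/12345.png
#     image_url:      <MEDIA_URL>previews/full/12/12345.png?modified=<ts>
#
# where `12` is `id // 1000` and `<ts>` is a timestamp derived from
# `modified`, used for cache busting.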
class LongNameIndex(models.Index):
"""Django's Index, but with a longer allowed name since we don't care about
compatibility with Oracle."""
max_name_length = 64 # Django default is 30, but MySQL can go up to 64.
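# Illustrative usage sketch (model and field names hypothetical): index names
# longer than Django's 30-character default, but within MySQL's 64-character
# limit, become possible.
#
#     class Meta:
#         indexes = [
#             LongNameIndex(
#                 fields=('addon', 'user'),
#                 name='some_table_addon_id_user_id_descriptive_idx',
#             ),
#         ]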
class FilterableManyToManyDescriptor(ManyToManyDescriptor):
def __init__(self, *args, **kwargs):
self.q_filter = kwargs.pop('q_filter', None)
super().__init__(*args, **kwargs)
@classmethod
def _get_manager_with_default_filtering(cls, manager, q_filter):
"""This is wrapping the manager class so we can add an extra
filter to the queryset returned via get_queryset."""
class ManagerWithFiltering(manager):
def get_queryset(self):
# Check the queryset caching django uses during these lookups -
# we only want to add the q_filter the first time.
from_cache = self.prefetch_cache_name in getattr(
self.instance, '_prefetched_objects_cache', {}
)
qs = super().get_queryset()
if not from_cache and q_filter:
# Here is where we add the filter.
qs = qs.filter(q_filter)
return qs
return ManagerWithFiltering
@cached_property
def related_manager_cls(self):
cls = super().related_manager_cls
return self._get_manager_with_default_filtering(cls, self.q_filter)
class FilterableManyToManyField(models.fields.related.ManyToManyField):
"""This class builds on ManyToManyField to allow us to filter the relation
to a subset, similar to how we use the unfiltered manager to filter out
deleted instances of other foreign keys.
It takes an additional Q object arg (q_filter) which will be applied to the
queryset on *both* sides of the many-to-many relation. Because it's
applied to both sides the filter will typically be on the ManyToManyField
itself.
For example, class A and class B have a ManyToMany relation between them,
via class M (so M would have a foreign key to both A and B).
For an instance a of A, a.m would be:
`B.objects.filter(a__in=a.id, q_filter)`,
and for an instance b of B, b.m would be:
`A.objects.filter(b__in=b.id, q_filter)`.
If `q_filter` was `Q(m__deleted=False)` it would filter out all soft
deleted instances of M.
"""
def __init__(self, *args, **kwargs):
self.q_filter = kwargs.pop('q_filter', None)
super().__init__(*args, **kwargs)
def contribute_to_class(self, cls, name, **kwargs):
"""All we're doing here is overriding the `setattr` so it creates an
instance of FilterableManyToManyDescriptor rather than
ManyToManyDescriptor, and pass down the q_filter property."""
super().contribute_to_class(cls, name, **kwargs)
# Add the descriptor for the m2m relation.
setattr(
cls,
self.name,
FilterableManyToManyDescriptor(
self.remote_field, reverse=False, q_filter=self.q_filter
),
)
def contribute_to_related_class(self, cls, related):
"""All we're doing here is overriding the `setattr` so it creates an
instance of FilterableManyToManyDescriptor rather than
ManyToManyDescriptor, and pass down the q_filter property."""
super().contribute_to_related_class(cls, related)
if (
not self.remote_field.is_hidden()
and not related.related_model._meta.swapped
):
setattr(
cls,
related.get_accessor_name(),
FilterableManyToManyDescriptor(
self.remote_field, reverse=True, q_filter=self.q_filter
),
)
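# Illustrative usage sketch following the A/B/M naming from the docstring
# above (all names hypothetical): with a q_filter on the through model,
# traversal from either side skips soft-deleted M rows.
#
#     class A(ModelBase):
#         bs = FilterableManyToManyField(
#             'B', through='M', related_name='a_set',
#             q_filter=models.Q(m__deleted=False),
#         )
#
#     a.bs.all()      # B instances whose M row is not soft-deleted
#     b.a_set.all()   # the same filtering applied from the reverse side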
class GroupConcat(models.Aggregate):
function = 'GROUP_CONCAT'
allow_distinct = True
class Inet6Ntoa(Func):
function = 'INET6_NTOA'
output_field = CharField()
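# Illustrative usage sketch for the two MySQL helpers above (model and field
# names are hypothetical): GROUP_CONCAT collapses values per group and
# INET6_NTOA converts a packed binary IP column back to readable text.
#
#     SomeLogModel.objects.values('user_id').annotate(
#         ips=GroupConcat(Inet6Ntoa('ip_address_binary'), distinct=True),
#     )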