Source code for pytoolbox.django.models.mixins

"""
Mix-ins for building your own models.

Recommended sub-classing order:

- BetterUniquenessErrorsMixin
- AutoForceInsertMixin
- CallFieldsPreSaveMixin
- AutoUpdateFieldsMixin
- AlwaysUpdateFieldsMixin
- AutoRemovePKFromUpdateFieldsMixin
- ValidateOnSaveMixin (will defeat the detection method of AutoUpdateFieldsMixin if put before it)
- UpdatePreconditionsMixin
- StateTransitionPreconditionMixin
- StateTransitionEventsMixin

Order for these does not matter:

- PublicMetaMixin
- RelatedModelMixin
- ReloadMixin
- SaveInstanceFilesMixin
"""

import collections, itertools, re

from django.core.exceptions import ValidationError
from django.db import DatabaseError
from django.db.models.fields.files import FileField
from django.db.utils import IntegrityError
from django.utils.functional import cached_property

from pytoolbox.django import signals
from pytoolbox.django.core import exceptions
from pytoolbox import itertools as py_itertools, module  # pylint:disable=reimported
from . import utils

# Hand the current module namespace to ``module.All``; the resulting object is used on the last
# line of this module to compute ``__all__``.
# NOTE(review): presumably this snapshots the names that exist before the mix-ins are defined so
# that only the names declared afterwards are exported - confirm against ``pytoolbox.module.All``.
_all = module.All(globals())


class AlwaysUpdateFieldsMixin(object):
    """
    Make sure the fields named in ``self.always_update_fields`` are part of every partial update
    issued by ``self.save()``.

    Callers can then write ``self.save(update_fields=...)`` without repeating those fields.
    The attribute ``always_update_fields`` is not defined by this mix-in, the model has to
    provide it.
    """

    def save(self, *args, **kwargs):
        """
        Merge ``self.always_update_fields`` into `update_fields`, then call the next ``save()``.

        The keyword arguments are only altered when `force_update` is truthy or when a non-empty
        `update_fields` is given; in that case `update_fields` is replaced by a new set.
        """
        requested_fields = kwargs.get('update_fields')
        if kwargs.get('force_update') or requested_fields:
            kwargs['update_fields'] = (
                set(requested_fields or []) | set(self.always_update_fields)
            )
        super().save(*args, **kwargs)
class AutoForceInsertMixin(object):
    """
    Choose a value for `force_insert` when the caller of ``self.save()`` did not.
    """

    def save(self, *args, **kwargs):
        """
        Call the next ``save()`` with `force_insert` set to ``self._state.adding`` if the argument
        is missing or ``None``; an explicit value is passed through unchanged.
        """
        requested = kwargs.get('force_insert')
        kwargs['force_insert'] = self._state.adding if requested is None else requested
        super().save(*args, **kwargs)
class AutoRemovePKFromUpdateFieldsMixin(object):
    """
    If the primary key is set but unchanged, then remove the primary key from the list of fields
    to update.

    This fixes an issue when saving, ``ValueError: The following fields do not exist in this model
    or are m2m fields: id.``. This check is probably implemented by Django developers to protect
    from unintentional data overwrite.

    If the primary key is set to a new value, then the intention of the developer is probably to
    duplicate the model by saving it with a new primary key. So the mix-in lets Django save with
    its own default options for save.
    """

    def __init__(self, *args, **kwargs):
        """Initialize the instance then remember its primary key as ``self.previous_pk``."""
        super().__init__(*args, **kwargs)
        self.previous_pk = self.pk

    def save(self, **kwargs):
        """
        Adjust `update_fields` when it contains the primary key attribute, call the next
        ``save()``, and finally refresh ``self.previous_pk``.

        * Primary key unchanged: `update_fields` becomes a set without the primary key attribute.
        * Primary key changed: `force_insert`, `force_update` and `update_fields` are dropped.
        """
        requested_fields = kwargs.get('update_fields')
        if requested_fields:
            pk_attname = self._meta.pk.attname
            if pk_attname in requested_fields:
                if self.pk == self.previous_pk:
                    kwargs['update_fields'] = set(requested_fields) - {pk_attname}
                else:
                    # This is probably the model duplication pattern
                    for option in ('force_insert', 'force_update', 'update_fields'):
                        kwargs.pop(option, None)
        super().save(**kwargs)
        self.previous_pk = self.pk
class AutoUpdateFieldsMixin(object):
    """
    Keep track of what fields were set in order to make UPDATE queries lighter.

    This mix-in comes with the following features:

    * Foreign keys and the mutable types are correctly handled.
    * Models with a primary key preset to a value before being saved in database are correctly
      handled.
    * You can specify the value for `force_update` if it is `None` with `default_force_update`.

    However this low-memory footprint mix-in also comes with some limitations, it does not:

    * Store old fields values - you cannot know if the fields are really modified or not.
    * Watch for background modifications of the mutable fields - it can drive you crazy,
      sometimes.
    * Detect fields updated by the field's pre_save - put CallFieldsPreSaveMixin before this
      mix-in.
    * Filter the primary key from the list of fields to update - put
      AutoRemovePKFromUpdateFieldsMixin after this mix-in.
    """

    default_force_update = False

    def __init__(self, *args, **kwargs):
        """Initialize the instance, then start tracking assignments to its fields."""
        super().__init__(*args, **kwargs)
        self._setted_fields = set()
        self._fields_names = frozenset(field.attname for field in self._meta.fields)

    def __setattr__(self, name, value):
        """Record `name` when it is a field attribute and the instance is already stored."""
        # ``_fields_names`` is missing until __init__ is over: nothing is tracked before that
        tracked_names = getattr(self, '_fields_names', ())
        if name in tracked_names and not self._state.adding:
            self._setted_fields.add(name)
        return super().__setattr__(name, value)

    def save(self, *args, **kwargs):
        """
        Call the next ``save()`` and forget the recorded fields.

        For an instance that is not being added, and unless `force_insert` is truthy, the missing
        (or ``None``) arguments are filled in: `force_update` with ``self.default_force_update``
        and `update_fields` with a copy of the recorded fields.
        """
        is_update = not (self._state.adding or kwargs.get('force_insert'))
        if is_update and kwargs.get('force_update') is None:
            kwargs['force_update'] = self.default_force_update
        if is_update and kwargs.get('update_fields') is None:
            kwargs['update_fields'] = set(self._setted_fields)
        super().save(*args, **kwargs)
        self._setted_fields = set()
class BetterUniquenessErrorsMixin(object):
    """
    Hide some fields from the unique-together errors. Convert uniqueness integrity errors to
    validation errors.

    **Use cases**

    Case: Your model has some non editable fields that are included in a uniqueness constraint.
    Issue: The uniqueness errors shown in your forms include the name of the hidden fields.
    Solution: Exclude the name of the hidden fields from the error messages.
    Implementation: Set `unique_together_hide_fields` to the name of those hidden fields.

    Case: Sometimes the form submit may raise an integrity error (concurrency, [yes] crazy
    unexpected usage of Django).
    Issue: Those integrity errors are unfortunately returned as Internal Server Error (HTTP 500).
    Solution: Convert the uniqueness integrity errors to validation errors and return an awesome
    form with errors.
    Implementation: Set `unique_from_integrity_error` to True. And subclass
    :class:`pytoolbox.django.views.mixins.ValidationErrorsMixin` in your edit views.
    """

    unique_from_integrity_error = True
    unique_together_hide_fields = ()

    def save(self, *args, **kwargs):
        """
        Call the next ``save()`` and convert a "duplicate key" integrity error to a validation
        error when the duplicated columns match one of ``self._meta.unique_together``.

        :raises ValidationError: keyed by the field when a single visible field remains, else the
            error returned by ``self.unique_error_message()``.
        :raises IntegrityError: re-raised as is when the conversion is disabled or when the
            message does not describe a known unique-together constraint.
        """
        try:
            super().save(*args, **kwargs)
        except IntegrityError as ex:
            if self.unique_from_integrity_error:
                # Use str(ex) instead of ex.args[0]: the first argument is not guaranteed to be
                # the message (some database drivers put a numeric error code there)
                match = re.search(r'duplicate key[^\)]+\((?P<fields>[^\)]+)\)', str(ex))
                if match:
                    # Map the column of a foreign key to its field by dropping the trailing _id
                    # only, "_id" in the middle of a name is part of the name
                    fields = {
                        re.sub(r'_id$', '', column.strip())
                        for column in match.group('fields').split(',')
                    }
                    if fields in (set(u) for u in self._meta.unique_together):
                        fields = sorted(fields - set(self.unique_together_hide_fields))
                        if fields:
                            error = self.unique_error_message(self.__class__, fields)
                            if len(fields) == 1:
                                raise ValidationError({fields[0]: error}) from ex
                            raise error from ex
                        return self._handle_hidden_duplicate_key_error(ex)
            raise

    def _handle_hidden_duplicate_key_error(self, ex):
        """
        Called by ``save()`` when every field of the violated constraint is hidden.

        The default implementation re-raises `ex`, override it to do something smarter.
        """
        raise ex

    def _perform_unique_checks(self, unique_checks):
        """
        Return the errors of the next ``_perform_unique_checks()`` with the hidden fields removed
        from the errors related to multiple fields.

        Such an error is rebuilt with the visible fields only (and moved under the name of the
        field if only one remains), or dropped if all of its fields are hidden. The other errors
        are kept untouched.
        """
        errors_by_field = super()._perform_unique_checks(unique_checks)
        hidden_fields = set(self.unique_together_hide_fields)
        if not hidden_fields:
            return errors_by_field
        filtered_errors_by_field = collections.defaultdict(list)
        for field, errors in errors_by_field.items():
            for error in errors:
                # An error built without params cannot be a multiple fields uniqueness error
                unique_check = (error.params or {}).get('unique_check', ())
                # only process the uniqueness errors related to multiple fields
                if len(unique_check) > 1:
                    fields = [f for f in unique_check if f not in hidden_fields]
                    if fields:
                        error = self.unique_error_message(self.__class__, fields)
                        filtered_errors_by_field[
                            fields[0] if len(fields) == 1 else field
                        ].append(error)
                else:
                    filtered_errors_by_field[field].append(error)
        return filtered_errors_by_field
class CallFieldsPreSaveMixin(object):
    """
    Make sure the ``pre_save`` method of the fields is called by ``self.save()``.

    For more information see: https://code.djangoproject.com/ticket/25363
    """

    def save(self, *args, **kwargs):
        """
        Call ``pre_save(self, self._state.adding)`` on every local concrete field that is not the
        primary key, then call the next ``save()``.
        """
        for field in self._meta.local_concrete_fields:
            if field.primary_key:
                continue
            field.pre_save(self, self._state.adding)
        super().save(*args, **kwargs)
class PublicMetaMixin(object):
    """
    Expose `_meta` under a name without leading underscore, `meta`, so that it can be reached
    from the templates.
    """

    @classmethod
    def meta(cls):
        """Return the `_meta` attribute of the class."""
        options = cls._meta
        return options
[docs]class RelatedModelMixin(object):
class ReloadMixin(object):
    """
    Add a method to fetch the instance again through the default manager of its model.
    """

    def reload(self):
        """
        Return the result of getting the object with the same primary key from the default
        manager of the model; the attributes of `self` are not assigned by this method.
        """
        manager = self._meta.model._default_manager
        return manager.get(pk=self.pk)
class SaveInstanceFilesMixin(object):
    """
    Override ``save()`` with a method that saves the instance first and then the instance's file
    fields. This ensures that the upload_path method will get a valid instance id / primary key.
    """

    def save(self, *args, **kwargs):
        """
        Save the instance, in two steps when it has no primary key yet.

        Without primary key, the value of every file field is put aside and replaced by ``None``,
        the instance is saved a first time, the values are restored and the instance is saved
        again with `force_insert` set to ``False``.
        """
        if self.pk is None:
            file_field_names = [
                field.name for field in self._meta.fields if isinstance(field, FileField)
            ]
            stashed_values = {}
            for name in file_field_names:
                stashed_values[name] = getattr(self, name)
                setattr(self, name, None)
            super().save(*args, **kwargs)
            for name, value in stashed_values.items():
                setattr(self, name, value)
            # Do not force because we already saved the instance
            kwargs['force_insert'] = False
        super().save(*args, **kwargs)
class UpdatePreconditionsMixin(object):
    """
    Let ``self.save()`` accept the keyword arguments `pre_excludes` and `pre_filters`, two
    dictionaries of lookups applied to the queryset used to update the instance.

    An error of class ``self.precondition_error_class`` is raised when the update affected no row
    because of those preconditions.
    """

    precondition_error_class = exceptions.DatabaseUpdatePreconditionsError

    def apply_preconditions(self, base_qs, using, pk_val, values, update_fields, force_update):
        """
        Consume the preconditions stored on the instance (if any) to narrow `base_qs`.

        Return the arguments in the same order, with `base_qs` possibly replaced.
        """
        if not hasattr(self, '_preconditions'):
            return base_qs, using, pk_val, values, update_fields, force_update
        excludes, filters = self._preconditions
        # The preconditions are valid for a single update
        del self._preconditions
        narrowed_qs = base_qs
        if excludes:
            narrowed_qs = narrowed_qs.exclude(**excludes)
        if filters:
            narrowed_qs = narrowed_qs.filter(**filters)
        return narrowed_qs, using, pk_val, values, update_fields, force_update

    def pop_preconditions(self, *args, **kwargs):
        """
        Move `pre_excludes` and `pre_filters` from the keyword arguments to
        ``self._preconditions``.

        Return a tuple with the arguments, the remaining keyword arguments and a flag telling if
        at least one of the preconditions is not empty.
        """
        excludes = kwargs.pop('pre_excludes', {})
        filters = kwargs.pop('pre_filters', {})
        self._preconditions = (excludes, filters)
        return args, kwargs, any(self._preconditions)

    def save(self, *args, **kwargs):
        """
        Extract the preconditions then call the next ``save()``.

        A database error with a message containing "did not affect" is replaced by an error of
        class ``self.precondition_error_class`` if preconditions were given.
        """
        args, kwargs, has_preconditions = self.pop_preconditions(*args, **kwargs)
        try:
            super().save(*args, **kwargs)
        except DatabaseError as ex:
            if not (has_preconditions and 'did not affect' in str(ex)):
                raise
            raise self.precondition_error_class()

    def _do_update(self, base_qs, using, pk_val, values, update_fields, force_update):
        """
        Run the next ``_do_update()`` with the queryset narrowed by the preconditions.

        Raise an error of class ``self.precondition_error_class`` if nothing was updated while
        the preconditions were applied and the row still exists in `base_qs`.
        """
        # FIXME _do_update is called once for each model in the inheritance hierarchy: Handle this!
        update_args = self.apply_preconditions(
            base_qs, using, pk_val, values, update_fields, force_update)
        updated = super()._do_update(*update_args)
        if updated:
            return updated
        narrowed_qs = update_args[0]
        if narrowed_qs != base_qs and base_qs.filter(pk=pk_val).exists():
            raise self.precondition_error_class()
        return updated
class StateTransitionEventsMixin(object):
    """
    Send the signal `post_state_transition` after each ``self.save()`` that saves the state.

    The value of ``self.state`` at the time of the previous notification (or of the
    initialization) is available as ``self.previous_state``.
    """

    def __init__(self, *args, **kwargs):
        """Initialize the instance then remember its state as ``self.previous_state``."""
        super().__init__(*args, **kwargs)
        self.previous_state = self.state

    def on_post_state_transition(self, args, kwargs):
        """
        Send the signal `post_state_transition` with the instance, its previous state and the
        arguments given to ``self.save()``.
        """
        signals.post_state_transition.send(
            instance=self, previous_state=self.previous_state, args=args, kwargs=kwargs)

    def save(self, *args, **kwargs):
        """
        Call the next ``save()`` then, if the state was part of the saved fields, call
        ``self.on_post_state_transition()`` and update ``self.previous_state``.
        """
        super().save(*args, **kwargs)
        # update_fields is either missing or None when all the fields are saved; testing for
        # None explicitly avoids a TypeError on save(update_fields=None)
        update_fields = kwargs.get('update_fields')
        if update_fields is None or 'state' in update_fields:
            self.on_post_state_transition(args, kwargs)
            self.previous_state = self.state
class StateTransitionPreconditionMixin(UpdatePreconditionsMixin):
    """
    Add a precondition on the state to the updates that save the state, and some helpers to
    check the state of the instance.

    The transitions are read from ``self.states``, which is not defined by this mix-in.
    """

    check_state = True
    invalid_state_error_class = exceptions.InvalidStateError
    transition_not_allowed_error_class = exceptions.TransitionNotAllowedError

    def can_transit_to(self, state, fail=False, noop_skip=False):
        """
        Helper that return the following:

        * True if transition to `state` is allowed.
        * False if `fail` is set to False or `noop_skip` is set to True and state is unchanged.
        * Else raise a transition not allowed error.
        """
        if state in self.states.TRANSITIONS[self.state]:
            return True
        if not fail or (noop_skip and self.state == state):
            return False
        raise self.transition_not_allowed_error_class(instance=self, state=state)

    def check_state_in(self, states, fail=False):
        """
        Helper that return the following:

        * True if the state of the instance is one of `states`.
        * False if `fail` is set to False.
        * Else raise an invalid state error.
        """
        states = sorted(py_itertools.chain(states))
        if self.state in states:
            return True
        if not fail:
            return False
        raise self.invalid_state_error_class(instance=self, states=states)

    def pop_preconditions(self, *args, **kwargs):
        """
        Add state precondition if state will be saved and state is not enforced by preconditions.

        The keyword argument `check_state` (default: ``self.check_state``) is always removed from
        the keyword arguments, a falsy value disables the state precondition.
        """
        args, kwargs, _ = super().pop_preconditions(*args, **kwargs)
        check_state = kwargs.pop('check_state', self.check_state)
        # update_fields is either missing or None when all the fields are saved; testing for
        # None explicitly avoids a TypeError on save(update_fields=None)
        update_fields = kwargs.get('update_fields')
        if check_state and (update_fields is None or 'state' in update_fields):
            pre_excludes, pre_filters = self._preconditions
            if not any(f.startswith('state') for f in itertools.chain(pre_excludes, pre_filters)):
                # NOTE(review): `valid` presumably tells if `states` are the states allowed to
                # transit to the current one (filter) or the forbidden ones (exclude) - confirm
                # against the implementation of get_transit_from
                states, valid = self.states.get_transit_from(self.state, auto_inverse=True)
                assert states, (states, valid)
                key, values = (
                    ('state__in', states) if len(states) > 1 else ('state', next(iter(states)))
                )
                # The dictionaries are updated in place, self._preconditions reflects the change
                (pre_filters if valid else pre_excludes)[key] = values
        return args, kwargs, any(self._preconditions)
class ValidateOnSaveMixin(object):
    """
    Validate the instance with ``self.full_clean()`` every time it is saved.

    * `validate_on_save` is the default for the keyword argument `validate` of ``self.save()``.
    * `validate_on_save_kwargs` holds the keyword arguments given to ``self.full_clean()``.
    """

    validate_on_save = True
    validate_on_save_kwargs = {}

    def save(self, *args, **kwargs):
        """
        Remove `validate` from the keyword arguments, call ``self.full_clean()`` if it is truthy
        and finally call the next ``save()``.
        """
        must_validate = kwargs.pop('validate', self.validate_on_save)
        if must_validate:
            clean_options = self.validate_on_save_kwargs
            self.full_clean(**clean_options)
        super().save(*args, **kwargs)
class FasterValidateOnSaveMixin(ValidateOnSaveMixin):
    """
    Do not validate uniqueness nor relation fields on save to prevent excessive SELECT queries.
    """

    @cached_property
    def validate_on_save_kwargs(self):
        """
        Return the keyword arguments for ``self.full_clean()``: exclude the concrete fields that
        are relations and disable the validation of the uniqueness.

        The value is computed once per instance.
        """
        # Field.rel was renamed to Field.remote_field in Django 1.9 and removed in Django 2.0,
        # look for both to keep working whatever the version of Django
        return {
            'exclude': [
                field.name for field in self._meta.concrete_fields
                if getattr(field, 'remote_field', None) or getattr(field, 'rel', None)
            ],
            'validate_unique': False
        }
# Compute the public names of this module from ``_all``, the object created from globals() near
# the top of the module.
# NOTE(review): presumably ``diff`` returns the names added to globals() since then (i.e. the
# mix-ins defined above) - confirm against ``pytoolbox.module.All.diff``.
__all__ = _all.diff(globals())