Source code for salesman.basket.models

from collections import OrderedDict
from decimal import Decimal
from typing import Optional, Tuple

from django.conf import settings
from django.contrib.contenttypes.fields import GenericForeignKey
from django.contrib.contenttypes.models import ContentType
from django.db import models
from django.http import HttpRequest
from django.utils.text import slugify
from django.utils.translation import gettext_lazy as _

from salesman.core.models import JSONField

BASKET_ID_SESSION_KEY = 'BASKET_ID'


[docs]class BasketManager(models.Manager):
[docs] def get_or_create_from_request(self, request: HttpRequest) -> Tuple['Basket', bool]: """ Get basket from request or create a new one. If user is logged in session basket gets merged into a user basket. Returns: tuple: (basket, created) """ if not hasattr(request, 'session'): request.session = {} try: session_basket_id = request.session[BASKET_ID_SESSION_KEY] session_basket = self.get(id=session_basket_id, owner=None) except (KeyError, Basket.DoesNotExist): session_basket = None if hasattr(request, 'user') and request.user.is_authenticated: try: basket, created = self.get_or_create(owner=request.user) except self.model.MultipleObjectsReturned: # User has multiple baskets, merge them. baskets = list(self.filter(owner=request.user)) basket, created = baskets[0], False for other in baskets[1:]: basket.merge(other) if session_basket: # Merge session basket into user basket. basket.merge(session_basket) if BASKET_ID_SESSION_KEY in request.session: # Delete session basket id from session so that it doesn't get # re-fetched while user is still logged in. del request.session[BASKET_ID_SESSION_KEY] else: basket, created = session_basket or self.create(), not session_basket request.session[BASKET_ID_SESSION_KEY] = basket.id return basket, created
[docs]class Basket(models.Model): owner = models.ForeignKey( settings.AUTH_USER_MODEL, on_delete=models.CASCADE, null=True, verbose_name=_("Owner"), ) extra = JSONField(_("Extra"), blank=True) date_created = models.DateTimeField(_("Date created"), auto_now_add=True) date_updated = models.DateTimeField(_("Date updated"), auto_now=True) objects = BasketManager() _cached_items = None class Meta: verbose_name = _("Basket") verbose_name_plural = _("Baskets") ordering = ['-date_created'] def __str__(self): return str(self.pk) if self.pk else "(unsaved)" def __iter__(self): for item in self.items.all(): yield item
[docs] def update(self, request: HttpRequest) -> None: """ Process basket with modifiers defined in ``SALESMAN_BASKET_MODIFIERS``. This method sets ``subtotal``, ``total`` and ``extra_rows`` attributes on the basket and updates the items. Should be called every time the basket item is added, removed or updated or basket extra is updated. Args: request (HttpRequest): Django request """ from .modifiers import basket_modifiers_pool items = self.get_items() self.extra_rows = OrderedDict() self.subtotal = 0 for item in items: item.update(request) self.subtotal += item.total self.total = self.subtotal for modifier in basket_modifiers_pool.get_modifiers(): modifier.process_basket(self, request) self._cached_items = items
[docs] def add( self, product: object, quantity: int = 1, ref: Optional[str] = None, extra: Optional[dict] = None, ) -> 'BasketItem': """ Add product to the basket. Returns: BasketItem: BasketItem instance """ if not ref: ref = BasketItem.get_product_ref(product) try: item = self.items.get(ref=ref) item.quantity += quantity item.extra = extra or item.extra item.save(update_fields=['quantity', 'extra', 'date_updated']) except BasketItem.DoesNotExist: item = BasketItem.objects.create( basket=self, product=product, quantity=quantity, ref=ref, extra=extra or {}, ) self._cached_items = None return item
[docs] def remove(self, ref: str) -> None: """ Remove item with given ``ref`` from the basket. Args: ref (str): Item ref to remove. """ try: self.items.get(ref=ref).delete() self._cached_items = None except BasketItem.DoesNotExist: pass
[docs] def clear(self) -> None: """ Clear all items from the basket. """ self.items.all().delete() self._cached_items = None
[docs] def merge(self, other: 'Basket') -> None: """ Merge other basket with this one, delete afterwards. Args: other (Basket): Basket which to merge """ for item in other: try: existing = self.items.get(ref=item.ref) existing.quantity += item.quantity existing.save(update_fields=['quantity']) except item.DoesNotExist: item.basket = self item.save(update_fields=['basket']) other.delete() self._cached_items = None
[docs] def get_items(self) -> list: """ Returns items from cache or stores new ones. """ if self._cached_items is None: self._cached_items = list(self.items.all()) return self._cached_items
@property def count(self) -> int: """ Returns basket item count. """ if self._cached_items is not None: return len(self._cached_items) return self.items.count() @property def quantity(self) -> int: """ Returns the total quantity of all items in a basket. """ if self._cached_items is not None: return sum([item.quantity for item in self._cached_items]) aggr = self.items.aggregate(quantity=models.Sum('quantity')) return aggr['quantity'] or 0
[docs]class BasketItem(models.Model): basket = models.ForeignKey( Basket, on_delete=models.CASCADE, related_name='items', verbose_name=_("Basket"), ) # Reference to this basket item, used to determine item duplicates. ref = models.SlugField(_("Reference"), max_length=128) # Generic relation to product. product_content_type = models.ForeignKey(ContentType, on_delete=models.CASCADE) product_id = models.PositiveIntegerField(_("Product id")) product = GenericForeignKey('product_content_type', 'product_id') quantity = models.PositiveIntegerField(_("Quantity"), default=1) extra = JSONField(_("Extra"), blank=True) date_created = models.DateTimeField(_("Date created"), auto_now_add=True) date_updated = models.DateTimeField(_("Date updated"), auto_now=True) class Meta: verbose_name = _("Item") verbose_name_plural = _("Items") unique_together = ('basket', 'ref') ordering = ['date_created'] def __str__(self): return f'{self.quantity}x {self.product}' def save(self, *args, **kwargs): # Set default ref. if not self.ref: self.ref = self.get_product_ref(self.product) super().save(*args, **kwargs)
[docs] def update(self, request: HttpRequest) -> None: """ Process items with modifiers defined in ``SALESMAN_BASKET_MODIFIERS``. This method sets ``unit_price``, ``subtotal``, ``total`` and ``extra_rows`` attributes on the item. Should be called every time the basket item is added, removed or updated. Args: request (HttpRequest): Django request """ from .modifiers import basket_modifiers_pool self.extra_rows = OrderedDict() self.unit_price = Decimal(self.product.get_price(request)) self.subtotal = self.unit_price * self.quantity self.total = self.subtotal for modifier in basket_modifiers_pool.get_modifiers(): modifier.process_item(self, request)
[docs] @classmethod def get_product_ref(cls, product: object) -> str: """ Returns default item ``ref`` for given product. Args: product (object): Product instance Returns: str: Item ref """ return slugify(f'{product._meta.label}-{product.id}')