import datetime
from django.contrib.auth.models import User
from django.db import models
from django.db.models.signals import post_delete, post_save, pre_save
from django.dispatch import receiver
from django.urls import reverse
from django.utils import timezone
from django.utils.translation import gettext_lazy as _
from oioioi.base.fields import EnumField, EnumRegistry
from oioioi.contests.date_registration import date_registry
from oioioi.contests.models import Contest
@date_registry.register('lock_date', name_generator=(lambda obj: _("Lock the forum")))
@date_registry.register(
'unlock_date', name_generator=(lambda obj: _("Unlock the forum"))
)
[docs]class Forum(models.Model):
"""Forum is connected with contest"""
[docs] contest = models.OneToOneField(Contest, on_delete=models.CASCADE)
[docs] visible = models.BooleanField(
default=True, verbose_name=_("forum is visible after lock")
)
[docs] lock_date = models.DateTimeField(
blank=True, null=True, verbose_name=_("autolock date")
)
[docs] unlock_date = models.DateTimeField(
blank=True, null=True, verbose_name=_("autounlock date")
)
[docs] def __str__(self):
return u'%(name)s' % {u'name': self.contest.name}
[docs] def is_autolocked(self, now=None):
"""Returns true if forum is locked"""
if not now:
now = timezone.now()
return bool(self.lock_date and now >= self.lock_date)
[docs] def is_autounlocked(self, now=None):
"""Returns true if forum was unlocked"""
if not now:
now = timezone.now()
return bool(self.unlock_date and now >= self.unlock_date)
[docs] def is_locked(self, now=None):
"""Returns true if forum is locked and not unlocked"""
return bool(self.is_autolocked(now) and not self.is_autounlocked(now))
[docs]class Category(models.Model):
"""Category model """
[docs] forum = models.ForeignKey(Forum, verbose_name=_("forum"), on_delete=models.CASCADE)
[docs] name = models.CharField(max_length=255, verbose_name=_("category"))
[docs] order = models.IntegerField(verbose_name=_("order"))
[docs] reactions_enabled = models.BooleanField(
default=False, verbose_name=_("reactions enabled")
)
[docs] def __str__(self):
return u"%s" % self.name
[docs] def count_threads(self):
return self.thread_set.count()
[docs] count_threads.short_description = _("Threads count")
[docs] def count_posts(self):
ret = 0
for t in self.thread_set.all():
ret += t.count_posts()
return ret
count_posts.short_description = _("Posts count")
[docs] def count_reported(self):
ret = 0
for t in self.thread_set.all():
ret += t.count_reported()
return ret
count_reported.short_description = _("Reported posts count")
[docs] def get_admin_url(self):
return reverse('oioioiadmin:forum_category_change', args=(self.id,))
[docs] def save(self, **kwargs):
if self.pk is None:
forum_categories = Category.objects.filter(forum__pk=self.forum_id)
if forum_categories.exists():
self.order = (
forum_categories.aggregate(models.Max("order"))["order__max"] + 1
)
else:
self.order = 0
super(Category, self).save(**kwargs)
[docs]class Thread(models.Model):
"""Thread model - topic in a category"""
[docs] category = models.ForeignKey(
Category, verbose_name=_("category"), on_delete=models.CASCADE
)
[docs] name = models.CharField(max_length=255, verbose_name=_("thread"))
[docs] last_post = models.ForeignKey(
'Post',
null=True,
on_delete=models.SET_NULL,
verbose_name=_("last post"),
related_name='last_post_of',
)
[docs] def __str__(self):
return u'%(name)s' % {u'name': self.name}
[docs] def count_posts(self):
return self.post_set.count()
[docs] count_posts.short_description = _("Posts count")
[docs] def count_reported(self):
# Although it may be done by:
# self.post_set.filter(reported=true).count()
# such solution produces O(|threads|) queries on a forum/category view.
# Moreover, it's not possible to prefetch them (like in count_posts):
# http://stackoverflow.com/a/12974801/2874777
return len([p for p in self.post_set.all() if p.reported])
count_reported.short_description = _("Reported posts count")
[docs] def get_admin_url(self):
return reverse('oioioiadmin:forum_thread_change', args=(self.id,))
[docs]class Post(models.Model):
"""Post - the basic part of the forum """
[docs] thread = models.ForeignKey(
Thread, verbose_name=_("thread"), on_delete=models.CASCADE
)
[docs] content = models.TextField(verbose_name=_("post"))
[docs] add_date = models.DateTimeField(
verbose_name=_("add date"), default=timezone.now, blank=True
)
[docs] last_edit_date = models.DateTimeField(
verbose_name=_("last edit"), blank=True, null=True
)
[docs] author = models.ForeignKey(User, verbose_name=_("author"), on_delete=models.CASCADE)
[docs] reported = models.BooleanField(verbose_name=_("reported"), default=False)
[docs] report_reason = models.TextField(
verbose_name=_("report_reason"), default="", blank=True
)
[docs] approved = models.BooleanField(verbose_name=_("approved"), default=False)
[docs] hidden = models.BooleanField(verbose_name=_("hidden"), default=False)
[docs] reported_by = models.ForeignKey(
User,
null=True,
related_name='%(class)s_user_reported',
on_delete=models.SET_NULL,
)
[docs] class PostsWithReactionsSummaryManager(models.Manager):
[docs] def get_queryset(self):
qs = super(Post.PostsWithReactionsSummaryManager, self).get_queryset()
for field_name, rtype in [
('upvotes_count', 'UPVOTE'),
('downvotes_count', 'DOWNVOTE'),
]:
# In Django >=2.0 it can can be simplified with Count(filter=Q(...))
reaction_count_agg = {
field_name: models.Sum(
models.Case(
models.When(reactions__type_of_reaction=rtype, then=1),
default=0,
output_field=models.IntegerField(),
)
)
}
qs = qs.annotate(**reaction_count_agg)
return qs
[docs] objects = PostsWithReactionsSummaryManager()
@property
[docs] def edited(self):
return bool(self.last_edit_date)
[docs] class Meta(object):
[docs] index_together = (('thread', 'add_date'),)
[docs] ordering = ('add_date',)
[docs] verbose_name = _("post")
[docs] verbose_name_plural = _("posts")
[docs] def __str__(self):
return u'%(content)s in %(thread)s' % {
u'content': self.content,
u'thread': self.thread,
}
[docs] def get_admin_url(self):
return reverse('oioioiadmin:forum_post_change', args=(self.id,))
[docs] def get_in_thread_url(self):
thread = self.thread
thread_url = reverse(
'forum_thread',
kwargs={
'contest_id': thread.category.forum.contest_id,
'category_id': thread.category_id,
'thread_id': thread.id,
},
)
post_url = '%s#forum-post-%d' % (thread_url, self.id)
return post_url
[docs] def can_be_removed(self):
return bool((timezone.now() - self.add_date) < datetime.timedelta(minutes=15))
[docs] def is_author_banned(self):
return Ban.is_banned(self.thread.category.forum, self.author)
[docs] def is_reporter_banned(self):
if not self.reported:
return False
return Ban.is_banned(self.thread.category.forum, self.reported_by)
[docs]post_reaction_types = EnumRegistry(
entries=[
('UPVOTE', _("Upvote")),
('DOWNVOTE', _("Downvote")),
]
)
[docs]class PostReaction(models.Model):
"""PostReaction - represents a reaction to a post on the forum."""
[docs] post = models.ForeignKey(
Post,
verbose_name=_("post"),
related_name='reactions',
on_delete=models.CASCADE,
)
[docs] author = models.ForeignKey(User, on_delete=models.CASCADE)
[docs] type_of_reaction = EnumField(post_reaction_types)
[docs]class Ban(models.Model):
"""Ban model - represents a ban on a forum. Banned person should not be
allowed any 'write' interaction with forum. This includes reporting
posts."""
[docs] user = models.ForeignKey(User, verbose_name=_("user"), on_delete=models.CASCADE)
[docs] forum = models.ForeignKey(Forum, verbose_name=_("forum"), on_delete=models.CASCADE)
[docs] admin = models.ForeignKey(
User,
verbose_name=_("admin who banned"),
on_delete=models.SET_NULL,
null=True,
blank=True,
related_name='created_forum_ban_set',
)
[docs] created_at = models.DateTimeField(
auto_now_add=True, editable=False, verbose_name=_("banned at")
)
[docs] reason = models.TextField(verbose_name=_("reason"))
@staticmethod
[docs] def is_banned(forum, user):
if user.is_anonymous:
return False
return Ban.objects.filter(forum=forum, user=user).exists()
[docs] def __str__(self):
return str(self.user)
@receiver(post_save, sender=Post)
[docs]def _set_as_new_last_post(sender, instance, created, **kwargs):
if created:
thread = instance.thread
thread.last_post = instance
thread.save()
@receiver(post_delete, sender=Post)
[docs]def _update_last_post(sender, instance, **kwargs):
try:
thread = instance.thread
except Thread.DoesNotExist:
# This may happen during cascade model deleting
return
try:
thread.last_post = thread.post_set.latest('id')
except Post.DoesNotExist:
thread.last_post = None
thread.save()
@receiver(pre_save, sender=Post)
[docs]def _remove_reports_if_approved(sender, instance, **kwargs):
if instance.approved:
instance.reported = False
instance.reported_by = None