diff --git a/backend/dvadmin/utils/models.py b/backend/dvadmin/utils/models.py index 1283351c61e8a18131f6de61e565098f2e6e21dc..768f3d6a44b7dfce6c48d4457acfe26936ee9490 100644 --- a/backend/dvadmin/utils/models.py +++ b/backend/dvadmin/utils/models.py @@ -8,121 +8,259 @@ """ from datetime import datetime from importlib import import_module +import uuid from application import settings from django.apps import apps from django.conf import settings from django.db import models +from django.db import transaction +from django.db.models import functions +from django.utils import timezone from rest_framework.request import Request table_prefix = settings.TABLE_PREFIX # 数据库表名前缀 class SoftDeleteQuerySet(models.QuerySet): - pass + """ + 软删除QuerySet + """ + def delete(self): + """批量软删除""" + kwargs = { + field.name: functions.Concat( + functions.Coalesce(models.F(field.name), models.Value("")), + models.Value(f"__deleted_{uuid.uuid4().hex[:8]}"), + ) + for field in self.model._meta.fields + if field.unique and not field.primary_key and not field.is_relation + } + kwargs["is_deleted"] = True + kwargs["deleted_at"] = timezone.now() + return self.update(**kwargs) + + def hard_delete(self): + """批量物理删除""" + return super().delete() + + def restore(self): + """批量恢复""" + kwargs = { + field.name: functions.Substr( + models.F(field.name), + 1, + functions.Length(models.F(field.name)) - 18, + output_field=field.__class__(), + ) + for field in self.model._meta.fields + if field.unique and not field.primary_key and not field.is_relation + } + kwargs["is_deleted"] = False + kwargs["deleted_at"] = None + return self.update(**kwargs) -class SoftDeleteManager(models.Manager): - """支持软删除""" + def alive(self): + """只返回未删除的对象""" + return self.filter(is_deleted=False) - def __init__(self, *args, **kwargs): - self.__add_is_del_filter = False - super(SoftDeleteManager, self).__init__(*args, **kwargs) + def dead(self): + """只返回已删除的对象""" + return self.filter(is_deleted=True) - def filter(self, *args, **kwargs): - # 考虑是否主动传入is_deleted - if not kwargs.get('is_deleted') is None: - self.__add_is_del_filter = True - return super(SoftDeleteManager, self).filter(*args, **kwargs) + def all_objects(self): + """返回所有对象(包括已删除)""" + return self + + +class SoftDeleteManager(models.Manager): + """ + 软删除管理器 + """ def get_queryset(self): - if self.__add_is_del_filter: - return SoftDeleteQuerySet(self.model, using=self._db).exclude(is_deleted=False) - return SoftDeleteQuerySet(self.model).exclude(is_deleted=True) + """默认只返回未删除对象""" + return SoftDeleteQuerySet(self.model).filter(is_deleted=False) + + def all_objects(self): + """返回所有对象(包含已删除)""" + return SoftDeleteQuerySet(self.model) + + def deleted(self): + """只返回已删除对象""" + return self.all_objects().filter(is_deleted=True) def get_by_natural_key(self, name): - return SoftDeleteQuerySet(self.model).get(username=name) + """按自然键获取对象(只获取未删除的)""" + return self.get(username=name) class SoftDeleteModel(models.Model): - """ - 软删除模型 - 一旦继承,就将开启软删除 - """ - is_deleted = models.BooleanField(verbose_name="是否软删除", help_text='是否软删除', default=False, db_index=True) + is_deleted = models.BooleanField( + verbose_name="是否软删除", help_text="是否软删除", default=False, db_index=True + ) + deleted_at = models.DateTimeField( + verbose_name="删除时间", null=True, blank=True, db_index=True + ) objects = SoftDeleteManager() class Meta: abstract = True - verbose_name = '软删除模型' - verbose_name_plural = verbose_name - def delete(self, using=None, soft_delete=True, *args, **kwargs): + def delete(self, using=None, soft_delete=True, cascade=True): """ - 重写删除方法,直接开启软删除 + 重写删除方法 + :param soft_delete: 是否软删除 (默认为True) """ if soft_delete: - self.is_deleted = True - self.save(using=using) - # 级联软删除关联对象 - for related_object in self._meta.related_objects: - related_model = getattr(self, related_object.get_accessor_name()) - # 处理一对多和多对多的关联对象 - if related_object.one_to_many or related_object.many_to_many: - related_objects = related_model.all() - elif related_object.one_to_one: - related_objects = [related_model] - else: - continue - - for obj in related_objects: - obj.delete(soft_delete=True) + with transaction.atomic(): + self.is_deleted = True + self.deleted_at = timezone.now() + + update_fields = ["is_deleted", "deleted_at"] + if hasattr(self, "update_datetime"): + self.update_datetime = timezone.now() + update_fields.append("update_datetime") + + for field in self._meta.fields: + if field.unique and not field.primary_key and not field.is_relation: + new_value = f"{getattr(self, field.name)}__deleted_{uuid.uuid4().hex[:8]}" + if getattr(field, "max_length", 0) < len(new_value): + raise ValueError(f"字段 {field.name} 的长度小于新值的长度") + setattr( + self, + field.name, + new_value, + ) + update_fields.append(field.name) + + self.save(update_fields=update_fields) + + if cascade: + self._cascade_soft_delete() else: - super().delete(using=using, *args, **kwargs) + super().delete(using=using) + def _cascade_soft_delete(self): + """ + 级联软删除关联对象 + - 只处理设置了 on_delete=models.CASCADE 的关系 + - 跳过多对多关系(只解除关系,不删除对象) + """ + for related in self._meta.related_objects: + if related.on_delete != models.CASCADE: + continue + + if related.many_to_many: + continue + + accessor_name = related.get_accessor_name() + + if not hasattr(self, accessor_name): + continue + related_manager = getattr(self, accessor_name) + + if related.many_to_one: + related_manager.delete() + elif related.many_to_many: + related_manager.all().delete() + + def restore(self, cascade=True): + """ + 恢复软删除的对象 + """ + if not self.is_deleted: + return + with transaction.atomic(): + self.is_deleted = False + self.deleted_at = None + + update_fields = ["is_deleted", "deleted_at"] + if hasattr(self, "update_datetime"): + self.update_datetime = timezone.now() + update_fields.append("update_datetime") + + for field in self._meta.fields: + if field.unique and not field.primary_key: + original_value = getattr(self, field.name) + if original_value[:-8].endswith("__deleted_"): + setattr(self, field.name, original_value[:-18]) + update_fields.append(field.name) + + self.save(update_fields=update_fields) + if cascade: + self._cascade_restore() + + def _cascade_restore(self): + """ + 级联恢复软删除的对象 + """ + for related in self._meta.related_objects: + if related.on_delete != models.CASCADE: + continue + if related.many_to_many: + continue + + accessor_name = related.get_accessor_name() + + if not hasattr(self, accessor_name): + continue + related_manager = getattr(self, accessor_name) + + if related.many_to_one: + related_manager.restore() + elif related.many_to_many: + related_manager.all().restore() -class CoreModelManager(models.Manager): - def get_queryset(self): - is_deleted = getattr(self.model, 'is_soft_delete', False) - flow_work_status = getattr(self.model, 'flow_work_status', False) - queryset = super().get_queryset() - if flow_work_status: - queryset = queryset.filter(flow_work_status=1) - if is_deleted: - queryset = queryset.filter(is_deleted=False) - return queryset - def create(self,request: Request=None, **kwargs): - data = {**kwargs} - if request: - request_user = request.user - data["creator"] = request_user - data["modifier"] = request_user.id - data["dept_belong_id"] = request_user.dept_id - # 调用父类的create方法执行实际的创建操作 - return super().create(**data) class CoreModel(models.Model): """ 核心标准抽象模型模型,可直接继承使用 增加审计字段, 覆盖字段时, 字段名称请勿修改, 必须统一审计字段名称 """ + id = models.BigAutoField(primary_key=True, help_text="Id", verbose_name="Id") - description = models.CharField(max_length=255, verbose_name="描述", null=True, blank=True, help_text="描述") - creator = models.ForeignKey(to=settings.AUTH_USER_MODEL, related_query_name='creator_query', null=True, - verbose_name='创建人', help_text="创建人", on_delete=models.SET_NULL, - db_constraint=False) - modifier = models.CharField(max_length=255, null=True, blank=True, help_text="修改人", verbose_name="修改人") - dept_belong_id = models.CharField(max_length=255, help_text="数据归属部门", null=True, blank=True, - verbose_name="数据归属部门") - update_datetime = models.DateTimeField(auto_now=True, null=True, blank=True, help_text="修改时间", - verbose_name="修改时间") - create_datetime = models.DateTimeField(auto_now_add=True, null=True, blank=True, help_text="创建时间", - verbose_name="创建时间") - objects = CoreModelManager() - all_objects = models.Manager() + description = models.CharField( + max_length=255, verbose_name="描述", null=True, blank=True, help_text="描述" + ) + creator = models.ForeignKey( + to=settings.AUTH_USER_MODEL, + related_query_name="creator_query", + null=True, + verbose_name="创建人", + help_text="创建人", + on_delete=models.SET_NULL, + db_constraint=False, + ) + modifier = models.CharField( + max_length=255, null=True, blank=True, help_text="修改人", verbose_name="修改人" + ) + dept_belong_id = models.CharField( + max_length=255, + help_text="数据归属部门", + null=True, + blank=True, + verbose_name="数据归属部门", + ) + update_datetime = models.DateTimeField( + auto_now=True, + null=True, + blank=True, + help_text="修改时间", + verbose_name="修改时间", + ) + create_datetime = models.DateTimeField( + auto_now_add=True, + null=True, + blank=True, + help_text="创建时间", + verbose_name="创建时间", + ) + class Meta: abstract = True - verbose_name = '核心模型' + verbose_name = "核心模型" verbose_name_plural = verbose_name def get_request_user(self, request: Request): @@ -147,32 +285,32 @@ class CoreModel(models.Model): def common_insert_data(self, request: Request): data = { - 'create_datetime': datetime.now(), - 'creator': self.get_request_user(request) + "create_datetime": datetime.now(), + "creator": self.get_request_user(request), } return {**data, **self.common_update_data(request)} def common_update_data(self, request: Request): return { - 'update_datetime': datetime.now(), - 'modifier': self.get_request_user_username(request) + "update_datetime": datetime.now(), + "modifier": self.get_request_user_username(request), } exclude_fields = [ - '_state', - 'pk', - 'id', - 'create_datetime', - 'update_datetime', - 'creator', - 'creator_id', - 'creator_pk', - 'creator_name', - 'modifier', - 'modifier_id', - 'modifier_pk', - 'modifier_name', - 'dept_belong_id', + "_state", + "pk", + "id", + "create_datetime", + "update_datetime", + "creator", + "creator_id", + "creator_pk", + "creator_name", + "modifier", + "modifier_id", + "modifier_pk", + "modifier_name", + "dept_belong_id", ] def get_exclude_fields(self): @@ -185,15 +323,22 @@ class CoreModel(models.Model): return [field.name for field in self.get_all_fields()] def get_need_fields_names(self): - return [field.name for field in self.get_all_fields() if field.name not in self.exclude_fields] + return [ + field.name + for field in self.get_all_fields() + if field.name not in self.exclude_fields + ] def to_data(self): - """将模型转化为字典(去除不包含字段)(注意与to_dict_data区分)。 - """ + """将模型转化为字典(去除不包含字段)(注意与to_dict_data区分)。""" res = {} for field in self.get_need_fields_names(): field_value = getattr(self, field) - res[field] = field_value.id if (issubclass(field_value.__class__, CoreModel)) else field_value + res[field] = ( + field_value.id + if (issubclass(field_value.__class__, CoreModel)) + else field_value + ) return res @property @@ -201,8 +346,7 @@ class CoreModel(models.Model): return self.to_data() def to_dict_data(self): - """需要导出的字段(去除不包含字段)(注意与to_data区分) - """ + """需要导出的字段(去除不包含字段)(注意与to_data区分)""" return {field: getattr(self, field) for field in self.get_need_fields_names()} @property @@ -210,20 +354,20 @@ class CoreModel(models.Model): return self.to_dict_data() def insert(self, request): - """插入模型 - """ - assert self.pk is None, f'模型{self.__class__.__name__}还没有保存到数据中,不能手动指定ID' + """插入模型""" + assert ( + self.pk is None + ), f"模型{self.__class__.__name__}还没有保存到数据中,不能手动指定ID" validated_data = {**self.common_insert_data(request), **self.DICT_DATA} return self.__class__._default_manager.create(**validated_data) def update(self, request, update_data: dict[str, any] = None): - """更新模型 - """ - assert isinstance(update_data, dict), 'update_data必须为字典' + """更新模型""" + assert isinstance(update_data, dict), "update_data必须为字典" validated_data = {**self.common_insert_data(request), **update_data} for key, value in validated_data.items(): # 不允许修改id,pk,uuid字段 - if key in ['id', 'pk', 'uuid']: + if key in ["id", "pk", "uuid"]: continue if hasattr(self, key): setattr(self, key, value) @@ -240,11 +384,17 @@ def get_all_models_objects(model_name=None): if not settings.ALL_MODELS_OBJECTS: all_models = apps.get_models() for item in list(all_models): - table = {"tableName": item._meta.verbose_name, "table": item.__name__, "tableFields": []} + table = { + "tableName": item._meta.verbose_name, + "table": item.__name__, + "tableFields": [], + } for field in item._meta.fields: fields = {"title": field.verbose_name, "field": field.name} - table['tableFields'].append(fields) - settings.ALL_MODELS_OBJECTS.setdefault(item.__name__, {"table": table, "object": item}) + table["tableFields"].append(fields) + settings.ALL_MODELS_OBJECTS.setdefault( + item.__name__, {"table": table, "object": item} + ) if model_name: return settings.ALL_MODELS_OBJECTS[model_name] or {} return settings.ALL_MODELS_OBJECTS or {} @@ -252,21 +402,33 @@ def get_all_models_objects(model_name=None): def get_model_from_app(app_name): """获取模型里的字段""" - model_module = import_module(app_name + '.models') - exclude_models = getattr(model_module, 'exclude_models', []) + model_module = import_module(app_name + ".models") + exclude_models = getattr(model_module, "exclude_models", []) filter_model = [ - value for key, value in model_module.__dict__.items() - if key != 'CoreModel' + value + for key, value in model_module.__dict__.items() + if key != "CoreModel" and isinstance(value, type) and issubclass(value, models.Model) and key not in exclude_models ] model_list = [] for model in filter_model: - if model.__name__ == 'AbstractUser': + if model.__name__ == "AbstractUser": continue - fields = [{'title': field.verbose_name, 'name': field.name, 'object': field} for field in model._meta.fields] - model_list.append({'app': app_name, 'verbose': model._meta.verbose_name, 'model': model.__name__, 'object': model, 'fields': fields}) + fields = [ + {"title": field.verbose_name, "name": field.name, "object": field} + for field in model._meta.fields + ] + model_list.append( + { + "app": app_name, + "verbose": model._meta.verbose_name, + "model": model.__name__, + "object": model, + "fields": fields, + } + ) return model_list @@ -279,7 +441,7 @@ def get_custom_app_models(app_name=None): all_apps = apps.get_app_configs() res = [] for app in all_apps: - if app.name.startswith('django'): + if app.name.startswith("django"): continue if app.name in settings.COLUMN_EXCLUDE_APPS: continue