Files
c/backend/app/models.py
刘正航 cedfd066c4 feat: 垃圾信息分类标签功能
新增垃圾信息细分类标签,在朴素贝叶斯二分类基础上对spam进行细分:
- 新增 spam_categorizer.py 分类模块(诈骗/骚扰/广告)
- SpamPredictionLog 和 ContentPost 模型添加 category 字段
- content_routes 和 spam_routes 接口返回分类标签

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-22 21:52:08 +08:00

191 lines
9.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from datetime import datetime
from werkzeug.security import check_password_hash, generate_password_hash
from app.extensions import db
class User(db.Model):
    """Account row stored in the ``users`` table.

    Carries the login name and password hash, profile fields, the admin
    flag and a credit score, together with relationships to the user's
    prediction logs, training samples and content posts.
    """

    __tablename__ = "users"

    # --- identity and credentials -------------------------------------
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(
        db.String(64),
        unique=True,
        nullable=False,
        index=True,
    )
    password_hash = db.Column(db.String(255), nullable=False)

    # --- profile ------------------------------------------------------
    nickname = db.Column(db.String(64), nullable=False)
    company = db.Column(db.String(128), default="")
    title = db.Column(db.String(64), default="")
    phone = db.Column(db.String(32), default="")
    is_admin = db.Column(db.Boolean, default=False)
    # Credit score, 0-200, defaults to 100.
    credit_score = db.Column(db.Integer, default=100)

    # --- bookkeeping timestamps (naive UTC from datetime.utcnow) ------
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(
        db.DateTime,
        default=datetime.utcnow,
        onupdate=datetime.utcnow,
    )

    # --- relationships ------------------------------------------------
    # Prediction logs are configured with cascade "all, delete-orphan".
    prediction_logs = db.relationship(
        "SpamPredictionLog",
        backref="user",
        lazy=True,
        cascade="all, delete-orphan",
    )
    training_samples = db.relationship(
        "SpamTrainingSample",
        backref="creator",
        lazy=True,
        foreign_keys="SpamTrainingSample.created_by",
    )
    # ContentPost has several foreign keys to users, so every relationship
    # below names the column it joins on.
    sent_posts = db.relationship(
        "ContentPost",
        backref="author",
        lazy=True,
        foreign_keys="ContentPost.user_id",
    )
    received_posts = db.relationship(
        "ContentPost",
        backref="recipient",
        lazy=True,
        foreign_keys="ContentPost.recipient_user_id",
    )
    reviewed_posts = db.relationship(
        "ContentPost",
        backref="reviewer",
        lazy=True,
        foreign_keys="ContentPost.manual_review_by",
    )

    def set_password(self, password: str) -> None:
        """Hash *password* and store the result in ``password_hash``."""
        hashed = generate_password_hash(password)
        self.password_hash = hashed

    def check_password(self, password: str) -> bool:
        """Return the result of checking *password* against the stored hash."""
        stored = self.password_hash
        return check_password_hash(stored, password)

    def to_dict(self) -> dict:
        """Return the user's fields as a plain dict.

        ``password_hash`` is not included. Timestamps are rendered with
        ``isoformat()`` or given as ``None`` when unset.
        """
        created = None
        if self.created_at:
            created = self.created_at.isoformat()

        updated = None
        if self.updated_at:
            updated = self.updated_at.isoformat()

        return {
            "id": self.id,
            "username": self.username,
            "nickname": self.nickname,
            "company": self.company,
            "title": self.title,
            "phone": self.phone,
            "is_admin": self.is_admin,
            "credit_score": self.credit_score,
            "created_at": created,
            "updated_at": updated,
        }
class SpamTrainingSample(db.Model):
    """Labelled text sample stored in ``spam_training_samples``."""

    __tablename__ = "spam_training_samples"

    id = db.Column(db.Integer, primary_key=True)
    text = db.Column(db.Text, nullable=False)
    # Label values: spam | ham
    label = db.Column(db.String(16), nullable=False, index=True)
    # Source values: seed | import | feedback | manual_review
    source = db.Column(db.String(32), default="seed")
    # Optional reference to the user who created the sample.
    created_by = db.Column(
        db.Integer,
        db.ForeignKey("users.id"),
        nullable=True,
        index=True,
    )
    is_active = db.Column(db.Boolean, default=True)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(
        db.DateTime,
        default=datetime.utcnow,
        onupdate=datetime.utcnow,
    )

    def to_dict(self) -> dict:
        """Return every column of the sample as a plain dict.

        Timestamps are rendered with ``isoformat()`` or given as ``None``
        when unset.
        """

        def iso(moment):
            # Same truthiness test as an inline conditional expression.
            if not moment:
                return None
            return moment.isoformat()

        return {
            "id": self.id,
            "text": self.text,
            "label": self.label,
            "source": self.source,
            "created_by": self.created_by,
            "is_active": self.is_active,
            "created_at": iso(self.created_at),
            "updated_at": iso(self.updated_at),
        }
class SpamPredictionLog(db.Model):
    """Record of one prediction, stored in ``spam_prediction_logs``.

    Each row keeps the submitted text, the predicted label and category,
    the probability figures, the reason tokens and the model version,
    linked to the requesting user.
    """

    __tablename__ = "spam_prediction_logs"

    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(
        db.Integer,
        db.ForeignKey("users.id"),
        nullable=False,
        index=True,
    )
    text = db.Column(db.Text, nullable=False)
    # Prediction values: spam | ham
    prediction = db.Column(db.String(16), nullable=False)
    # Category values: fraud | harassment | advertisement | spam | empty
    category = db.Column(db.String(32), default="")
    spam_probability = db.Column(db.Float, nullable=False)
    ham_probability = db.Column(db.Float, nullable=False)
    confidence = db.Column(db.Float, nullable=False)
    reason_tokens = db.Column(db.JSON, default=list)
    model_version = db.Column(db.String(64), default="")
    created_at = db.Column(db.DateTime, default=datetime.utcnow, index=True)

    def to_dict(self) -> dict:
        """Return the log entry as a plain dict.

        Float fields are rounded to 4 decimal places, a falsy ``category``
        becomes ``""``, falsy ``reason_tokens`` becomes ``[]``, and
        ``created_at`` is rendered with ``isoformat()`` or given as ``None``.
        """

        def rounded(number):
            return round(float(number), 4)

        logged_at = None
        if self.created_at:
            logged_at = self.created_at.isoformat()

        return {
            "id": self.id,
            "user_id": self.user_id,
            "text": self.text,
            "prediction": self.prediction,
            "category": self.category or "",
            "spam_probability": rounded(self.spam_probability),
            "ham_probability": rounded(self.ham_probability),
            "confidence": rounded(self.confidence),
            "reason_tokens": self.reason_tokens or [],
            "model_version": self.model_version,
            "created_at": logged_at,
        }
class DetectionConfig(db.Model):
    """Detection settings row stored in ``detection_configs``.

    Holds the spam threshold, the id of the user who last updated it
    (optional) and the time of the update.
    """

    __tablename__ = "detection_configs"

    id = db.Column(db.Integer, primary_key=True)
    spam_threshold = db.Column(db.Float, nullable=False, default=0.75)
    updated_by = db.Column(
        db.Integer,
        db.ForeignKey("users.id"),
        nullable=True,
    )
    updated_at = db.Column(
        db.DateTime,
        default=datetime.utcnow,
        onupdate=datetime.utcnow,
    )

    def to_dict(self) -> dict:
        """Return the configuration as a plain dict.

        The threshold is rounded to 4 decimal places; ``updated_at`` is
        rendered with ``isoformat()`` or given as ``None`` when unset.
        """
        threshold = round(float(self.spam_threshold), 4)

        changed_at = None
        if self.updated_at:
            changed_at = self.updated_at.isoformat()

        return {
            "id": self.id,
            "spam_threshold": threshold,
            "updated_by": self.updated_by,
            "updated_at": changed_at,
        }
class ContentPost(db.Model):
    """Posted content stored in ``content_posts``.

    Besides the text, its author and an optional recipient, each row
    carries the prediction fields recorded for the post, the manual-review
    fields and the appeal fields.
    """

    __tablename__ = "content_posts"

    # --- core content -------------------------------------------------
    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(
        db.Integer,
        db.ForeignKey("users.id"),
        nullable=False,
        index=True,
    )
    recipient_user_id = db.Column(
        db.Integer,
        db.ForeignKey("users.id"),
        nullable=True,
        index=True,
    )
    text = db.Column(db.Text, nullable=False)
    # Visibility values: public | private | direct
    visibility = db.Column(db.String(16), nullable=False, default="public")
    # Status values: published | blocked
    status = db.Column(db.String(16), nullable=False, default="published")

    # --- prediction fields --------------------------------------------
    prediction = db.Column(db.String(16), nullable=False, default="ham")
    # Category values: fraud | harassment | advertisement | spam | empty
    category = db.Column(db.String(32), default="")
    spam_probability = db.Column(db.Float, nullable=False, default=0)
    ham_probability = db.Column(db.Float, nullable=False, default=0)
    confidence = db.Column(db.Float, nullable=False, default=0)
    threshold = db.Column(db.Float, nullable=False, default=0.75)
    reason_tokens = db.Column(db.JSON, default=list)
    model_version = db.Column(db.String(64), default="")

    # --- manual review ------------------------------------------------
    # Status values: none | pending | confirmed_spam | approved_ham
    manual_review_status = db.Column(
        db.String(32),
        nullable=False,
        default="none",
    )
    manual_review_by = db.Column(
        db.Integer,
        db.ForeignKey("users.id"),
        nullable=True,
    )
    manual_review_note = db.Column(db.String(255), default="")
    manual_review_at = db.Column(db.DateTime, nullable=True)

    # --- appeal -------------------------------------------------------
    # Status values: none | pending | approved | rejected
    appeal_status = db.Column(db.String(16), nullable=False, default="none")
    # Quick (preset) reason type.
    appeal_reason_type = db.Column(db.String(32), default="")
    appeal_reason = db.Column(db.String(255), default="")
    # List of evidence image URLs.
    appeal_evidence_urls = db.Column(db.JSON, default=list)
    appeal_admin_note = db.Column(db.String(255), default="")
    appeal_submitted_at = db.Column(db.DateTime, nullable=True)
    appeal_processed_at = db.Column(db.DateTime, nullable=True)
    appeal_processed_by = db.Column(
        db.Integer,
        db.ForeignKey("users.id"),
        nullable=True,
    )

    # --- bookkeeping timestamps (naive UTC from datetime.utcnow) ------
    created_at = db.Column(db.DateTime, default=datetime.utcnow, index=True)
    updated_at = db.Column(
        db.DateTime,
        default=datetime.utcnow,
        onupdate=datetime.utcnow,
    )

    def to_dict(self) -> dict:
        """Return every column of the post as a plain dict.

        Float fields are rounded to 4 decimal places, datetimes are rendered
        with ``isoformat()`` (``None`` when unset), a falsy ``category``
        becomes ``""`` and falsy JSON list fields become ``[]``.
        """

        def iso(moment):
            # Same truthiness test as an inline conditional expression.
            if not moment:
                return None
            return moment.isoformat()

        def rounded(number):
            return round(float(number), 4)

        return {
            "id": self.id,
            "user_id": self.user_id,
            "recipient_user_id": self.recipient_user_id,
            "text": self.text,
            "visibility": self.visibility,
            "status": self.status,
            "prediction": self.prediction,
            "category": self.category or "",
            "spam_probability": rounded(self.spam_probability),
            "ham_probability": rounded(self.ham_probability),
            "confidence": rounded(self.confidence),
            "threshold": rounded(self.threshold),
            "reason_tokens": self.reason_tokens or [],
            "model_version": self.model_version,
            "manual_review_status": self.manual_review_status,
            "manual_review_by": self.manual_review_by,
            "manual_review_note": self.manual_review_note,
            "manual_review_at": iso(self.manual_review_at),
            "appeal_status": self.appeal_status,
            "appeal_reason_type": self.appeal_reason_type,
            "appeal_reason": self.appeal_reason,
            "appeal_evidence_urls": self.appeal_evidence_urls or [],
            "appeal_admin_note": self.appeal_admin_note,
            "appeal_submitted_at": iso(self.appeal_submitted_at),
            "appeal_processed_at": iso(self.appeal_processed_at),
            "appeal_processed_by": self.appeal_processed_by,
            "created_at": iso(self.created_at),
            "updated_at": iso(self.updated_at),
        }