Files
c/backend/app/routes/content_routes.py
刘正航 f7d0601c4e feat: 申诉常见理由快捷选择 + 证据截图上传
- 后端: 新增 appeal_reason_type, appeal_evidence_urls 字段
- 后端: 新建 upload_routes.py 图片上传接口
- 前端: history 页面添加快捷理由选择器 + 截图上传
- 前端: admin-review 页面展示证据图片 + 点击预览
- 新增 SQL 更新脚本 update_appeal_fields.sql

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-21 23:26:25 +08:00

367 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from datetime import datetime
from flask import Blueprint, current_app, request
from flask_jwt_extended import jwt_required
from app.extensions import db
from app.ml.naive_bayes_classifier import NaiveBayesSpamClassifier
from app.models import ContentPost, DetectionConfig, SpamPredictionLog, SpamTrainingSample, User
from app.utils.auth import current_user
from app.utils.response import fail, ok
# Blueprint on which every route handler in this module is registered.
content_bp = Blueprint("content", __name__)
def _classifier() -> NaiveBayesSpamClassifier:
    """Build a spam classifier from the ``NB_MODEL_PATH`` application setting."""
    model_path = current_app.config["NB_MODEL_PATH"]
    return NaiveBayesSpamClassifier(model_path)
def _active_samples() -> list[dict]:
    """Return all active training samples, ordered by ascending id.

    Each sample is reduced to a dict with the keys ``text`` and ``label``.
    """
    active = SpamTrainingSample.query.filter_by(is_active=True)
    ordered = active.order_by(SpamTrainingSample.id.asc())
    return [
        {"text": sample.text, "label": sample.label}
        for sample in ordered.all()
    ]
def _ensure_ready() -> NaiveBayesSpamClassifier:
    """Return a classifier after handing it the active training samples.

    The samples are passed to the classifier's ``ensure_ready`` method before
    the instance is returned to the caller.
    """
    classifier = _classifier()
    samples = _active_samples()
    classifier.ensure_ready(samples)
    return classifier
def _get_config() -> DetectionConfig:
    """Return the detection config with the lowest id, creating it if absent.

    When the table holds no row, one with ``spam_threshold=0.75`` is inserted
    and committed before being returned.
    """
    config = DetectionConfig.query.order_by(DetectionConfig.id.asc()).first()
    if not config:
        config = DetectionConfig(spam_threshold=0.75)
        db.session.add(config)
        db.session.commit()
    return config
def _serialize_post(row: ContentPost) -> dict:
    """Serialize a post and attach the names of its related users.

    The result is ``row.to_dict()`` extended with the author's username and
    nickname, the recipient's username and nickname, and the reviewer's
    username. A missing related user yields empty strings.
    """
    data = row.to_dict()
    author, recipient, reviewer = row.author, row.recipient, row.reviewer
    data.update(
        {
            "username": author.username if author else "",
            "nickname": author.nickname if author else "",
            "recipient_username": recipient.username if recipient else "",
            "recipient_nickname": recipient.nickname if recipient else "",
            "reviewer_username": reviewer.username if reviewer else "",
        }
    )
    return data
def _resolve_visibility(value: str) -> str:
key = (value or "public").strip().lower()
return key if key in {"public", "private", "direct"} else "public"
def _resolve_recipient(payload: dict, visibility: str, current_user_id: int):
if visibility != "direct":
return None, None
recipient = None
raw_id = payload.get("recipient_user_id")
username = (payload.get("recipient_username") or "").strip()
if raw_id is not None and str(raw_id).strip() != "":
try:
recipient = User.query.get(int(raw_id))
except Exception:
return None, "recipient_user_id 无效"
elif username:
recipient = User.query.filter_by(username=username).first()
if not recipient:
return None, "私信发布必须指定有效接收人"
if recipient.id == current_user_id:
return None, "不能给自己发送私信"
return recipient, None
def _predict_and_decide(text: str) -> tuple[dict, float, bool]:
    """Classify ``text`` and decide whether it has to be blocked.

    Returns:
        ``(result, threshold, blocked)``: the classifier's prediction dict,
        the configured spam threshold, and whether the predicted spam
        probability reaches that threshold.
    """
    prediction = _ensure_ready().predict(text)
    spam_threshold = float(_get_config().spam_threshold)
    spam_probability = float(prediction["spam_probability"])
    return prediction, spam_threshold, spam_probability >= spam_threshold
@content_bp.post("/publish")
@jwt_required()
def publish_text():
    """Run spam detection on the submitted text and store it as a post.

    JSON body:
        text: Content to publish; at least 2 characters after trimming.
        visibility: ``public`` (default), ``private`` or ``direct``.
        recipient_user_id / recipient_username: Recipient of a direct post.

    The post is stored with status ``blocked`` (and a pending manual review)
    when the spam probability reaches the configured threshold, otherwise
    with status ``published``. A prediction log row is stored alongside it.

    Returns:
        404 when the caller cannot be resolved, 400 on invalid input,
        otherwise the stored post together with the detection result.
    """
    user = current_user()
    if not user:
        return fail("用户不存在", 404)

    # A syntactically valid JSON body can still be a list or a scalar, and
    # ``text`` can be any JSON type; treat both as missing input so the
    # request ends in a 400 instead of an AttributeError.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}
    raw_text = payload.get("text")
    text = raw_text.strip() if isinstance(raw_text, str) else ""
    visibility = _resolve_visibility(payload.get("visibility"))
    if len(text) < 2:
        return fail("发布文本至少2个字符", 400)

    recipient, err = _resolve_recipient(payload, visibility, user.id)
    if err:
        return fail(err, 400)

    result, threshold, blocked = _predict_and_decide(text)
    post = ContentPost(
        user_id=user.id,
        recipient_user_id=recipient.id if recipient else None,
        text=result["text"],
        visibility=visibility,
        status="blocked" if blocked else "published",
        prediction=result["prediction"],
        spam_probability=result["spam_probability"],
        ham_probability=result["ham_probability"],
        confidence=result["confidence"],
        threshold=threshold,
        reason_tokens=result["reason_tokens"],
        model_version=result.get("model_version", ""),
        manual_review_status="pending" if blocked else "none",
    )
    detect_log = SpamPredictionLog(
        user_id=user.id,
        text=result["text"],
        prediction=result["prediction"],
        spam_probability=result["spam_probability"],
        ham_probability=result["ham_probability"],
        confidence=result["confidence"],
        reason_tokens=result["reason_tokens"],
        model_version=result.get("model_version", ""),
    )
    # Both rows are written in a single commit.
    db.session.add(post)
    db.session.add(detect_log)
    db.session.commit()

    feedback = "发布成功" if not blocked else "疑似垃圾信息,系统已拦截,可提交申诉"
    return ok(
        {
            "publish_allowed": not blocked,
            "action": "published" if not blocked else "blocked",
            "feedback": feedback,
            "post": _serialize_post(post),
            "detect": result,
        },
        feedback,
    )
@content_bp.put("/posts/<int:post_id>")
@jwt_required()
def edit_post(post_id: int):
    """Edit one of the caller's posts and run spam detection on it again.

    JSON body (every field optional):
        text: New content; the stored text is kept when omitted or empty.
        visibility: New visibility; the stored one is kept when omitted.
        recipient_user_id / recipient_username: Recipient of a direct post;
            the stored recipient is kept when neither is supplied.

    Every successful edit clears the previous manual-review and appeal
    state, including the appeal reason type and evidence URLs.

    Returns:
        404 when the caller or the post cannot be found, 400 on invalid
        input, otherwise the updated post together with the detection result.
    """
    user = current_user()
    if not user:
        return fail("用户不存在", 404)
    post = ContentPost.query.filter_by(id=post_id, user_id=user.id).first()
    if not post:
        return fail("发布记录不存在", 404)

    # A valid JSON body may still be a list or scalar, and ``text`` may be
    # any JSON type; fall back to the stored values instead of crashing.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}
    raw_text = payload.get("text")
    text = (raw_text if isinstance(raw_text, str) and raw_text else post.text).strip()
    visibility = _resolve_visibility(payload.get("visibility") or post.visibility)
    if len(text) < 2:
        return fail("发布文本至少2个字符", 400)

    # Like text and visibility, the recipient of a direct post is kept when
    # the request does not name one; otherwise editing only the text of a
    # direct post would be rejected for a missing recipient.
    recipient_payload = payload
    recipient_given = any(
        payload.get(key) not in (None, "")
        for key in ("recipient_user_id", "recipient_username")
    )
    if visibility == "direct" and not recipient_given and post.recipient_user_id:
        recipient_payload = {"recipient_user_id": post.recipient_user_id}
    recipient, err = _resolve_recipient(recipient_payload, visibility, user.id)
    if err:
        return fail(err, 400)

    result, threshold, blocked = _predict_and_decide(text)
    post.text = result["text"]
    post.visibility = visibility
    post.recipient_user_id = recipient.id if recipient else None
    post.status = "blocked" if blocked else "published"
    post.prediction = result["prediction"]
    post.spam_probability = result["spam_probability"]
    post.ham_probability = result["ham_probability"]
    post.confidence = result["confidence"]
    post.threshold = threshold
    post.reason_tokens = result["reason_tokens"]
    post.model_version = result.get("model_version", "")

    # The previous review outcome no longer applies to the edited post.
    post.manual_review_status = "pending" if blocked else "none"
    post.manual_review_by = None
    post.manual_review_note = ""
    post.manual_review_at = None

    # Reset every appeal field, including the reason type and evidence URLs
    # written by submit_appeal, so no stale appeal data stays on the post.
    post.appeal_status = "none"
    post.appeal_reason_type = ""
    post.appeal_reason = ""
    post.appeal_evidence_urls = []
    post.appeal_admin_note = ""
    post.appeal_submitted_at = None
    post.appeal_processed_at = None
    post.appeal_processed_by = None
    db.session.commit()

    feedback = "更新并重新发布成功" if not blocked else "更新后触发拦截,可提交申诉"
    return ok(
        {
            "publish_allowed": not blocked,
            "action": "published" if not blocked else "blocked",
            "feedback": feedback,
            "post": _serialize_post(post),
            "detect": result,
        },
        feedback,
    )
@content_bp.get("/posts/history")
@jwt_required()
def my_posts():
    """List the caller's own posts, highest id first.

    Query parameters:
        status: Optional filter, ``published`` or ``blocked``.
        visibility: Optional filter, ``public``, ``private`` or ``direct``.
        page: 1-based page number, default 1.
        page_size: Items per page, default 20, clamped to 1..100.

    Unrecognized filter values are ignored.
    """
    user = current_user()
    if not user:
        return fail("用户不存在", 404)

    status = (request.args.get("status") or "").strip().lower()
    visibility = (request.args.get("visibility") or "").strip().lower()
    # ``type=int`` returns the default when the value is not an integer, so
    # a request such as ``?page=abc`` no longer fails with a ValueError.
    page = max(request.args.get("page", default=1, type=int), 1)
    page_size = min(max(request.args.get("page_size", default=20, type=int), 1), 100)

    query = ContentPost.query.filter_by(user_id=user.id)
    if status in {"published", "blocked"}:
        query = query.filter(ContentPost.status == status)
    if visibility in {"public", "private", "direct"}:
        query = query.filter(ContentPost.visibility == visibility)
    pagination = query.order_by(ContentPost.id.desc()).paginate(
        page=page, per_page=page_size, error_out=False
    )
    return ok(
        {
            "items": [_serialize_post(item) for item in pagination.items],
            "total": pagination.total,
            "page": page,
            "page_size": page_size,
        }
    )
@content_bp.get("/posts/inbox")
@jwt_required()
def my_inbox():
    """List published direct posts addressed to the caller, highest id first.

    Query parameters:
        page: 1-based page number, default 1.
        page_size: Items per page, default 20, clamped to 1..100.
    """
    user = current_user()
    if not user:
        return fail("用户不存在", 404)

    # ``type=int`` returns the default when the value is not an integer, so
    # a request such as ``?page=abc`` no longer fails with a ValueError.
    page = max(request.args.get("page", default=1, type=int), 1)
    page_size = min(max(request.args.get("page_size", default=20, type=int), 1), 100)

    pagination = (
        ContentPost.query.filter_by(recipient_user_id=user.id, visibility="direct", status="published")
        .order_by(ContentPost.id.desc())
        .paginate(page=page, per_page=page_size, error_out=False)
    )
    return ok(
        {
            "items": [_serialize_post(item) for item in pagination.items],
            "total": pagination.total,
            "page": page,
            "page_size": page_size,
        }
    )
@content_bp.delete("/posts/<int:post_id>")
@jwt_required()
def delete_post(post_id: int):
    """Delete one of the caller's own posts.

    Returns 404 when the caller cannot be resolved or when no post with this
    id belongs to the caller.
    """
    owner = current_user()
    if not owner:
        return fail("用户不存在", 404)

    target = ContentPost.query.filter_by(id=post_id, user_id=owner.id).first()
    if target:
        db.session.delete(target)
        db.session.commit()
        return ok({}, "记录已删除")
    return fail("记录不存在", 404)
@content_bp.post("/posts/<int:post_id>/appeal")
@jwt_required()
def submit_appeal(post_id: int):
    """Submit an appeal for one of the caller's blocked posts.

    JSON body:
        reason_type: Optional quick-pick reason.
        reason: Free-text reason; at least 2 characters unless
            ``reason_type`` is given.
        evidence_urls: Optional list of evidence image URLs (strings).

    On success the post's appeal and manual-review status both become
    ``pending`` and earlier appeal processing data is cleared.

    Returns:
        404 when the caller or the post cannot be found; 400 when the post
        is not blocked, the input is invalid, or an appeal is already
        pending; otherwise the updated post.
    """
    user = current_user()
    if not user:
        return fail("用户不存在", 404)
    post = ContentPost.query.filter_by(id=post_id, user_id=user.id).first()
    if not post:
        return fail("发布记录不存在", 404)
    if post.status != "blocked":
        return fail("仅被拦截的信息可申诉", 400)

    # The body is untrusted: it may be a non-object, and each field may be
    # any JSON type. Non-string reasons are treated as not supplied.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}
    raw_reason_type = payload.get("reason_type")
    raw_reason = payload.get("reason")
    reason_type = raw_reason_type.strip() if isinstance(raw_reason_type, str) else ""
    reason = raw_reason.strip() if isinstance(raw_reason, str) else ""

    # With a quick-pick reason the free text may be empty; otherwise it
    # needs at least 2 characters.
    if not reason_type and len(reason) < 2:
        return fail("申诉理由至少2个字符", 400)

    # Evidence is stored on the post as-is, so only accept a list of
    # strings instead of persisting whatever JSON the client sent.
    raw_evidence = payload.get("evidence_urls") or []
    if not isinstance(raw_evidence, list) or not all(
        isinstance(url, str) for url in raw_evidence
    ):
        return fail("evidence_urls 必须为字符串数组", 400)
    evidence_urls = [url.strip() for url in raw_evidence if url.strip()]

    if post.appeal_status == "pending":
        return fail("该记录已在申诉处理中", 400)

    post.appeal_status = "pending"
    post.appeal_reason_type = reason_type
    post.appeal_reason = reason
    post.appeal_evidence_urls = evidence_urls
    post.appeal_submitted_at = datetime.utcnow()
    post.appeal_admin_note = ""
    post.appeal_processed_at = None
    post.appeal_processed_by = None
    post.manual_review_status = "pending"
    db.session.commit()
    return ok(_serialize_post(post), "申诉提交成功")
@content_bp.get("/appeals/my")
@jwt_required()
def my_appeals():
    """List the caller's posts whose appeal status is not ``none``.

    Results are ordered by id, highest first.

    Query parameters:
        page: 1-based page number, default 1.
        page_size: Items per page, default 20, clamped to 1..100.
    """
    user = current_user()
    if not user:
        return fail("用户不存在", 404)

    # ``type=int`` returns the default when the value is not an integer, so
    # a request such as ``?page=abc`` no longer fails with a ValueError.
    page = max(request.args.get("page", default=1, type=int), 1)
    page_size = min(max(request.args.get("page_size", default=20, type=int), 1), 100)

    pagination = (
        ContentPost.query.filter(ContentPost.user_id == user.id, ContentPost.appeal_status != "none")
        .order_by(ContentPost.id.desc())
        .paginate(page=page, per_page=page_size, error_out=False)
    )
    return ok(
        {
            "items": [_serialize_post(item) for item in pagination.items],
            "total": pagination.total,
            "page": page,
            "page_size": page_size,
        }
    )
@content_bp.get("/posts/public")
@jwt_required(optional=True)
def public_feed():
    """List published public posts, highest id first.

    A JWT is optional for this endpoint.

    Query parameters:
        page: 1-based page number, default 1.
        page_size: Items per page, default 20, clamped to 1..100.
    """
    # ``type=int`` returns the default when the value is not an integer, so
    # a request such as ``?page=abc`` no longer fails with a ValueError.
    page = max(request.args.get("page", default=1, type=int), 1)
    page_size = min(max(request.args.get("page_size", default=20, type=int), 1), 100)

    pagination = (
        ContentPost.query.filter_by(visibility="public", status="published")
        .order_by(ContentPost.id.desc())
        .paginate(page=page, per_page=page_size, error_out=False)
    )
    return ok(
        {
            "items": [_serialize_post(item) for item in pagination.items],
            "total": pagination.total,
            "page": page,
            "page_size": page_size,
        }
    )