Files
c/backend/app/routes/admin_routes.py
刘正航 385ebe25e7 feat: 运营报告生成功能
- 后端新增 /admin/stats/report 接口,生成14天运营数据报告
- 报告内容:垃圾信息变化趋势、高频风险词Top10、误判率趋势
- 前端运营看板增加"生成报告"按钮,展示完整报告
- 支持复制报告文本到剪贴板

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-22 00:07:07 +08:00

620 lines
22 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from collections import Counter
from datetime import datetime, timedelta
from flask import Blueprint, current_app, request
from sqlalchemy import func, or_
from app.extensions import db
from app.ml.naive_bayes_classifier import NaiveBayesSpamClassifier
from app.models import ContentPost, DetectionConfig, SpamPredictionLog, SpamTrainingSample, User
from app.utils.auth import admin_required, current_user
from app.utils.response import fail, ok
# Blueprint that the admin-only endpoints in this module are registered on.
admin_bp = Blueprint("admin", __name__)
def _day_key(day_value) -> str:
    """Return the string key used to index per-day counters.

    Values exposing an ``isoformat`` attribute (for example ``datetime.date``)
    are rendered through it; anything else is converted with ``str``.
    """
    if not hasattr(day_value, "isoformat"):
        return str(day_value)
    return day_value.isoformat()
def _tokenize(text: str) -> list[str]:
    """Split *text* into overlapping two-character tokens.

    The input is stripped first and a falsy value is treated as an empty
    string. Inputs of at most one character yield an empty list, and pairs
    consisting solely of whitespace are dropped.
    """
    cleaned = (text or "").strip()
    if len(cleaned) <= 1:
        return []
    pairs = (first + second for first, second in zip(cleaned, cleaned[1:]))
    return [pair for pair in pairs if pair.strip()]
def _get_or_create_config() -> DetectionConfig:
    """Return the detection config with the lowest id, creating it if absent.

    A newly created row starts with ``spam_threshold=0.75`` and is committed
    before being returned.
    """
    config = DetectionConfig.query.order_by(DetectionConfig.id.asc()).first()
    if not config:
        config = DetectionConfig(spam_threshold=0.75)
        db.session.add(config)
        db.session.commit()
    return config
def _serialize_post(item: ContentPost) -> dict:
    """Serialize a post and attach the display names of the related users.

    Starts from ``item.to_dict()`` and adds the author's, recipient's and
    reviewer's names; a missing relation contributes an empty string.
    """
    payload = item.to_dict()
    author, recipient, reviewer = item.author, item.recipient, item.reviewer
    payload.update(
        {
            "username": author.username if author else "",
            "nickname": author.nickname if author else "",
            "recipient_username": recipient.username if recipient else "",
            "recipient_nickname": recipient.nickname if recipient else "",
            "reviewer_username": reviewer.username if reviewer else "",
        }
    )
    return payload
def _upsert_manual_sample(text: str, label: str, admin_id: int | None) -> None:
    """Ensure an active training sample exists for ``(text, label)``.

    An existing row is reactivated and, if it has no source recorded, tagged
    as "manual_review". Otherwise a new sample is added to the session.
    Nothing is committed here.
    """
    sample = SpamTrainingSample.query.filter_by(text=text, label=label).first()
    if not sample:
        db.session.add(
            SpamTrainingSample(
                text=text,
                label=label,
                source="manual_review",
                created_by=admin_id,
                is_active=True,
            )
        )
        return
    sample.is_active = True
    if not sample.source:
        sample.source = "manual_review"
@admin_bp.get("/stats")
@admin_required
def stats():
    """Return the headline metrics for the admin dashboard.

    The payload holds overall counts (users, training samples, prediction
    logs, posts by status, pending appeals), a 7-day posting trend, the 7-day
    blocked ratio, the training-sample source distribution, the most frequent
    two-character tokens among recently blocked posts, the classifier's model
    info and the current detection threshold.
    """
    user_count = User.query.count()
    sample_count = SpamTrainingSample.query.count()
    predict_count = SpamPredictionLog.query.count()
    post_count = ContentPost.query.count()
    blocked_count = ContentPost.query.filter_by(status="blocked").count()
    published_count = ContentPost.query.filter_by(status="published").count()
    pending_appeal_count = ContentPost.query.filter_by(appeal_status="pending").count()

    # NOTE(review): naive UTC, matching how this module stamps review times;
    # confirm that ContentPost.created_at is stored the same way.
    now = datetime.utcnow()
    today = now.date()
    # Start at midnight of the oldest day shown in the trend. Using
    # ``now - 6 days`` would drop that day's posts created before the current
    # time-of-day and under-report the first bucket.
    window_start = now.replace(hour=0, minute=0, second=0, microsecond=0) - timedelta(days=6)
    in_window = ContentPost.created_at >= window_start

    trend_rows = (
        db.session.query(func.date(ContentPost.created_at), func.count(ContentPost.id))
        .filter(in_window)
        .group_by(func.date(ContentPost.created_at))
        .all()
    )
    day_map = {_day_key(day): int(count or 0) for day, count in trend_rows}
    trend = []
    for offset in range(6, -1, -1):
        day = today - timedelta(days=offset)
        key = day.isoformat()
        trend.append({"date": key, "label": day.strftime("%m-%d"), "post_count": day_map.get(key, 0)})

    blocked_7d_count = ContentPost.query.filter(in_window, ContentPost.status == "blocked").count()
    total_7d_count = ContentPost.query.filter(in_window).count()

    source_rows = (
        db.session.query(SpamTrainingSample.source, func.count(SpamTrainingSample.id))
        .group_by(SpamTrainingSample.source)
        .order_by(func.count(SpamTrainingSample.id).desc())
        .all()
    )
    source_dist = [{"name": (name or "unknown"), "value": int(value or 0)} for name, value in source_rows]

    # Only the 1000 most recent blocked posts feed the keyword ranking.
    blocked_logs = (
        ContentPost.query.filter(in_window, ContentPost.status == "blocked")
        .order_by(ContentPost.id.desc())
        .limit(1000)
        .all()
    )
    token_counter = Counter()
    for row in blocked_logs:
        token_counter.update(_tokenize(row.text))
    top_keywords = [{"token": token, "count": count} for token, count in token_counter.most_common(12)]

    cfg = _get_or_create_config()
    clf = NaiveBayesSpamClassifier(current_app.config["NB_MODEL_PATH"])
    clf.load()
    return ok(
        {
            "user_count": user_count,
            "sample_count": sample_count,
            "predict_count": predict_count,
            "post_count": post_count,
            "blocked_count": blocked_count,
            "published_count": published_count,
            "pending_appeal_count": pending_appeal_count,
            "blocked_ratio_7d": round(blocked_7d_count / total_7d_count, 4) if total_7d_count else 0,
            "total_7d": total_7d_count,
            "trend_7d": trend,
            "source_distribution": source_dist,
            "top_keywords": top_keywords,
            "model_info": clf.model_info(),
            "threshold": cfg.to_dict(),
        }
    )
def _daily_counts(date_column, *criteria) -> dict[str, int]:
    """Count posts per calendar day of *date_column*, restricted by *criteria*."""
    rows = (
        db.session.query(func.date(date_column), func.count(ContentPost.id))
        .filter(*criteria)
        .group_by(func.date(date_column))
        .all()
    )
    return {_day_key(day): int(count or 0) for day, count in rows}


@admin_bp.get("/stats/report")
@admin_required
def generate_report():
    """Build the 14-day operations report.

    The report contains a daily blocked/published trend, the 20 most frequent
    two-character tokens among recently blocked posts, a daily misjudgement
    trend based on manual reviews (share of reviews that ended as
    "approved_ham"), and summary totals for the whole period.
    """
    # NOTE(review): naive UTC, matching how this module stamps review times;
    # confirm that ContentPost.created_at is stored the same way.
    now = datetime.utcnow()
    today = now.date()
    # Start at midnight of the oldest reported day. Using ``now - 13 days``
    # would count that first day only from the current time-of-day onwards.
    window_start = now.replace(hour=0, minute=0, second=0, microsecond=0) - timedelta(days=13)
    days = [today - timedelta(days=offset) for offset in range(13, -1, -1)]

    # 1. Daily number of blocked and published posts.
    created_in_window = ContentPost.created_at >= window_start
    blocked_map = _daily_counts(
        ContentPost.created_at, created_in_window, ContentPost.status == "blocked"
    )
    published_map = _daily_counts(
        ContentPost.created_at, created_in_window, ContentPost.status == "published"
    )
    spam_trend = []
    for day in days:
        key = day.isoformat()
        blocked = blocked_map.get(key, 0)
        published = published_map.get(key, 0)
        spam_trend.append({
            "date": key,
            "label": day.strftime("%m-%d"),
            "blocked": blocked,
            "published": published,
            "total": blocked + published,
        })

    # 2. High-frequency risk tokens, from the 1000 most recent blocked posts.
    blocked_logs = (
        ContentPost.query.filter(created_in_window, ContentPost.status == "blocked")
        .order_by(ContentPost.id.desc())
        .limit(1000)
        .all()
    )
    token_counter = Counter()
    for row in blocked_logs:
        token_counter.update(_tokenize(row.text))
    top_keywords = [{"token": token, "count": count} for token, count in token_counter.most_common(20)]

    # 3. Misjudgement trend derived from manual reviews.
    reviewed_in_window = ContentPost.manual_review_at >= window_start
    review_map = _daily_counts(
        ContentPost.manual_review_at, reviewed_in_window, ContentPost.manual_review_status != "none"
    )
    approved_map = _daily_counts(
        ContentPost.manual_review_at, reviewed_in_window, ContentPost.manual_review_status == "approved_ham"
    )
    misjudge_trend = []
    for day in days:
        key = day.isoformat()
        reviewed = review_map.get(key, 0)
        approved = approved_map.get(key, 0)
        misjudge_rate = round(approved / reviewed, 4) if reviewed > 0 else 0
        misjudge_trend.append({
            "date": key,
            "label": day.strftime("%m-%d"),
            "reviewed": reviewed,
            "approved": approved,
            "misjudge_rate": misjudge_rate,
            "misjudge_rate_text": f"{misjudge_rate * 100:.1f}%",
        })

    # 4. Summary totals for the whole period.
    total_blocked_14d = sum(blocked_map.values())
    total_published_14d = sum(published_map.values())
    total_posts_14d = total_blocked_14d + total_published_14d
    total_reviews_14d = sum(review_map.values())
    total_approved_14d = sum(approved_map.values())
    avg_misjudge_rate = round(total_approved_14d / total_reviews_14d, 4) if total_reviews_14d > 0 else 0
    return ok({
        "report_date": now.isoformat(),
        "period": "近14天",
        "spam_trend": spam_trend,
        "top_keywords": top_keywords,
        "misjudge_trend": misjudge_trend,
        "summary": {
            "total_blocked": total_blocked_14d,
            "total_published": total_published_14d,
            "total_posts": total_posts_14d,
            "blocked_ratio": round(total_blocked_14d / total_posts_14d, 4) if total_posts_14d > 0 else 0,
            "total_reviews": total_reviews_14d,
            "total_approved": total_approved_14d,
            "avg_misjudge_rate": avg_misjudge_rate,
            "avg_misjudge_rate_text": f"{avg_misjudge_rate * 100:.1f}%",
        },
    })
@admin_bp.get("/detection/threshold")
@admin_required
def get_threshold():
    """Return the current detection configuration, creating it if missing."""
    config = _get_or_create_config()
    return ok(config.to_dict())
@admin_bp.put("/detection/threshold")
@admin_required
def set_threshold():
    """Update the spam threshold from the JSON field ``spam_threshold``.

    Responds with 400 when the value is missing, not numeric, or outside
    ``[0.01, 0.99]``. On success the acting admin is recorded in
    ``updated_by`` and the updated config is returned.
    """
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        # A JSON array/scalar body has no fields; treat it as empty.
        payload = {}
    try:
        threshold = float(payload.get("spam_threshold"))
    except (TypeError, ValueError, OverflowError):
        return fail("spam_threshold 必须是数字", 400)
    # Chained comparison on purpose: NaN compares False against everything,
    # so two separate "< low" / "> high" checks would let it through.
    if not 0.01 <= threshold <= 0.99:
        return fail("spam_threshold 必须在 0.01 到 0.99 之间", 400)
    cfg = _get_or_create_config()
    admin = current_user()
    cfg.spam_threshold = threshold
    cfg.updated_by = admin.id if admin else None
    db.session.commit()
    return ok(cfg.to_dict(), "阈值更新成功")
@admin_bp.get("/intercepts")
@admin_required
def list_intercepts():
    """Return a page of posts, newest first, for the interception list.

    Query parameters:
        keyword: substring matched against the post text.
        status: "blocked" (default) or "published"; other values disable the
            status filter.
        review_status: one of "none", "pending", "confirmed_spam",
            "approved_ham"; other values disable the filter.
        page: 1-based page number (default 1).
        page_size: items per page, clamped to 1..100 (default 20).
    """
    keyword = (request.args.get("keyword") or "").strip()
    status = (request.args.get("status") or "blocked").strip().lower()
    review_status = (request.args.get("review_status") or "").strip().lower()
    # ``type=int`` falls back to the default on malformed input instead of
    # letting ``int()`` raise and turn a bad query string into a 500.
    page = max(request.args.get("page", 1, type=int), 1)
    page_size = min(max(request.args.get("page_size", 20, type=int), 1), 100)

    query = ContentPost.query
    if keyword:
        query = query.filter(ContentPost.text.like(f"%{keyword}%"))
    if status in {"blocked", "published"}:
        query = query.filter(ContentPost.status == status)
    if review_status in {"none", "pending", "confirmed_spam", "approved_ham"}:
        query = query.filter(ContentPost.manual_review_status == review_status)
    pagination = query.order_by(ContentPost.id.desc()).paginate(page=page, per_page=page_size, error_out=False)
    return ok(
        {
            "items": [_serialize_post(item) for item in pagination.items],
            "total": pagination.total,
            "page": page,
            "page_size": page_size,
        }
    )
@admin_bp.put("/intercepts/<int:post_id>/review")
@admin_required
def review_intercept(post_id: int):
    """Record an admin's manual verdict ("spam" or "ham") on a post.

    The verdict sets the post's status, prediction and manual-review fields,
    settles a pending appeal accordingly, and stores the text as a manually
    reviewed training sample. Responds with 404 for an unknown post and 400
    for an invalid ``decision``.
    """
    row = ContentPost.query.get(post_id)
    if not row:
        return fail("记录不存在", 404)
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        # A JSON array/scalar body has no fields; treat it as empty.
        payload = {}
    # ``str()`` keeps non-string JSON values (numbers, booleans) from raising
    # AttributeError on ``.strip()``; they simply fail validation below.
    decision = str(payload.get("decision") or "").strip().lower()
    note = str(payload.get("note") or "").strip()

    # decision -> (post status, review status, appeal outcome, default appeal note)
    outcomes = {
        "spam": ("blocked", "confirmed_spam", "rejected", "人工复核确认为垃圾信息"),
        "ham": ("published", "approved_ham", "approved", "人工复核后解除拦截"),
    }
    if decision not in outcomes:
        return fail("decision 必须是 spam 或 ham", 400)
    post_status, review_status, appeal_outcome, default_appeal_note = outcomes[decision]

    admin = current_user()
    admin_id = admin.id if admin else None
    now = datetime.utcnow()
    row.manual_review_by = admin_id
    row.manual_review_note = note
    row.manual_review_at = now
    row.status = post_status
    row.prediction = decision
    row.manual_review_status = review_status
    if row.appeal_status == "pending":
        # The manual verdict also settles an appeal that is still open.
        row.appeal_status = appeal_outcome
        row.appeal_admin_note = note or default_appeal_note
        row.appeal_processed_at = now
        row.appeal_processed_by = admin_id
    _upsert_manual_sample(row.text, decision, admin_id)
    db.session.commit()
    return ok(_serialize_post(row), "人工复核完成")
@admin_bp.get("/appeals")
@admin_required
def list_appeals():
    """Return a page of posts that have an appeal, newest first.

    Query parameters:
        keyword: substring matched against the post text, the appeal reason
            or the admin's appeal note.
        status: "pending" (default), "approved" or "rejected"; other values
            disable the status filter.
        page: 1-based page number (default 1).
        page_size: items per page, clamped to 1..100 (default 20).
    """
    keyword = (request.args.get("keyword") or "").strip()
    status = (request.args.get("status") or "pending").strip().lower()
    # ``type=int`` falls back to the default on malformed input instead of
    # letting ``int()`` raise and turn a bad query string into a 500.
    page = max(request.args.get("page", 1, type=int), 1)
    page_size = min(max(request.args.get("page_size", 20, type=int), 1), 100)

    query = ContentPost.query.filter(ContentPost.appeal_status != "none")
    if keyword:
        pattern = f"%{keyword}%"
        query = query.filter(
            or_(
                ContentPost.text.like(pattern),
                ContentPost.appeal_reason.like(pattern),
                ContentPost.appeal_admin_note.like(pattern),
            )
        )
    if status in {"pending", "approved", "rejected"}:
        query = query.filter(ContentPost.appeal_status == status)
    pagination = query.order_by(ContentPost.id.desc()).paginate(page=page, per_page=page_size, error_out=False)
    return ok(
        {
            "items": [_serialize_post(item) for item in pagination.items],
            "total": pagination.total,
            "page": page,
            "page_size": page_size,
        }
    )
@admin_bp.put("/appeals/<int:post_id>/process")
@admin_required
def process_appeal(post_id: int):
    """Approve or reject a pending appeal.

    Approving publishes the post, marks it as ham and raises the author's
    credit score by 10 (capped at 200). Rejecting keeps it blocked as spam
    and lowers the score by 5 (floored at 0). Either way the text is stored
    as a manually reviewed training sample. Responds with 404 for an unknown
    post and 400 when the appeal is not pending or ``action`` is invalid.
    """
    row = ContentPost.query.get(post_id)
    if not row:
        return fail("记录不存在", 404)
    if row.appeal_status != "pending":
        return fail("该申诉不在待处理状态", 400)
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        # A JSON array/scalar body has no fields; treat it as empty.
        payload = {}
    # ``str()`` keeps non-string JSON values from raising on ``.strip()``.
    action = str(payload.get("action") or "").strip().lower()
    note = str(payload.get("note") or "").strip()
    if action not in {"approve", "reject"}:
        return fail("action 必须是 approve 或 reject", 400)

    approved = action == "approve"
    admin = current_user()
    admin_id = admin.id if admin else None
    now = datetime.utcnow()
    row.appeal_status = "approved" if approved else "rejected"
    row.appeal_admin_note = note
    row.appeal_processed_at = now
    row.appeal_processed_by = admin_id
    row.manual_review_by = admin_id
    row.manual_review_note = note
    row.manual_review_at = now

    author = row.author
    # Only a missing score defaults to 100. A stored 0 is a real score and
    # must not be bumped to 100 by a truthiness check.
    current_score = 100
    if author and author.credit_score is not None:
        current_score = author.credit_score

    if approved:
        row.status = "published"
        row.prediction = "ham"
        row.manual_review_status = "approved_ham"
        _upsert_manual_sample(row.text, "ham", admin_id)
        # Appeal upheld: reward the author.
        if author:
            author.credit_score = min(200, current_score + 10)
    else:
        row.status = "blocked"
        row.prediction = "spam"
        row.manual_review_status = "confirmed_spam"
        _upsert_manual_sample(row.text, "spam", admin_id)
        # Appeal rejected: penalize the author.
        if author:
            author.credit_score = max(0, current_score - 5)
    db.session.commit()
    return ok(_serialize_post(row), "申诉处理完成")
@admin_bp.get("/users")
@admin_required
def list_users():
    """Return a page of users, newest first.

    Query parameters:
        keyword: substring matched against username or nickname.
        page: 1-based page number (default 1).
        page_size: items per page, clamped to 1..100 (default 20).
    """
    keyword = (request.args.get("keyword") or "").strip()
    # ``type=int`` falls back to the default on malformed input instead of
    # letting ``int()`` raise and turn a bad query string into a 500.
    page = max(request.args.get("page", 1, type=int), 1)
    page_size = min(max(request.args.get("page_size", 20, type=int), 1), 100)

    query = User.query
    if keyword:
        pattern = f"%{keyword}%"
        query = query.filter(or_(User.username.like(pattern), User.nickname.like(pattern)))
    pagination = query.order_by(User.id.desc()).paginate(page=page, per_page=page_size, error_out=False)
    return ok(
        {
            "items": [item.to_dict() for item in pagination.items],
            "total": pagination.total,
            "page": page,
            "page_size": page_size,
        }
    )
def _clean_text(value, fallback="") -> str:
    """Return *value*, or *fallback* when *value* is empty, as a stripped string."""
    return str(value or fallback or "").strip()


@admin_bp.post("/users/import")
@admin_required
def import_users():
    """Create or update users in bulk from the JSON field ``items``.

    Each item is a dict keyed by ``username``. Unknown usernames are created;
    known ones have the supplied profile fields, admin flag and password
    updated. Items that are not dicts, or whose username is shorter than
    three characters, are skipped. Returns the created and updated counts.
    """
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        # A JSON array/scalar body has no fields; treat it as empty.
        payload = {}
    items = payload.get("items") or []
    if not isinstance(items, list) or not items:
        return fail("items 必须是非空数组", 400)

    created = 0
    updated = 0
    for row in items:
        if not isinstance(row, dict):
            # Malformed entries are skipped instead of crashing the import.
            continue
        username = _clean_text(row.get("username"))
        if len(username) < 3:
            continue
        user = User.query.filter_by(username=username).first()
        if not user:
            user = User(
                username=username,
                nickname=_clean_text(row.get("nickname"), username),
                company=_clean_text(row.get("company")),
                title=_clean_text(row.get("title")),
                phone=_clean_text(row.get("phone")),
                is_admin=bool(row.get("is_admin", False)),
            )
            # NOTE(review): accounts imported without a password get the
            # well-known default "123456"; consider requiring a password or
            # forcing a reset on first login.
            user.set_password(str(row.get("password") or "123456"))
            db.session.add(user)
            created += 1
            continue
        # Fall back to the stored value when a field is absent; _clean_text
        # also copes with stored values that are None.
        user.nickname = _clean_text(row.get("nickname"), user.nickname)
        user.company = _clean_text(row.get("company"), user.company)
        user.title = _clean_text(row.get("title"), user.title)
        user.phone = _clean_text(row.get("phone"), user.phone)
        if "is_admin" in row:
            user.is_admin = bool(row.get("is_admin"))
        if row.get("password"):
            user.set_password(str(row["password"]))
        updated += 1
    db.session.commit()
    return ok({"created": created, "updated": updated}, "用户导入完成")
@admin_bp.put("/users/<int:user_id>")
@admin_required
def update_user(user_id: int):
    """Update the profile fields of one user from the JSON body.

    Only keys present in the body are applied: ``nickname``, ``company``,
    ``title``, ``phone``, ``is_admin``, ``credit_score`` (clamped to 0..200;
    an unparsable value is ignored) and ``password`` (at least 6 characters).
    Responds with 404 for an unknown user and 400 for a too-short password
    or an attempt to demote the only administrator.
    """
    user = User.query.get(user_id)
    if not user:
        return fail("用户不存在", 404)
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        # A JSON array/scalar body has no fields; treat it as empty.
        payload = {}

    # Validate everything that can be rejected before mutating the user, so
    # a 400 response never leaves half-applied changes in the session.
    password = payload.get("password")
    if password and (not isinstance(password, str) or len(password) < 6):
        return fail("密码至少6位", 400)
    if "is_admin" in payload:
        demoting = bool(user.is_admin) and not bool(payload.get("is_admin"))
        # Same guard as delete_user: the system must keep at least one admin.
        if demoting and User.query.filter_by(is_admin=True).count() <= 1:
            return fail("至少保留一个管理员账号", 400)

    if "nickname" in payload:
        # An empty nickname keeps the current one.
        user.nickname = str(payload.get("nickname") or user.nickname or "").strip()
    if "company" in payload:
        user.company = str(payload.get("company") or "").strip()
    if "title" in payload:
        user.title = str(payload.get("title") or "").strip()
    if "phone" in payload:
        user.phone = str(payload.get("phone") or "").strip()
    if "is_admin" in payload:
        user.is_admin = bool(payload.get("is_admin"))
    if "credit_score" in payload:
        try:
            credit = int(payload.get("credit_score"))
        except (TypeError, ValueError, OverflowError):
            # Best effort: an unparsable score leaves the current value alone.
            credit = None
        if credit is not None:
            user.credit_score = max(0, min(200, credit))
    if password:
        user.set_password(password)
    db.session.commit()
    return ok(user.to_dict(), "用户更新成功")
@admin_bp.delete("/users/<int:user_id>")
@admin_required
def delete_user(user_id: int):
    """Delete a user, refusing to remove the only remaining administrator.

    Responds with 404 when the user does not exist and 400 when the target
    is the last admin account.
    """
    target = User.query.get(user_id)
    if not target:
        return fail("用户不存在", 404)
    if target.is_admin:
        admin_total = User.query.filter_by(is_admin=True).count()
        if admin_total <= 1:
            return fail("至少保留一个管理员账号", 400)
    db.session.delete(target)
    db.session.commit()
    return ok({}, "用户已删除")
@admin_bp.put("/users/<int:user_id>/credit")
@admin_required
def update_user_credit(user_id: int):
    """Manually set a user's credit score from the JSON field ``credit_score``.

    The value is converted with ``int`` and clamped to 0..200. When the field
    is omitted the current score is kept (100 if none is stored). Responds
    with 404 for an unknown user and 400 when the value is not an integer.
    """
    user = User.query.get(user_id)
    if not user:
        return fail("用户不存在", 404)
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        # A JSON array/scalar body has no fields; treat it as empty.
        payload = {}
    # Only a missing score defaults to 100; a stored 0 must stay 0 rather
    # than being reset by a truthiness check.
    current_score = 100 if user.credit_score is None else user.credit_score
    try:
        credit = int(payload.get("credit_score", current_score))
    except (TypeError, ValueError, OverflowError):
        return fail("信誉分必须是0-200之间的整数", 400)
    user.credit_score = max(0, min(200, credit))
    db.session.commit()
    return ok(user.to_dict(), "信誉分已更新")
def _credit_from_history(posts) -> int:
    """Derive a credit score in 0..200 from a non-empty list of a user's posts."""
    total = len(posts)
    published = sum(1 for post in posts if post.status == "published")
    publish_ratio = published / total

    appeal_total = sum(1 for post in posts if post.appeal_status != "none")
    appeal_approved = sum(1 for post in posts if post.appeal_status == "approved")
    appeal_ratio = appeal_approved / appeal_total if appeal_total else 0

    score = 100
    # Contribution of the publish success rate.
    if publish_ratio >= 0.9:
        score += 30
    elif publish_ratio >= 0.7:
        score += 15
    elif publish_ratio < 0.5:
        score -= 20
    # Contribution of the appeal approval rate; a minimum number of appeals
    # is required so one lucky appeal does not earn the bonus.
    if appeal_ratio >= 0.8 and appeal_total >= 3:
        score += 20
    elif appeal_ratio >= 0.5 and appeal_total >= 2:
        score += 10
    return max(0, min(200, score))


@admin_bp.post("/users/recalculate-credit")
@admin_required
def recalculate_all_credit():
    """Recompute every non-admin user's credit score from their post history.

    Users without any posts are left untouched. Returns the number of users
    whose score was recalculated.
    """
    users = User.query.filter_by(is_admin=False).all()
    updated_count = 0
    for user in users:
        # NOTE(review): one query per user; fine for small user tables, but
        # consider a grouped aggregate if this endpoint becomes slow.
        posts = ContentPost.query.filter_by(user_id=user.id).all()
        if not posts:
            continue
        user.credit_score = _credit_from_history(posts)
        updated_count += 1
    db.session.commit()
    return ok({"updated_count": updated_count}, "信誉分批量重算完成")