"""Admin API blueprint.

Endpoints for dashboard statistics and the operations report, the spam-detection
threshold, manual review of intercepted posts, appeal processing and user
management. Every route is wrapped in ``admin_required``.
"""

from collections import Counter, defaultdict
from datetime import datetime, timedelta

from flask import Blueprint, current_app, request
from sqlalchemy import func, or_

from app.extensions import db
from app.ml.naive_bayes_classifier import NaiveBayesSpamClassifier
from app.models import ContentPost, DetectionConfig, SpamPredictionLog, SpamTrainingSample, User
from app.utils.auth import admin_required, current_user
from app.utils.response import fail, ok

admin_bp = Blueprint("admin", __name__)

# Manual-review states accepted as a filter by GET /intercepts.
_REVIEW_STATUSES = frozenset({"none", "pending", "confirmed_spam", "approved_ham"})
# Review states that represent a final human decision (misjudge-rate denominator).
_DECIDED_REVIEW_STATUSES = ("confirmed_spam", "approved_ham")

# Credit-score bounds and the value assumed when a user has no score stored.
_CREDIT_MIN = 0
_CREDIT_MAX = 200
_CREDIT_DEFAULT = 100

# String spellings treated as "false" for boolean flags (e.g. from CSV-derived imports).
_FALSY_STRINGS = frozenset({"", "0", "false", "no", "off", "n", "f"})


# --------------------------------------------------------------------------- helpers


def _day_key(day_value) -> str:
    """Normalise a ``func.date(...)`` result to an ISO ``YYYY-MM-DD`` string.

    The value may come back as a ``date`` object or as a plain string depending on
    the database backend, so both are accepted.
    """
    if hasattr(day_value, "isoformat"):
        return day_value.isoformat()
    return str(day_value)


def _tokenize(text: str) -> list[str]:
    """Split *text* into overlapping character bigrams, dropping whitespace-only ones."""
    content = (text or "").strip()
    bigrams = (content[i : i + 2] for i in range(len(content) - 1))
    return [pair for pair in bigrams if pair.strip()]


def _start_of_window(now: datetime, days: int) -> datetime:
    """Return midnight of the first day of a *days*-long window ending on ``now``'s date.

    Starting at midnight (not ``now - days``) keeps the oldest day of a trend complete.
    """
    midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
    return midnight - timedelta(days=days - 1)


def _recent_days(today, days: int) -> list:
    """Return the last *days* calendar dates ending at *today*, oldest first."""
    return [today - timedelta(days=offset) for offset in range(days - 1, -1, -1)]


def _daily_counts(date_column, *criteria) -> dict[str, int]:
    """Count ContentPost rows per calendar day of *date_column*, keyed by ISO date string."""
    rows = (
        db.session.query(func.date(date_column), func.count(ContentPost.id))
        .filter(*criteria)
        .group_by(func.date(date_column))
        .all()
    )
    return {_day_key(day): int(count or 0) for day, count in rows}


def _top_blocked_tokens(since: datetime, top_n: int, scan_limit: int = 1000) -> list[dict]:
    """Rank the most frequent bigrams in the newest *scan_limit* blocked posts created since *since*."""
    texts = (
        db.session.query(ContentPost.text)
        .filter(ContentPost.created_at >= since, ContentPost.status == "blocked")
        .order_by(ContentPost.id.desc())
        .limit(scan_limit)
        .all()
    )
    counter = Counter()
    for (text,) in texts:
        counter.update(_tokenize(text))
    return [{"token": token, "count": count} for token, count in counter.most_common(top_n)]


def _parse_int_arg(name: str, default: int, lower: int, upper: int | None = None) -> int:
    """Read an integer query-string argument and clamp it to ``[lower, upper]``.

    Missing, empty or non-numeric values fall back to *default* instead of raising.
    """
    try:
        value = int(request.args.get(name) or default)
    except (TypeError, ValueError):
        value = default
    value = max(value, lower)
    return min(value, upper) if upper is not None else value


def _pagination_args() -> tuple[int, int]:
    """Return ``(page, page_size)`` from the query string; page >= 1, 1 <= page_size <= 100."""
    return _parse_int_arg("page", 1, 1), _parse_int_arg("page_size", 20, 1, 100)


def _paginated_response(query, page: int, page_size: int, serialize):
    """Paginate *query* and wrap the serialized page in the standard list envelope."""
    pagination = query.paginate(page=page, per_page=page_size, error_out=False)
    return ok(
        {
            "items": [serialize(item) for item in pagination.items],
            "total": pagination.total,
            "page": page,
            "page_size": page_size,
        }
    )


def _as_bool(value) -> bool:
    """Interpret a JSON flag; strings such as "false", "0" or "no" count as False."""
    if isinstance(value, str):
        return value.strip().lower() not in _FALSY_STRINGS
    return bool(value)


def _clean_str(value, fallback="") -> str:
    """Return ``str(value).strip()``, using *fallback* when *value* is falsy (and "" when both are)."""
    return str(value or fallback or "").strip()


def _clamp_credit(score: int) -> int:
    """Clamp a credit score to the allowed range."""
    return max(_CREDIT_MIN, min(_CREDIT_MAX, score))


def _current_credit(user) -> int:
    """Return the user's credit score, treating only ``None`` (not 0) as "unset"."""
    return _CREDIT_DEFAULT if user.credit_score is None else user.credit_score


def _current_admin_id() -> int | None:
    """Return the id of the logged-in user, or None when ``current_user()`` yields nothing."""
    admin = current_user()
    return admin.id if admin else None


def _get_or_create_config() -> DetectionConfig:
    """Return the first DetectionConfig row, creating and committing one (threshold 0.75) if none exists."""
    cfg = DetectionConfig.query.order_by(DetectionConfig.id.asc()).first()
    if cfg:
        return cfg
    cfg = DetectionConfig(spam_threshold=0.75)
    db.session.add(cfg)
    db.session.commit()
    return cfg


def _serialize_post(item: ContentPost) -> dict:
    """Serialize a post and add author / recipient / reviewer display names ("" when absent)."""
    row = item.to_dict()
    author, recipient, reviewer = item.author, item.recipient, item.reviewer
    row["username"] = author.username if author else ""
    row["nickname"] = author.nickname if author else ""
    row["recipient_username"] = recipient.username if recipient else ""
    row["recipient_nickname"] = recipient.nickname if recipient else ""
    row["reviewer_username"] = reviewer.username if reviewer else ""
    return row


def _upsert_manual_sample(text: str, label: str, admin_id: int | None) -> None:
    """Record a manual review decision as an active training sample (caller commits).

    Any active sample with the same text but the opposite label is deactivated, so the
    human decision is not contradicted by stale training data.
    """
    contradicting = SpamTrainingSample.query.filter(
        SpamTrainingSample.text == text,
        SpamTrainingSample.label != label,
    ).all()
    for stale in contradicting:
        stale.is_active = False

    existed = SpamTrainingSample.query.filter_by(text=text, label=label).first()
    if existed:
        existed.is_active = True
        existed.source = existed.source or "manual_review"
        return
    db.session.add(
        SpamTrainingSample(
            text=text,
            label=label,
            source="manual_review",
            created_by=admin_id,
            is_active=True,
        )
    )


def _credit_from_history(history: list[tuple]) -> int:
    """Compute a credit score from ``(status, appeal_status)`` pairs of a user's posts.

    Starts at 100. The publish ratio adds 30 if >= 90%, adds 15 if >= 70%, or
    subtracts 20 if < 50%. The appeal approval ratio adds 20 if >= 80% with at least
    3 appeals, or adds 10 if >= 50% with at least 2 appeals. The result is clamped
    to [0, 200]. *history* must be non-empty.
    """
    total = len(history)
    published = sum(1 for status, _ in history if status == "published")
    publish_ratio = published / total

    # NULL appeal_status means "no appeal", matching the SQL `!= "none"` filter used elsewhere.
    appeals = [appeal for _, appeal in history if appeal not in (None, "none")]
    approved = sum(1 for appeal in appeals if appeal == "approved")
    appeal_ratio = approved / len(appeals) if appeals else 0

    score = _CREDIT_DEFAULT
    if publish_ratio >= 0.9:
        score += 30
    elif publish_ratio >= 0.7:
        score += 15
    elif publish_ratio < 0.5:
        score -= 20

    if appeal_ratio >= 0.8 and len(appeals) >= 3:
        score += 20
    elif appeal_ratio >= 0.5 and len(appeals) >= 2:
        score += 10
    return _clamp_credit(score)


# --------------------------------------------------------------------------- statistics


@admin_bp.get("/stats")
@admin_required
def stats():
    """Return dashboard data.

    Includes global totals, the 7-day post trend, the training-sample source
    distribution, the top bigrams of recently blocked posts, classifier info and the
    detection config.
    """
    now = datetime.utcnow()
    window_start = _start_of_window(now, 7)
    recent_posts = ContentPost.query.filter(ContentPost.created_at >= window_start)

    total_7d_count = recent_posts.count()
    blocked_7d_count = recent_posts.filter(ContentPost.status == "blocked").count()

    day_map = _daily_counts(ContentPost.created_at, ContentPost.created_at >= window_start)
    trend = [
        {"date": day.isoformat(), "label": day.strftime("%m-%d"), "post_count": day_map.get(day.isoformat(), 0)}
        for day in _recent_days(now.date(), 7)
    ]

    source_rows = (
        db.session.query(SpamTrainingSample.source, func.count(SpamTrainingSample.id))
        .group_by(SpamTrainingSample.source)
        .order_by(func.count(SpamTrainingSample.id).desc())
        .all()
    )
    source_dist = [{"name": (name or "unknown"), "value": int(value or 0)} for name, value in source_rows]

    cfg = _get_or_create_config()
    # NOTE(review): the model is re-loaded from NB_MODEL_PATH on every call; consider caching.
    clf = NaiveBayesSpamClassifier(current_app.config["NB_MODEL_PATH"])
    clf.load()

    return ok(
        {
            "user_count": User.query.count(),
            "sample_count": SpamTrainingSample.query.count(),
            "predict_count": SpamPredictionLog.query.count(),
            "post_count": ContentPost.query.count(),
            "blocked_count": ContentPost.query.filter_by(status="blocked").count(),
            "published_count": ContentPost.query.filter_by(status="published").count(),
            "pending_appeal_count": ContentPost.query.filter_by(appeal_status="pending").count(),
            "blocked_ratio_7d": round(blocked_7d_count / total_7d_count, 4) if total_7d_count else 0,
            "total_7d": total_7d_count,
            "trend_7d": trend,
            "source_distribution": source_dist,
            "top_keywords": _top_blocked_tokens(window_start, 12),
            "model_info": clf.model_info(),
            "threshold": cfg.to_dict(),
        }
    )


@admin_bp.get("/stats/report")
@admin_required
def generate_report():
    """Build the 14-day operations report.

    Covers the blocked vs. published volume trend, a risk-bigram ranking, and the
    daily misjudge rate (share of manual review decisions that released a post as ham).
    """
    now = datetime.utcnow()
    window_start = _start_of_window(now, 14)
    days = _recent_days(now.date(), 14)

    # 1. Daily blocked / published volume.
    blocked_map = _daily_counts(
        ContentPost.created_at, ContentPost.created_at >= window_start, ContentPost.status == "blocked"
    )
    published_map = _daily_counts(
        ContentPost.created_at, ContentPost.created_at >= window_start, ContentPost.status == "published"
    )
    spam_trend = []
    for day in days:
        key = day.isoformat()
        blocked = blocked_map.get(key, 0)
        published = published_map.get(key, 0)
        spam_trend.append(
            {
                "date": key,
                "label": day.strftime("%m-%d"),
                "blocked": blocked,
                "published": published,
                "total": blocked + published,
            }
        )

    # 2. Most frequent risk bigrams among blocked posts.
    top_keywords = _top_blocked_tokens(window_start, 20)

    # 3. Misjudge rate per review day; only final decisions count in the denominator.
    review_map = _daily_counts(
        ContentPost.manual_review_at,
        ContentPost.manual_review_at >= window_start,
        ContentPost.manual_review_status.in_(_DECIDED_REVIEW_STATUSES),
    )
    approved_map = _daily_counts(
        ContentPost.manual_review_at,
        ContentPost.manual_review_at >= window_start,
        ContentPost.manual_review_status == "approved_ham",
    )
    misjudge_trend = []
    for day in days:
        key = day.isoformat()
        reviewed = review_map.get(key, 0)
        approved = approved_map.get(key, 0)
        misjudge_rate = round(approved / reviewed, 4) if reviewed > 0 else 0
        misjudge_trend.append(
            {
                "date": key,
                "label": day.strftime("%m-%d"),
                "reviewed": reviewed,
                "approved": approved,
                "misjudge_rate": misjudge_rate,
                "misjudge_rate_text": f"{misjudge_rate * 100:.1f}%",
            }
        )

    # 4. Totals over the whole window.
    total_blocked_14d = sum(blocked_map.values())
    total_published_14d = sum(published_map.values())
    total_posts_14d = total_blocked_14d + total_published_14d
    total_reviews_14d = sum(review_map.values())
    total_approved_14d = sum(approved_map.values())
    avg_misjudge_rate = round(total_approved_14d / total_reviews_14d, 4) if total_reviews_14d > 0 else 0

    return ok(
        {
            "report_date": now.isoformat(),
            "period": "近14天",
            "spam_trend": spam_trend,
            "top_keywords": top_keywords,
            "misjudge_trend": misjudge_trend,
            "summary": {
                "total_blocked": total_blocked_14d,
                "total_published": total_published_14d,
                "total_posts": total_posts_14d,
                "blocked_ratio": round(total_blocked_14d / total_posts_14d, 4) if total_posts_14d > 0 else 0,
                "total_reviews": total_reviews_14d,
                "total_approved": total_approved_14d,
                "avg_misjudge_rate": avg_misjudge_rate,
                "avg_misjudge_rate_text": f"{avg_misjudge_rate * 100:.1f}%",
            },
        }
    )


# --------------------------------------------------------------------------- detection threshold


@admin_bp.get("/detection/threshold")
@admin_required
def get_threshold():
    """Return the current detection configuration."""
    return ok(_get_or_create_config().to_dict())


@admin_bp.put("/detection/threshold")
@admin_required
def set_threshold():
    """Update ``spam_threshold``; it must be a number in [0.01, 0.99]."""
    payload = request.get_json(silent=True) or {}
    try:
        threshold = float(payload.get("spam_threshold"))
    except (TypeError, ValueError, OverflowError):
        return fail("spam_threshold 必须是数字", 400)
    # A chained range check also rejects NaN, which fails every comparison.
    if not 0.01 <= threshold <= 0.99:
        return fail("spam_threshold 必须在 0.01 到 0.99 之间", 400)

    cfg = _get_or_create_config()
    cfg.spam_threshold = threshold
    cfg.updated_by = _current_admin_id()
    db.session.commit()
    return ok(cfg.to_dict(), "阈值更新成功")


# --------------------------------------------------------------------------- intercepts & appeals


@admin_bp.get("/intercepts")
@admin_required
def list_intercepts():
    """List posts filtered by keyword, status (default "blocked") and manual-review status."""
    keyword = (request.args.get("keyword") or "").strip()
    status = (request.args.get("status") or "blocked").strip().lower()
    review_status = (request.args.get("review_status") or "").strip().lower()
    page, page_size = _pagination_args()

    query = ContentPost.query
    if keyword:
        # autoescape makes "%" and "_" in the keyword match literally.
        query = query.filter(ContentPost.text.contains(keyword, autoescape=True))
    if status in {"blocked", "published"}:
        query = query.filter(ContentPost.status == status)
    if review_status in _REVIEW_STATUSES:
        query = query.filter(ContentPost.manual_review_status == review_status)
    return _paginated_response(query.order_by(ContentPost.id.desc()), page, page_size, _serialize_post)


@admin_bp.put("/intercepts/<int:post_id>/review")
@admin_required
def review_intercept(post_id: int):
    """Apply a manual spam/ham decision to a post.

    Any pending appeal on the post is resolved accordingly, and the decision is
    recorded as a training sample.
    """
    row = ContentPost.query.get(post_id)
    if not row:
        return fail("记录不存在", 404)

    payload = request.get_json(silent=True) or {}
    decision = _clean_str(payload.get("decision")).lower()
    note = _clean_str(payload.get("note"))
    if decision not in {"spam", "ham"}:
        return fail("decision 必须是 spam 或 ham", 400)

    admin_id = _current_admin_id()
    now = datetime.utcnow()
    is_spam = decision == "spam"

    row.manual_review_by = admin_id
    row.manual_review_note = note
    row.manual_review_at = now
    row.status = "blocked" if is_spam else "published"
    row.prediction = decision
    row.manual_review_status = "confirmed_spam" if is_spam else "approved_ham"
    if row.appeal_status == "pending":
        row.appeal_status = "rejected" if is_spam else "approved"
        row.appeal_admin_note = note or ("人工复核确认为垃圾信息" if is_spam else "人工复核后解除拦截")
        row.appeal_processed_at = now
        row.appeal_processed_by = admin_id
    _upsert_manual_sample(row.text, decision, admin_id)

    db.session.commit()
    return ok(_serialize_post(row), "人工复核完成")


@admin_bp.get("/appeals")
@admin_required
def list_appeals():
    """List appealed posts, filtered by keyword and appeal status (default "pending")."""
    keyword = (request.args.get("keyword") or "").strip()
    status = (request.args.get("status") or "pending").strip().lower()
    page, page_size = _pagination_args()

    query = ContentPost.query.filter(ContentPost.appeal_status != "none")
    if keyword:
        query = query.filter(
            or_(
                ContentPost.text.contains(keyword, autoescape=True),
                ContentPost.appeal_reason.contains(keyword, autoescape=True),
                ContentPost.appeal_admin_note.contains(keyword, autoescape=True),
            )
        )
    if status in {"pending", "approved", "rejected"}:
        query = query.filter(ContentPost.appeal_status == status)
    return _paginated_response(query.order_by(ContentPost.id.desc()), page, page_size, _serialize_post)


@admin_bp.put("/appeals/<int:post_id>/process")
@admin_required
def process_appeal(post_id: int):
    """Approve or reject a pending appeal.

    Approval publishes the post and adds 10 credit to the author; rejection blocks it
    and subtracts 5. Either way the outcome is recorded as a manual review and as a
    training sample.
    """
    row = ContentPost.query.get(post_id)
    if not row:
        return fail("记录不存在", 404)
    if row.appeal_status != "pending":
        return fail("该申诉不在待处理状态", 400)

    payload = request.get_json(silent=True) or {}
    action = _clean_str(payload.get("action")).lower()
    note = _clean_str(payload.get("note"))
    if action not in {"approve", "reject"}:
        return fail("action 必须是 approve 或 reject", 400)

    admin_id = _current_admin_id()
    now = datetime.utcnow()
    approved = action == "approve"
    label = "ham" if approved else "spam"

    row.appeal_status = "approved" if approved else "rejected"
    row.appeal_admin_note = note
    row.appeal_processed_at = now
    row.appeal_processed_by = admin_id
    row.manual_review_by = admin_id
    row.manual_review_note = note
    row.manual_review_at = now
    row.status = "published" if approved else "blocked"
    row.prediction = label
    row.manual_review_status = "approved_ham" if approved else "confirmed_spam"
    _upsert_manual_sample(row.text, label, admin_id)

    if row.author:
        # Use the stored score as-is; only a missing score (None) falls back to the default.
        delta = 10 if approved else -5
        row.author.credit_score = _clamp_credit(_current_credit(row.author) + delta)

    db.session.commit()
    return ok(_serialize_post(row), "申诉处理完成")


# --------------------------------------------------------------------------- users


@admin_bp.get("/users")
@admin_required
def list_users():
    """List users, optionally filtered by a username/nickname substring."""
    keyword = (request.args.get("keyword") or "").strip()
    page, page_size = _pagination_args()

    query = User.query
    if keyword:
        query = query.filter(
            or_(
                User.username.contains(keyword, autoescape=True),
                User.nickname.contains(keyword, autoescape=True),
            )
        )
    return _paginated_response(query.order_by(User.id.desc()), page, page_size, lambda user: user.to_dict())


@admin_bp.post("/users/import")
@admin_required
def import_users():
    """Bulk create or update users from ``{"items": [{...}, ...]}``.

    Rows that are not objects, or whose username is shorter than 3 characters, are skipped.
    """
    payload = request.get_json(silent=True) or {}
    items = payload.get("items") or []
    if not isinstance(items, list) or not items:
        return fail("items 必须是非空数组", 400)

    created = 0
    updated = 0
    for row in items:
        if not isinstance(row, dict):
            continue
        username = _clean_str(row.get("username"))
        if len(username) < 3:
            continue

        user = User.query.filter_by(username=username).first()
        if user is None:
            user = User(
                username=username,
                nickname=_clean_str(row.get("nickname"), username),
                company=_clean_str(row.get("company")),
                title=_clean_str(row.get("title")),
                phone=_clean_str(row.get("phone")),
                is_admin=_as_bool(row.get("is_admin", False)),
            )
            # NOTE(review): rows without a password get the shared default "123456";
            # consider forcing a reset on first login.
            user.set_password(str(row.get("password") or "123456"))
            db.session.add(user)
            created += 1
            continue

        user.nickname = _clean_str(row.get("nickname"), user.nickname)
        user.company = _clean_str(row.get("company"), user.company)
        user.title = _clean_str(row.get("title"), user.title)
        user.phone = _clean_str(row.get("phone"), user.phone)
        if "is_admin" in row:
            user.is_admin = _as_bool(row.get("is_admin"))
        if row.get("password"):
            user.set_password(str(row["password"]))
        updated += 1

    db.session.commit()
    return ok({"created": created, "updated": updated}, "用户导入完成")


@admin_bp.put("/users/<int:user_id>")
@admin_required
def update_user(user_id: int):
    """Update the profile fields, admin flag, credit score and/or password present in the payload.

    All inputs are validated before anything is modified, so a rejected request leaves
    the user untouched.
    """
    user = User.query.get(user_id)
    if not user:
        return fail("用户不存在", 404)
    payload = request.get_json(silent=True) or {}

    password = payload.get("password")
    if password:
        password = str(password)
        if len(password) < 6:
            return fail("密码至少6位", 400)

    credit = None
    if payload.get("credit_score") is not None:
        try:
            credit = _clamp_credit(int(payload["credit_score"]))
        except (TypeError, ValueError, OverflowError):
            return fail("信誉分必须是0-200之间的整数", 400)

    make_admin = _as_bool(payload.get("is_admin")) if "is_admin" in payload else None
    # Same safeguard as delete_user: never demote the last remaining admin.
    if make_admin is False and user.is_admin and User.query.filter_by(is_admin=True).count() <= 1:
        return fail("至少保留一个管理员账号", 400)

    if "nickname" in payload:
        user.nickname = _clean_str(payload.get("nickname"), user.nickname)
    if "company" in payload:
        user.company = _clean_str(payload.get("company"))
    if "title" in payload:
        user.title = _clean_str(payload.get("title"))
    if "phone" in payload:
        user.phone = _clean_str(payload.get("phone"))
    if make_admin is not None:
        user.is_admin = make_admin
    if credit is not None:
        user.credit_score = credit
    if password:
        user.set_password(password)

    db.session.commit()
    return ok(user.to_dict(), "用户更新成功")


@admin_bp.delete("/users/<int:user_id>")
@admin_required
def delete_user(user_id: int):
    """Delete a user, refusing to remove the last admin account."""
    user = User.query.get(user_id)
    if not user:
        return fail("用户不存在", 404)
    if user.is_admin and User.query.filter_by(is_admin=True).count() <= 1:
        return fail("至少保留一个管理员账号", 400)
    db.session.delete(user)
    db.session.commit()
    return ok({}, "用户已删除")


@admin_bp.put("/users/<int:user_id>/credit")
@admin_required
def update_user_credit(user_id: int):
    """Manually set a user's credit score (clamped to [0, 200])."""
    user = User.query.get(user_id)
    if not user:
        return fail("用户不存在", 404)
    payload = request.get_json(silent=True) or {}
    try:
        credit = _clamp_credit(int(payload.get("credit_score", _current_credit(user))))
    except (TypeError, ValueError, OverflowError):
        return fail("信誉分必须是0-200之间的整数", 400)
    user.credit_score = credit
    db.session.commit()
    return ok(user.to_dict(), "信誉分已更新")


@admin_bp.post("/users/recalculate-credit")
@admin_required
def recalculate_all_credit():
    """Recompute credit scores of all non-admin users from their post and appeal history.

    Users without any posts keep their current score.
    """
    # Fetch only the needed columns for all posts in one query instead of one query per user.
    history_by_user = defaultdict(list)
    post_rows = db.session.query(ContentPost.user_id, ContentPost.status, ContentPost.appeal_status).all()
    for user_id, status, appeal_status in post_rows:
        history_by_user[user_id].append((status, appeal_status))

    updated_count = 0
    for user in User.query.filter_by(is_admin=False).all():
        history = history_by_user.get(user.id)
        if not history:
            continue
        user.credit_score = _credit_from_history(history)
        updated_count += 1

    db.session.commit()
    return ok({"updated_count": updated_count}, "信誉分批量重算完成")