- 后端新增 /admin/stats/report 接口,生成14天运营数据报告 - 报告内容:垃圾信息变化趋势、高频风险词Top10、误判率趋势 - 前端运营看板增加"生成报告"按钮,展示完整报告 - 支持复制报告文本到剪贴板 Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
620 lines
22 KiB
Python
620 lines
22 KiB
Python
from collections import Counter
|
||
from datetime import datetime, timedelta
|
||
|
||
from flask import Blueprint, current_app, request
|
||
from sqlalchemy import func, or_
|
||
|
||
from app.extensions import db
|
||
from app.ml.naive_bayes_classifier import NaiveBayesSpamClassifier
|
||
from app.models import ContentPost, DetectionConfig, SpamPredictionLog, SpamTrainingSample, User
|
||
from app.utils.auth import admin_required, current_user
|
||
from app.utils.response import fail, ok
|
||
|
||
|
||
# Blueprint that the admin-only endpoints in this module are registered on.
admin_bp = Blueprint("admin", __name__)
|
||
|
||
|
||
def _day_key(day_value) -> str:
|
||
if hasattr(day_value, "isoformat"):
|
||
return day_value.isoformat()
|
||
return str(day_value)
|
||
|
||
|
||
def _tokenize(text: str) -> list[str]:
|
||
content = (text or "").strip()
|
||
if len(content) <= 1:
|
||
return []
|
||
tokens = []
|
||
for i in range(len(content) - 1):
|
||
token = content[i : i + 2]
|
||
if token.strip():
|
||
tokens.append(token)
|
||
return tokens
|
||
|
||
|
||
def _get_or_create_config() -> DetectionConfig:
    """Return the detection config row with the lowest id, creating it if absent.

    When no row exists, one is inserted with ``spam_threshold=0.75`` and
    committed immediately.
    """
    config = DetectionConfig.query.order_by(DetectionConfig.id.asc()).first()
    if not config:
        config = DetectionConfig(spam_threshold=0.75)
        db.session.add(config)
        db.session.commit()
    return config
|
||
|
||
|
||
def _serialize_post(item: ContentPost) -> dict:
    """Serialize a post via ``to_dict()`` and add related users' display names.

    Adds the author's, recipient's and reviewer's names; a missing related
    user is reported as an empty string.
    """
    payload = item.to_dict()
    # (output key, related user, attribute to copy) in output order.
    related_fields = (
        ("username", item.author, "username"),
        ("nickname", item.author, "nickname"),
        ("recipient_username", item.recipient, "username"),
        ("recipient_nickname", item.recipient, "nickname"),
        ("reviewer_username", item.reviewer, "username"),
    )
    for key, related_user, attribute in related_fields:
        payload[key] = getattr(related_user, attribute) if related_user else ""
    return payload
|
||
|
||
|
||
def _upsert_manual_sample(text: str, label: str, admin_id: int | None) -> None:
    """Make sure an active training sample exists for ``(text, label)``.

    An existing sample is re-activated (and given the ``manual_review`` source
    if it had none); otherwise a new sample is added to the session. The
    caller is responsible for committing.
    """
    sample = SpamTrainingSample.query.filter_by(text=text, label=label).first()
    if not sample:
        db.session.add(
            SpamTrainingSample(
                text=text,
                label=label,
                source="manual_review",
                created_by=admin_id,
                is_active=True,
            )
        )
        return

    sample.is_active = True
    sample.source = sample.source or "manual_review"
|
||
|
||
|
||
@admin_bp.get("/stats")
@admin_required
def stats():
    """Return the admin dashboard payload: global counters and 7-day charts.

    The response contains overall totals (users, training samples, prediction
    logs, posts by status, pending appeals), the per-day post count and the
    blocked ratio for the last seven calendar days (UTC, today included), the
    training-sample distribution by source, the 12 most frequent two-character
    tokens among up to 1000 recently blocked posts, the classifier's
    ``model_info()`` and the current detection threshold.
    """
    user_count = User.query.count()
    sample_count = SpamTrainingSample.query.count()
    predict_count = SpamPredictionLog.query.count()
    post_count = ContentPost.query.count()
    blocked_count = ContentPost.query.filter_by(status="blocked").count()
    published_count = ContentPost.query.filter_by(status="published").count()
    pending_appeal_count = ContentPost.query.filter_by(appeal_status="pending").count()

    now = datetime.utcnow()
    today = now.date()
    # Start the window at midnight of the oldest charted day. Using
    # ``now - 6 days`` would drop that day's posts created before the current
    # time of day, under-reporting the first bar and skewing the 7-day ratio.
    window_start = datetime.combine(today - timedelta(days=6), datetime.min.time())

    trend_rows = (
        db.session.query(func.date(ContentPost.created_at), func.count(ContentPost.id))
        .filter(ContentPost.created_at >= window_start)
        .group_by(func.date(ContentPost.created_at))
        .all()
    )

    blocked_7d_count = ContentPost.query.filter(
        ContentPost.created_at >= window_start, ContentPost.status == "blocked"
    ).count()
    total_7d_count = ContentPost.query.filter(ContentPost.created_at >= window_start).count()

    day_map = {_day_key(day): int(count or 0) for day, count in trend_rows}
    trend = []
    for offset in range(6, -1, -1):
        day = today - timedelta(days=offset)
        key = day.isoformat()
        trend.append({"date": key, "label": day.strftime("%m-%d"), "post_count": day_map.get(key, 0)})

    source_rows = (
        db.session.query(SpamTrainingSample.source, func.count(SpamTrainingSample.id))
        .group_by(SpamTrainingSample.source)
        .order_by(func.count(SpamTrainingSample.id).desc())
        .all()
    )
    source_dist = [{"name": (name or "unknown"), "value": int(value or 0)} for name, value in source_rows]

    # Only the newest 1000 blocked posts feed the keyword ranking.
    blocked_posts = (
        ContentPost.query.filter(ContentPost.created_at >= window_start, ContentPost.status == "blocked")
        .order_by(ContentPost.id.desc())
        .limit(1000)
        .all()
    )
    token_counter = Counter()
    for post in blocked_posts:
        token_counter.update(_tokenize(post.text))
    top_keywords = [{"token": token, "count": count} for token, count in token_counter.most_common(12)]

    cfg = _get_or_create_config()

    clf = NaiveBayesSpamClassifier(current_app.config["NB_MODEL_PATH"])
    clf.load()

    return ok(
        {
            "user_count": user_count,
            "sample_count": sample_count,
            "predict_count": predict_count,
            "post_count": post_count,
            "blocked_count": blocked_count,
            "published_count": published_count,
            "pending_appeal_count": pending_appeal_count,
            "blocked_ratio_7d": round(blocked_7d_count / total_7d_count, 4) if total_7d_count else 0,
            "total_7d": total_7d_count,
            "trend_7d": trend,
            "source_distribution": source_dist,
            "top_keywords": top_keywords,
            "model_info": clf.model_info(),
            "threshold": cfg.to_dict(),
        }
    )
|
||
|
||
|
||
def _daily_post_counts(date_column, since, *criteria) -> dict[str, int]:
    """Count posts per calendar day of ``date_column`` on or after ``since``, keyed by ``_day_key``."""
    day_expr = func.date(date_column)
    rows = (
        db.session.query(day_expr, func.count(ContentPost.id))
        .filter(date_column >= since, *criteria)
        .group_by(day_expr)
        .all()
    )
    return {_day_key(day): int(count or 0) for day, count in rows}


@admin_bp.get("/stats/report")
@admin_required
def generate_report():
    """Build the 14-day operations report.

    The report covers the last 14 calendar days (UTC, today included) and
    contains: the per-day blocked/published post counts, the 20 most frequent
    two-character tokens among up to 1000 recently blocked posts, the per-day
    misjudgement rate (manual reviews that ended as ``approved_ham`` divided
    by all manual reviews of that day), and summary totals for the period.
    """
    now = datetime.utcnow()
    today = now.date()
    days = [today - timedelta(days=offset) for offset in range(13, -1, -1)]
    # Start the window at midnight of the oldest reported day. Using
    # ``now - 13 days`` would drop that day's rows from before the current
    # time of day and under-report the first data point and the totals.
    window_start = datetime.combine(days[0], datetime.min.time())

    # 1. Blocked vs. published posts per day.
    blocked_map = _daily_post_counts(
        ContentPost.created_at, window_start, ContentPost.status == "blocked"
    )
    published_map = _daily_post_counts(
        ContentPost.created_at, window_start, ContentPost.status == "published"
    )

    spam_trend = []
    for day in days:
        key = day.isoformat()
        blocked = blocked_map.get(key, 0)
        published = published_map.get(key, 0)
        spam_trend.append({
            "date": key,
            "label": day.strftime("%m-%d"),
            "blocked": blocked,
            "published": published,
            "total": blocked + published,
        })

    # 2. Most frequent tokens in blocked posts (newest 1000 only).
    blocked_posts = (
        ContentPost.query.filter(ContentPost.created_at >= window_start, ContentPost.status == "blocked")
        .order_by(ContentPost.id.desc())
        .limit(1000)
        .all()
    )
    token_counter = Counter()
    for post in blocked_posts:
        token_counter.update(_tokenize(post.text))
    top_keywords = [{"token": token, "count": count} for token, count in token_counter.most_common(20)]

    # 3. Misjudgement rate per day, based on manual reviews.
    review_map = _daily_post_counts(
        ContentPost.manual_review_at, window_start, ContentPost.manual_review_status != "none"
    )
    approved_map = _daily_post_counts(
        ContentPost.manual_review_at, window_start, ContentPost.manual_review_status == "approved_ham"
    )

    misjudge_trend = []
    for day in days:
        key = day.isoformat()
        reviewed = review_map.get(key, 0)
        approved = approved_map.get(key, 0)
        misjudge_rate = round(approved / reviewed, 4) if reviewed > 0 else 0
        misjudge_trend.append({
            "date": key,
            "label": day.strftime("%m-%d"),
            "reviewed": reviewed,
            "approved": approved,
            "misjudge_rate": misjudge_rate,
            "misjudge_rate_text": f"{misjudge_rate * 100:.1f}%",
        })

    # 4. Period totals.
    total_blocked_14d = sum(blocked_map.values())
    total_published_14d = sum(published_map.values())
    total_posts_14d = total_blocked_14d + total_published_14d
    total_reviews_14d = sum(review_map.values())
    total_approved_14d = sum(approved_map.values())
    avg_misjudge_rate = round(total_approved_14d / total_reviews_14d, 4) if total_reviews_14d > 0 else 0

    return ok({
        "report_date": now.isoformat(),
        "period": "近14天",
        "spam_trend": spam_trend,
        "top_keywords": top_keywords,
        "misjudge_trend": misjudge_trend,
        "summary": {
            "total_blocked": total_blocked_14d,
            "total_published": total_published_14d,
            "total_posts": total_posts_14d,
            "blocked_ratio": round(total_blocked_14d / total_posts_14d, 4) if total_posts_14d > 0 else 0,
            "total_reviews": total_reviews_14d,
            "total_approved": total_approved_14d,
            "avg_misjudge_rate": avg_misjudge_rate,
            "avg_misjudge_rate_text": f"{avg_misjudge_rate * 100:.1f}%",
        },
    })
|
||
|
||
|
||
@admin_bp.get("/detection/threshold")
@admin_required
def get_threshold():
    """Return the current detection config, creating it first if none exists."""
    config = _get_or_create_config()
    return ok(config.to_dict())
|
||
|
||
|
||
@admin_bp.put("/detection/threshold")
@admin_required
def set_threshold():
    """Update the spam threshold from the JSON field ``spam_threshold``.

    Responds with 400 when the value is missing, not numeric, or outside the
    inclusive range 0.01-0.99. On success the config records the acting admin
    and is committed.
    """
    payload = request.get_json(silent=True)
    # A JSON body that is not an object carries no threshold at all.
    raw_value = payload.get("spam_threshold") if isinstance(payload, dict) else None
    try:
        threshold = float(raw_value)
    except (TypeError, ValueError, OverflowError):
        return fail("spam_threshold 必须是数字", 400)

    # The chained comparison also rejects NaN, which compares false against
    # both bounds and therefore slipped through ``< 0.01 or > 0.99``.
    if not 0.01 <= threshold <= 0.99:
        return fail("spam_threshold 必须在 0.01 到 0.99 之间", 400)

    cfg = _get_or_create_config()
    admin = current_user()
    cfg.spam_threshold = threshold
    cfg.updated_by = admin.id if admin else None
    db.session.commit()
    return ok(cfg.to_dict(), "阈值更新成功")
|
||
|
||
|
||
@admin_bp.get("/intercepts")
@admin_required
def list_intercepts():
    """Return a page of posts filtered by keyword, status and review status.

    Query parameters: ``keyword`` (substring of the post text), ``status``
    (``blocked`` by default; values other than ``blocked``/``published``
    disable the filter), ``review_status`` (applied only for known values),
    ``page`` (>= 1) and ``page_size`` (clamped to 1-100). Results are ordered
    newest first.
    """
    keyword = (request.args.get("keyword") or "").strip()
    status = (request.args.get("status") or "blocked").strip().lower()
    review_status = (request.args.get("review_status") or "").strip().lower()
    # ``type=int`` falls back to the default on unparsable input instead of
    # raising ValueError, which previously surfaced as a 500.
    page = max(request.args.get("page", default=1, type=int), 1)
    page_size = min(max(request.args.get("page_size", default=20, type=int), 1), 100)

    query = ContentPost.query
    if keyword:
        query = query.filter(ContentPost.text.like(f"%{keyword}%"))
    if status in {"blocked", "published"}:
        query = query.filter(ContentPost.status == status)
    if review_status in {"none", "pending", "confirmed_spam", "approved_ham"}:
        query = query.filter(ContentPost.manual_review_status == review_status)

    pagination = query.order_by(ContentPost.id.desc()).paginate(page=page, per_page=page_size, error_out=False)
    return ok(
        {
            "items": [_serialize_post(item) for item in pagination.items],
            "total": pagination.total,
            "page": page,
            "page_size": page_size,
        }
    )
|
||
|
||
|
||
@admin_bp.put("/intercepts/<int:post_id>/review")
@admin_required
def review_intercept(post_id: int):
    """Record an admin's manual spam/ham decision for a post.

    The JSON body carries ``decision`` (``spam`` or ``ham``) and an optional
    ``note``. The post's status, prediction and manual-review fields are
    updated, a pending appeal is closed to match the decision, and the text
    is stored as a manually reviewed training sample.
    """
    post = ContentPost.query.get(post_id)
    if not post:
        return fail("记录不存在", 404)

    body = request.get_json(silent=True) or {}
    decision = (body.get("decision") or "").strip().lower()
    note = (body.get("note") or "").strip()
    if decision != "spam" and decision != "ham":
        return fail("decision 必须是 spam 或 ham", 400)

    reviewer = current_user()
    reviewer_id = reviewer.id if reviewer else None
    reviewed_at = datetime.utcnow()

    # Everything that differs between the two decisions, in one place.
    if decision == "spam":
        post_status, review_status = "blocked", "confirmed_spam"
        appeal_outcome, fallback_note = "rejected", "人工复核确认为垃圾信息"
    else:
        post_status, review_status = "published", "approved_ham"
        appeal_outcome, fallback_note = "approved", "人工复核后解除拦截"

    post.manual_review_by = reviewer_id
    post.manual_review_note = note
    post.manual_review_at = reviewed_at

    post.status = post_status
    post.prediction = decision
    post.manual_review_status = review_status

    # A still-pending appeal is resolved by the review itself.
    if post.appeal_status == "pending":
        post.appeal_status = appeal_outcome
        post.appeal_admin_note = note or fallback_note
        post.appeal_processed_at = reviewed_at
        post.appeal_processed_by = reviewer_id

    _upsert_manual_sample(post.text, decision, reviewer_id)

    db.session.commit()
    return ok(_serialize_post(post), "人工复核完成")
|
||
|
||
|
||
@admin_bp.get("/appeals")
@admin_required
def list_appeals():
    """Return a page of posts that have an appeal, filtered by keyword and status.

    Query parameters: ``keyword`` (substring of the post text, appeal reason
    or admin note), ``status`` (``pending`` by default; values other than
    ``pending``/``approved``/``rejected`` disable the filter), ``page`` (>= 1)
    and ``page_size`` (clamped to 1-100). Results are ordered newest first.
    """
    keyword = (request.args.get("keyword") or "").strip()
    status = (request.args.get("status") or "pending").strip().lower()
    # ``type=int`` falls back to the default on unparsable input instead of
    # raising ValueError, which previously surfaced as a 500.
    page = max(request.args.get("page", default=1, type=int), 1)
    page_size = min(max(request.args.get("page_size", default=20, type=int), 1), 100)

    query = ContentPost.query.filter(ContentPost.appeal_status != "none")
    if keyword:
        pattern = f"%{keyword}%"
        query = query.filter(
            or_(
                ContentPost.text.like(pattern),
                ContentPost.appeal_reason.like(pattern),
                ContentPost.appeal_admin_note.like(pattern),
            )
        )
    if status in {"pending", "approved", "rejected"}:
        query = query.filter(ContentPost.appeal_status == status)

    pagination = query.order_by(ContentPost.id.desc()).paginate(page=page, per_page=page_size, error_out=False)
    return ok(
        {
            "items": [_serialize_post(item) for item in pagination.items],
            "total": pagination.total,
            "page": page,
            "page_size": page_size,
        }
    )
|
||
|
||
|
||
@admin_bp.put("/appeals/<int:post_id>/process")
@admin_required
def process_appeal(post_id: int):
    """Approve or reject a pending appeal.

    The JSON body carries ``action`` (``approve`` or ``reject``) and an
    optional ``note``. Responds with 404 for an unknown post and 400 when the
    appeal is not pending or the action is invalid. The post's appeal and
    manual-review fields are updated, the text is stored as a training sample
    with the resulting label, and the author's credit score is raised by 10
    (approved) or lowered by 5 (rejected), kept within 0-200.
    """
    row = ContentPost.query.get(post_id)
    if not row:
        return fail("记录不存在", 404)
    if row.appeal_status != "pending":
        return fail("该申诉不在待处理状态", 400)

    payload = request.get_json(silent=True) or {}
    action = (payload.get("action") or "").strip().lower()
    note = (payload.get("note") or "").strip()
    if action not in {"approve", "reject"}:
        return fail("action 必须是 approve 或 reject", 400)

    admin = current_user()
    admin_id = admin.id if admin else None
    now = datetime.utcnow()
    approved = action == "approve"

    row.appeal_status = "approved" if approved else "rejected"
    row.appeal_admin_note = note
    row.appeal_processed_at = now
    row.appeal_processed_by = admin_id

    row.manual_review_by = admin_id
    row.manual_review_note = note
    row.manual_review_at = now

    if approved:
        row.status = "published"
        row.prediction = "ham"
        row.manual_review_status = "approved_ham"
        label, credit_delta = "ham", 10
    else:
        row.status = "blocked"
        row.prediction = "spam"
        row.manual_review_status = "confirmed_spam"
        label, credit_delta = "spam", -5

    _upsert_manual_sample(row.text, label, admin_id)

    author = row.author
    if author:
        # Only a missing score defaults to 100. ``credit_score or 100`` also
        # replaced a legitimate score of 0, lifting such users to 110 / 95.
        current_score = 100 if author.credit_score is None else author.credit_score
        author.credit_score = max(0, min(200, current_score + credit_delta))

    db.session.commit()
    return ok(_serialize_post(row), "申诉处理完成")
|
||
|
||
|
||
@admin_bp.get("/users")
@admin_required
def list_users():
    """Return a page of users, optionally filtered by username/nickname substring.

    Query parameters: ``keyword``, ``page`` (>= 1) and ``page_size`` (clamped
    to 1-100). Results are ordered newest first.
    """
    keyword = (request.args.get("keyword") or "").strip()
    # ``type=int`` falls back to the default on unparsable input instead of
    # raising ValueError, which previously surfaced as a 500.
    page = max(request.args.get("page", default=1, type=int), 1)
    page_size = min(max(request.args.get("page_size", default=20, type=int), 1), 100)

    query = User.query
    if keyword:
        pattern = f"%{keyword}%"
        query = query.filter(User.username.like(pattern) | User.nickname.like(pattern))

    pagination = query.order_by(User.id.desc()).paginate(page=page, per_page=page_size, error_out=False)
    return ok(
        {
            "items": [item.to_dict() for item in pagination.items],
            "total": pagination.total,
            "page": page,
            "page_size": page_size,
        }
    )
|
||
|
||
|
||
def _import_text(value, fallback="") -> str:
    """Return ``value`` (or ``fallback`` when it is empty) as stripped text, tolerating non-strings and None."""
    return str(value or fallback or "").strip()


@admin_bp.post("/users/import")
@admin_required
def import_users():
    """Create or update users in bulk from the JSON array ``items``.

    Each item is matched on ``username``: unknown usernames are created,
    known ones are updated with whichever fields the item provides. Items
    that are not JSON objects, or whose username is shorter than three
    characters, are skipped. Responds with the number of created and updated
    users; everything is committed in a single transaction.
    """
    payload = request.get_json(silent=True) or {}
    items = payload.get("items") or []
    if not isinstance(items, list) or not items:
        return fail("items 必须是非空数组", 400)

    created = 0
    updated = 0

    for row in items:
        # Malformed entries are skipped like too-short usernames instead of
        # aborting the whole import with a 500.
        if not isinstance(row, dict):
            continue
        username = _import_text(row.get("username"))
        if len(username) < 3:
            continue

        user = User.query.filter_by(username=username).first()
        if not user:
            user = User(
                username=username,
                nickname=_import_text(row.get("nickname"), username),
                company=_import_text(row.get("company")),
                title=_import_text(row.get("title")),
                phone=_import_text(row.get("phone")),
                is_admin=bool(row.get("is_admin", False)),
            )
            # NOTE(review): accounts imported without a password receive a
            # well-known default; consider requiring one or forcing a reset.
            user.set_password(row.get("password") or "123456")
            db.session.add(user)
            created += 1
            continue

        # Missing fields keep the stored value; a stored None no longer
        # crashes on ``.strip()``, and numeric values (e.g. a phone sent as a
        # JSON number) are converted to text.
        user.nickname = _import_text(row.get("nickname"), user.nickname)
        user.company = _import_text(row.get("company"), user.company)
        user.title = _import_text(row.get("title"), user.title)
        user.phone = _import_text(row.get("phone"), user.phone)
        if "is_admin" in row:
            user.is_admin = bool(row.get("is_admin"))
        if row.get("password"):
            user.set_password(row["password"])
        updated += 1

    db.session.commit()
    return ok({"created": created, "updated": updated}, "用户导入完成")
|
||
|
||
|
||
@admin_bp.put("/users/<int:user_id>")
@admin_required
def update_user(user_id: int):
    """Update the fields of a user that are present in the JSON body.

    Supported keys: ``nickname``, ``company``, ``title``, ``phone``,
    ``is_admin``, ``credit_score`` (clamped to 0-200, ignored when it cannot
    be parsed) and ``password`` (at least six characters). Responds with 404
    for an unknown user and 400 for a too-short password.
    """
    user = User.query.get(user_id)
    if not user:
        return fail("用户不存在", 404)

    body = request.get_json(silent=True) or {}

    if "nickname" in body:
        # An empty nickname keeps the current one.
        user.nickname = (body.get("nickname") or user.nickname).strip()

    # Plain text fields: an empty or null value clears the field.
    for field in ("company", "title", "phone"):
        if field in body:
            setattr(user, field, (body.get(field) or "").strip())

    if "is_admin" in body:
        user.is_admin = bool(body.get("is_admin"))

    if "credit_score" in body:
        try:
            requested = int(body.get("credit_score", 100))
            user.credit_score = max(0, min(200, requested))
        except Exception:
            # Best effort: an unparsable score leaves the stored value as is.
            pass

    new_password = body.get("password")
    if new_password:
        if len(new_password) < 6:
            return fail("密码至少6位", 400)
        user.set_password(new_password)

    db.session.commit()
    return ok(user.to_dict(), "用户更新成功")
|
||
|
||
|
||
@admin_bp.delete("/users/<int:user_id>")
@admin_required
def delete_user(user_id: int):
    """Delete a user, refusing to remove the only remaining admin account.

    Responds with 404 for an unknown user and 400 when the user is the last
    admin.
    """
    target = User.query.get(user_id)
    if not target:
        return fail("用户不存在", 404)

    if target.is_admin:
        # Count admins only when the target is one of them.
        admin_total = User.query.filter_by(is_admin=True).count()
        if admin_total <= 1:
            return fail("至少保留一个管理员账号", 400)

    db.session.delete(target)
    db.session.commit()
    return ok({}, "用户已删除")
|
||
|
||
|
||
@admin_bp.put("/users/<int:user_id>/credit")
@admin_required
def update_user_credit(user_id: int):
    """Manually set a user's credit score.

    The JSON field ``credit_score`` is converted to an integer and clamped to
    0-200; when the field is omitted the current score is kept. Responds with
    404 for an unknown user and 400 when the value cannot be converted.
    """
    user = User.query.get(user_id)
    if not user:
        return fail("用户不存在", 404)

    payload = request.get_json(silent=True) or {}
    if not isinstance(payload, dict):
        return fail("信誉分必须是0-200之间的整数", 400)

    # Only a missing score defaults to 100. ``credit_score or 100`` also
    # replaced a stored 0, resetting such users to 100 when the field was
    # omitted from the request.
    current_score = 100 if user.credit_score is None else user.credit_score
    try:
        credit = int(payload.get("credit_score", current_score))
    except (TypeError, ValueError, OverflowError):
        return fail("信誉分必须是0-200之间的整数", 400)

    user.credit_score = max(0, min(200, credit))
    db.session.commit()
    return ok(user.to_dict(), "信誉分已更新")
|
||
|
||
|
||
def _credit_from_history(published: int, total: int, approved_appeals: int, appeal_total: int) -> int:
    """Derive a 0-200 credit score from publish and appeal-approval ratios; ``total`` must be positive."""
    publish_ratio = published / total
    appeal_ratio = approved_appeals / appeal_total if appeal_total else 0

    score = 100
    # Contribution of the publish success ratio.
    if publish_ratio >= 0.9:
        score += 30
    elif publish_ratio >= 0.7:
        score += 15
    elif publish_ratio < 0.5:
        score -= 20

    # Contribution of the appeal approval ratio; it only counts once the user
    # has filed enough appeals.
    if appeal_ratio >= 0.8 and appeal_total >= 3:
        score += 20
    elif appeal_ratio >= 0.5 and appeal_total >= 2:
        score += 10

    return max(0, min(200, score))


@admin_bp.post("/users/recalculate-credit")
@admin_required
def recalculate_all_credit():
    """Recompute every non-admin user's credit score from their post history.

    Users without posts are left untouched. The score is based on the share
    of published posts and the share of approved appeals. Responds with the
    number of users whose score was recomputed.
    """
    users = User.query.filter_by(is_admin=False).all()
    updated_count = 0

    for user in users:
        posts = ContentPost.query.filter_by(user_id=user.id).all()
        if not posts:
            continue

        published_count = sum(1 for post in posts if post.status == "published")
        appeals = [post for post in posts if post.appeal_status != "none"]
        approved_appeals = sum(1 for post in appeals if post.appeal_status == "approved")

        user.credit_score = _credit_from_history(
            published_count, len(posts), approved_appeals, len(appeals)
        )
        updated_count += 1

    db.session.commit()
    return ok({"updated_count": updated_count}, "信誉分批量重算完成")
|
||
|