"""Random-forest based recipe recommender.

A ``RandomForestRegressor`` is trained on synthetic user profiles whose
target values come from a hand-written heuristic score.  The fitted model is
cached on disk together with a signature of the recipe set, and is retrained
whenever that signature changes.
"""

import hashlib
import json
import logging
from datetime import datetime, timezone
from pathlib import Path

import joblib
import numpy as np
from sklearn.ensemble import RandomForestRegressor

logger = logging.getLogger(__name__)

# Integer codes used as model features for the dietary goal.
GOAL_MAP = {
    "maintain": 0,
    "lose_fat": 1,
    "gain_muscle": 2,
    "keto": 3,
}

# Integer codes used as model features for the occupation; "通用" is the
# fallback bucket for missing or unknown occupations.
OCCUPATION_MAP = {
    "通用": 0,
    "student": 1,
    "office": 2,
    "teacher": 3,
    "developer": 4,
    "healthcare": 5,
    "fitness": 6,
    "manual": 7,
}


class RandomForestDietRecommender:
    """Score and rank recipes for a user profile with a random forest."""

    def __init__(self, model_path: str):
        """Remember where the model is cached; nothing is loaded yet."""
        self.model_path = Path(model_path)
        self.model = None
        self.recipe_signature = None

    @staticmethod
    def _encode_goal(goal: str) -> int:
        """Return the integer code for *goal* (0 for missing/unknown)."""
        return GOAL_MAP.get(goal or "maintain", 0)

    @staticmethod
    def _encode_occupation(occupation: str) -> int:
        """Return the integer code for *occupation* (generic bucket if unknown)."""
        return OCCUPATION_MAP.get(occupation or "通用", OCCUPATION_MAP["通用"])

    def _signature(self, recipes: list) -> str:
        """Return an MD5 hex digest identifying the given recipe set.

        The digest is used only as a cache key (not for security).  The JSON
        layout must stay stable, otherwise every cached model is invalidated.
        """
        raw = [
            {
                "id": item.id,
                "name": item.name,
                "calories": item.calories,
                "protein": item.protein,
                "fat": item.fat,
                "carbs": item.carbs,
                "fiber": item.fiber,
                "updated": item.updated_at.isoformat() if item.updated_at else "",
            }
            for item in recipes
        ]
        raw_json = json.dumps(raw, ensure_ascii=False, sort_keys=True)
        return hashlib.md5(raw_json.encode("utf-8")).hexdigest()

    @staticmethod
    def _daily_target_kcal(profile: dict) -> float:
        """Estimate the daily calorie target for *profile* (never below 1200).

        NOTE(review): a missing ``exercise_kcal`` defaults to 0 here but to
        300 in ``_build_feature``; left as-is because changing either default
        would alter training targets / features.
        """
        goal = profile.get("goal", "maintain")
        baseline = 1800 + float(profile.get("exercise_kcal", 0)) * 0.4
        if goal == "lose_fat":
            baseline *= 0.82
        elif goal == "gain_muscle":
            baseline *= 1.12
        elif goal == "keto":
            baseline *= 0.9
        return max(baseline, 1200)

    @staticmethod
    def _heuristic_score(profile: dict, recipe) -> float:
        """Return the rule-based suitability score of *recipe*, in [1, 100].

        This score is the regression target used when training the forest.
        """
        goal = profile.get("goal", "maintain")
        daily_target = RandomForestDietRecommender._daily_target_kcal(profile)
        target_per_meal = daily_target / 3

        cal_gap_ratio = abs(recipe.calories - target_per_meal) / max(target_per_meal, 1)
        # Guard the denominators so zero-calorie recipes do not divide by zero.
        safe_calories = max(recipe.calories, 1)
        protein_ratio = recipe.protein / safe_calories
        carbs_ratio = recipe.carbs / safe_calories
        fat_ratio = recipe.fat / safe_calories

        score = 100.0
        score -= min(cal_gap_ratio * 55, 50)

        if goal == "lose_fat":
            score += min(recipe.protein * 0.6, 18)
            score -= max((recipe.fat - 20) * 0.7, 0)
            score -= max((recipe.carbs - 55) * 0.3, 0)
        elif goal == "gain_muscle":
            score += min(recipe.protein * 0.8, 26)
            score += min(recipe.carbs * 0.2, 10)
        elif goal == "keto":
            score += min(recipe.fat * 0.4, 18)
            score -= max(recipe.carbs - 30, 0) * 0.8
        else:
            # Fiber may be None (``_build_feature`` treats it as nullable too).
            score += min((recipe.fiber or 0) * 1.2, 8)

        body_fat = float(profile.get("body_fat", 20))
        if body_fat > 28:
            score -= max(recipe.calories - 520, 0) * 0.03

        intake_kcal = float(profile.get("intake_kcal", 1800))
        if intake_kcal > daily_target:
            score -= max(recipe.calories - 460, 0) * 0.02

        score += np.clip((protein_ratio - 0.12) * 100, -8, 8)
        if goal == "keto":
            score += np.clip((0.08 - carbs_ratio) * 80, -6, 6)
        if goal == "lose_fat":
            score += np.clip((0.25 - fat_ratio) * 30, -5, 5)

        return float(np.clip(score, 1, 100))

    def _build_feature(self, profile: dict, recipe) -> list:
        """Return the 13-element numeric feature row for (profile, recipe)."""
        return [
            float(profile.get("weight", 65)),
            float(profile.get("body_fat", 20)),
            float(profile.get("exercise_kcal", 300)),
            float(profile.get("intake_kcal", 1800)),
            float(profile.get("age", 25)),
            float(profile.get("height_cm", 170)),
            float(self._encode_goal(profile.get("goal", "maintain"))),
            float(self._encode_occupation(profile.get("occupation", "通用"))),
            float(recipe.calories),
            float(recipe.protein),
            float(recipe.fat),
            float(recipe.carbs),
            float(recipe.fiber or 0),
        ]

    def _sample_profiles(self, n: int = 600) -> list:
        """Return *n* synthetic profiles drawn from a fixed-seed generator.

        The order of the random draws is part of the contract: changing it
        changes the training set and therefore the fitted model.
        """
        rng = np.random.default_rng(2026)
        goals = list(GOAL_MAP)
        occupations = list(OCCUPATION_MAP)
        profiles = []
        for _ in range(n):
            goal = goals[int(rng.integers(0, len(goals)))]
            occupation = occupations[int(rng.integers(0, len(occupations)))]
            profiles.append(
                {
                    "weight": float(rng.uniform(45, 100)),
                    "body_fat": float(rng.uniform(10, 38)),
                    "exercise_kcal": float(rng.uniform(50, 850)),
                    "intake_kcal": float(rng.uniform(1200, 3200)),
                    "age": float(rng.uniform(18, 55)),
                    "height_cm": float(rng.uniform(150, 190)),
                    "goal": goal,
                    "occupation": occupation,
                }
            )
        return profiles

    def train(self, recipes: list) -> None:
        """Fit the forest on synthetic profiles x *recipes* and cache it.

        Raises:
            ValueError: if *recipes* is empty.
        """
        if not recipes:
            raise ValueError("训练随机森林前至少需要 1 条食谱数据")

        x_rows = []
        y_rows = []
        for profile in self._sample_profiles():
            for recipe in recipes:
                x_rows.append(self._build_feature(profile, recipe))
                y_rows.append(self._heuristic_score(profile, recipe))

        x = np.array(x_rows)
        y = np.array(y_rows)
        model = RandomForestRegressor(
            n_estimators=240,
            random_state=2026,
            max_depth=12,
            min_samples_leaf=2,
            n_jobs=-1,
        )
        model.fit(x, y)

        self.model = model
        self.recipe_signature = self._signature(recipes)
        self.model_path.parent.mkdir(parents=True, exist_ok=True)
        joblib.dump(
            {
                "model": model,
                "recipe_signature": self.recipe_signature,
                # Timezone-aware UTC timestamp (datetime.utcnow is deprecated).
                "trained_at": datetime.now(timezone.utc).isoformat(),
            },
            self.model_path,
        )

    def load_or_train(self, recipes: list) -> None:
        """Load the cached model if it matches *recipes*, otherwise retrain.

        An unreadable or malformed cache file is treated as a cache miss
        rather than an error.
        """
        current_signature = self._signature(recipes)
        if self.model_path.exists():
            # SECURITY: joblib.load unpickles arbitrary objects; model_path
            # must only ever point at files written by this application.
            try:
                payload = joblib.load(self.model_path)
            except Exception:
                logger.warning(
                    "Could not load cached model from %s; retraining",
                    self.model_path,
                    exc_info=True,
                )
                payload = None
            if (
                isinstance(payload, dict)
                and payload.get("recipe_signature") == current_signature
                and payload.get("model") is not None
            ):
                self.model = payload["model"]
                self.recipe_signature = current_signature
                return
        self.train(recipes)

    @staticmethod
    def _build_reason(profile: dict, recipe, score: float) -> str:
        """Return the user-facing explanation shown next to a recommendation."""
        goal = profile.get("goal", "maintain")
        if goal == "lose_fat":
            return f"热量适中,蛋白质 {recipe.protein}g,适合减脂期控热量和保肌。"
        if goal == "gain_muscle":
            return "蛋白质与碳水配置较高,适合增肌训练后的恢复。"
        if goal == "keto":
            return f"碳水 {recipe.carbs}g,偏低碳结构,适合生酮期参考。"
        if score > 80:
            return "营养均衡度高,适合作为日常轻食搭配。"
        return "综合营养结构较均衡,可作为个性化备选方案。"

    def recommend(self, profile: dict, recipes: list, top_k: int = 5) -> list:
        """Return up to *top_k* recipe dicts, best predicted score first.

        Each dict is ``recipe.to_dict()`` extended with ``rf_score`` and
        ``reason``.  An empty *recipes* list yields an empty result without
        touching the model.
        """
        if not recipes:
            return []

        self.load_or_train(recipes)
        x = np.array([self._build_feature(profile, recipe) for recipe in recipes])
        pred_scores = self.model.predict(x)

        result = []
        for recipe, score in zip(recipes, pred_scores):
            row = recipe.to_dict()
            row["rf_score"] = round(float(score), 2)
            row["reason"] = self._build_reason(profile, recipe, float(score))
            result.append(row)

        result.sort(key=lambda item: item["rf_score"], reverse=True)
        # Clamp so a negative top_k cannot slice from the end of the list.
        return result[: max(top_k, 0)]


def merge_profile_with_history(base_profile: dict, history: list) -> dict:
    """Overlay the averages of the history records onto *base_profile*.

    For ``weight``, ``body_fat``, ``exercise_kcal`` and ``intake_kcal`` the
    mean over *history* replaces the base value.  ``None`` entries are
    skipped; a field with no recorded values keeps its base value.  With an
    empty *history* the base profile is returned unchanged.
    """
    if not history:
        return base_profile

    merged = dict(base_profile)
    for field in ("weight", "body_fat", "exercise_kcal", "intake_kcal"):
        values = [
            value
            for value in (getattr(item, field) for item in history)
            if value is not None
        ]
        if values:
            merged[field] = float(np.mean(values))
    return merged