# Source listing metadata: 253 lines, 8.6 KiB, Python.
import hashlib
import json
from datetime import datetime, timezone
from pathlib import Path

import joblib
import numpy as np
from sklearn.ensemble import RandomForestRegressor


# Integer codes used to encode a profile's "goal" string as a model feature.
# Unknown goals are mapped to 0 ("maintain") by the encoder that reads this.
GOAL_MAP = {
    "maintain": 0,
    "lose_fat": 1,
    "gain_muscle": 2,
    "keto": 3,
}

# Integer codes used to encode a profile's "occupation" string as a model
# feature. The key "通用" is the fallback category with code 0.
OCCUPATION_MAP = {
    "通用": 0,
    "student": 1,
    "office": 2,
    "teacher": 3,
    "developer": 4,
    "healthcare": 5,
    "fitness": 6,
    "manual": 7,
}


class RandomForestDietRecommender:
    """Rank recipes for a user profile with a random-forest regressor.

    The forest is fitted on synthetic profiles whose labels come from
    ``_heuristic_score`` and is persisted with joblib at ``model_path``
    together with a signature of the recipe set, so it is retrained whenever
    the recipes change.
    """

    def __init__(self, model_path: str):
        """Remember where the model is stored; nothing is loaded yet.

        Args:
            model_path: File path used to persist / reload the fitted model.
        """
        self.model_path = Path(model_path)
        self.model = None
        self.recipe_signature = None

    @staticmethod
    def _encode_goal(goal: str) -> int:
        """Return the integer code for *goal*; empty or unknown goals give 0."""
        return GOAL_MAP.get(goal or "maintain", 0)

    @staticmethod
    def _encode_occupation(occupation: str) -> int:
        """Return the integer code for *occupation*, falling back to "通用"."""
        return OCCUPATION_MAP.get(occupation or "通用", OCCUPATION_MAP["通用"])

    def _signature(self, recipes: list) -> str:
        """Return an MD5 hex digest that changes when the recipe data changes.

        The digest covers id, name, the nutrition fields and ``updated_at`` of
        every recipe, in list order. It is used only as a cache key.
        """
        raw = [
            {
                "id": item.id,
                "name": item.name,
                "calories": item.calories,
                "protein": item.protein,
                "fat": item.fat,
                "carbs": item.carbs,
                "fiber": item.fiber,
                "updated": item.updated_at.isoformat() if item.updated_at else "",
            }
            for item in recipes
        ]
        raw_json = json.dumps(raw, ensure_ascii=False, sort_keys=True)
        return hashlib.md5(raw_json.encode("utf-8")).hexdigest()

    @staticmethod
    def _daily_target_kcal(profile: dict) -> float:
        """Estimate the daily calorie target for *profile*.

        Starts from 1800 kcal plus 40% of ``exercise_kcal`` (default 0), scales
        the result by a goal-specific factor and never returns less than 1200.
        """
        goal = profile.get("goal", "maintain")
        baseline = 1800 + float(profile.get("exercise_kcal", 0)) * 0.4
        if goal == "lose_fat":
            baseline *= 0.82
        elif goal == "gain_muscle":
            baseline *= 1.12
        elif goal == "keto":
            baseline *= 0.9
        return max(baseline, 1200)

    @staticmethod
    def _heuristic_score(profile: dict, recipe) -> float:
        """Return the rule-based suitability score (1-100) of *recipe*.

        This score is the training label for the forest. It starts at 100,
        is penalised by the distance between the recipe's calories and one
        third of the daily target, and is then adjusted by goal-specific
        macro-nutrient rules.

        Args:
            profile: Mapping that may contain ``goal``, ``exercise_kcal``,
                ``body_fat`` and ``intake_kcal``.
            recipe: Object exposing ``calories``, ``protein``, ``fat``,
                ``carbs`` and ``fiber`` (``fiber`` may be ``None``).
        """
        goal = profile.get("goal", "maintain")
        daily_target = RandomForestDietRecommender._daily_target_kcal(profile)
        target_per_meal = daily_target / 3

        # max(..., 1) guards the divisions against zero-calorie recipes.
        cal_gap_ratio = abs(recipe.calories - target_per_meal) / max(target_per_meal, 1)
        protein_ratio = recipe.protein / max(recipe.calories, 1)
        carbs_ratio = recipe.carbs / max(recipe.calories, 1)
        fat_ratio = recipe.fat / max(recipe.calories, 1)

        score = 100.0
        score -= min(cal_gap_ratio * 55, 50)

        if goal == "lose_fat":
            score += min(recipe.protein * 0.6, 18)
            score -= max((recipe.fat - 20) * 0.7, 0)
            score -= max((recipe.carbs - 55) * 0.3, 0)
        elif goal == "gain_muscle":
            score += min(recipe.protein * 0.8, 26)
            score += min(recipe.carbs * 0.2, 10)
        elif goal == "keto":
            score += min(recipe.fat * 0.4, 18)
            score -= max(recipe.carbs - 30, 0) * 0.8
        else:
            # fiber is nullable (see _build_feature); treat a missing value as 0
            # instead of raising TypeError.
            score += min((recipe.fiber or 0) * 1.2, 8)

        body_fat = float(profile.get("body_fat", 20))
        if body_fat > 28:
            score -= max(recipe.calories - 520, 0) * 0.03

        intake_kcal = float(profile.get("intake_kcal", 1800))
        if intake_kcal > daily_target:
            score -= max(recipe.calories - 460, 0) * 0.02

        # Bounded bonuses/penalties based on the macro-to-calorie ratios.
        score += np.clip((protein_ratio - 0.12) * 100, -8, 8)
        if goal == "keto":
            score += np.clip((0.08 - carbs_ratio) * 80, -6, 6)
        if goal == "lose_fat":
            score += np.clip((0.25 - fat_ratio) * 30, -5, 5)

        return float(np.clip(score, 1, 100))

    def _build_feature(self, profile: dict, recipe) -> list:
        """Return the 13-value feature row for one (profile, recipe) pair.

        The first eight values describe the profile (with defaults for missing
        keys), the last five are the recipe's nutrition values.
        """
        return [
            float(profile.get("weight", 65)),
            float(profile.get("body_fat", 20)),
            float(profile.get("exercise_kcal", 300)),
            float(profile.get("intake_kcal", 1800)),
            float(profile.get("age", 25)),
            float(profile.get("height_cm", 170)),
            float(self._encode_goal(profile.get("goal", "maintain"))),
            float(self._encode_occupation(profile.get("occupation", "通用"))),
            float(recipe.calories),
            float(recipe.protein),
            float(recipe.fat),
            float(recipe.carbs),
            float(recipe.fiber or 0),
        ]

    def _sample_profiles(self, n: int = 600) -> list:
        """Generate *n* synthetic profiles with a fixed seed (reproducible)."""
        rng = np.random.default_rng(2026)
        goals = list(GOAL_MAP)
        occupations = list(OCCUPATION_MAP)

        profiles = []
        for _ in range(n):
            # Draw order matters: it determines the values produced by the RNG.
            goal = goals[int(rng.integers(0, len(goals)))]
            occupation = occupations[int(rng.integers(0, len(occupations)))]
            profiles.append(
                {
                    "weight": float(rng.uniform(45, 100)),
                    "body_fat": float(rng.uniform(10, 38)),
                    "exercise_kcal": float(rng.uniform(50, 850)),
                    "intake_kcal": float(rng.uniform(1200, 3200)),
                    "age": float(rng.uniform(18, 55)),
                    "height_cm": float(rng.uniform(150, 190)),
                    "goal": goal,
                    "occupation": occupation,
                }
            )
        return profiles

    def train(self, recipes: list) -> None:
        """Fit the forest on synthetic profiles x *recipes* and persist it.

        Raises:
            ValueError: If *recipes* is empty.
        """
        if not recipes:
            raise ValueError("训练随机森林前至少需要 1 条食谱数据")

        pairs = [
            (profile, recipe)
            for profile in self._sample_profiles()
            for recipe in recipes
        ]
        x = np.array([self._build_feature(profile, recipe) for profile, recipe in pairs])
        y = np.array([self._heuristic_score(profile, recipe) for profile, recipe in pairs])

        model = RandomForestRegressor(
            n_estimators=240,
            random_state=2026,
            max_depth=12,
            min_samples_leaf=2,
            n_jobs=-1,
        )
        model.fit(x, y)

        self.model = model
        self.recipe_signature = self._signature(recipes)

        self.model_path.parent.mkdir(parents=True, exist_ok=True)
        joblib.dump(
            {
                "model": model,
                "recipe_signature": self.recipe_signature,
                # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated.
                "trained_at": datetime.now(timezone.utc).isoformat(),
            },
            self.model_path,
        )

    def load_or_train(self, recipes: list) -> None:
        """Make ``self.model`` match *recipes*, reusing caches when possible.

        Order of preference: the model already held in memory, then the file
        at ``model_path``, then a fresh training run.
        """
        current_signature = self._signature(recipes)

        # Avoid re-reading the model file on every call when nothing changed.
        if self.model is not None and self.recipe_signature == current_signature:
            return

        if self.model_path.exists():
            # NOTE(security): joblib.load unpickles the file; model_path must
            # only ever point at files written by this application.
            try:
                payload = joblib.load(self.model_path)
            except Exception:
                # An unreadable cache file is not fatal: retrain below, which
                # rewrites the file.
                payload = None
            if (
                isinstance(payload, dict)
                and payload.get("recipe_signature") == current_signature
                and payload.get("model") is not None
            ):
                self.model = payload["model"]
                self.recipe_signature = current_signature
                return

        self.train(recipes)

    @staticmethod
    def _build_reason(profile: dict, recipe, score: float) -> str:
        """Return the user-facing explanation shown next to a recommendation."""
        goal = profile.get("goal", "maintain")
        if goal == "lose_fat":
            return f"热量适中,蛋白质 {recipe.protein}g,适合减脂期控热量和保肌。"
        if goal == "gain_muscle":
            return "蛋白质与碳水配置较高,适合增肌训练后的恢复。"
        if goal == "keto":
            return f"碳水 {recipe.carbs}g,偏低碳结构,适合生酮期参考。"
        if score > 80:
            return "营养均衡度高,适合作为日常轻食搭配。"
        return "综合营养结构较均衡,可作为个性化备选方案。"

    def recommend(self, profile: dict, recipes: list, top_k: int = 5) -> list:
        """Return the *top_k* best-scoring recipes for *profile*.

        Args:
            profile: User profile mapping (see ``_build_feature`` for keys).
            recipes: Candidate recipes; each must provide ``to_dict()``.
            top_k: Maximum number of entries to return.

        Returns:
            A list of ``recipe.to_dict()`` dicts extended with ``rf_score``
            (rounded to 2 decimals) and ``reason``, sorted by score in
            descending order. Empty when *recipes* is empty.
        """
        if not recipes:
            return []

        self.load_or_train(recipes)
        x = np.array([self._build_feature(profile, recipe) for recipe in recipes])
        pred_scores = self.model.predict(x)

        result = []
        for recipe, score in zip(recipes, pred_scores):
            row = recipe.to_dict()
            row["rf_score"] = round(float(score), 2)
            row["reason"] = self._build_reason(profile, recipe, float(score))
            result.append(row)

        result.sort(key=lambda item: item["rf_score"], reverse=True)
        return result[:top_k]


def merge_profile_with_history(base_profile: dict, history: list) -> dict:
    """Overlay the averages of recent measurements onto *base_profile*.

    Args:
        base_profile: Profile mapping; it is never modified.
        history: Records exposing ``weight``, ``body_fat``, ``exercise_kcal``
            and ``intake_kcal`` attributes.

    Returns:
        *base_profile* itself when *history* is empty; otherwise a copy in
        which each of the four fields is replaced by the mean of its
        non-``None`` history values. A field with no usable value keeps
        whatever *base_profile* provided.
    """
    if not history:
        return base_profile

    merged = dict(base_profile)
    for field in ("weight", "body_fat", "exercise_kcal", "intake_kcal"):
        # Skip missing measurements: np.mean raises TypeError on None.
        samples = [
            value
            for value in (getattr(item, field) for item in history)
            if value is not None
        ]
        if samples:
            merged[field] = float(np.mean(samples))
    return merged