线上监控与数据漂移是确保机器学习模型长期有效性的关键环节。模型在部署后可能会因现实世界变化而失效,因此需要持续监控其性能和输入数据分布。核心概念包括性能监控、数据漂移检测和延迟反馈。性能监控关注延迟(P99)、吞吐量(QPS)和错误率;数据漂移检测则通过KS检验(连续特征)、PSI(风控模型常用)和卡方检验(类别特征)来识别输入或输出分布的变化。读者将学会如何设置监控指标、使用Evidently AI等工具生成漂移报告,以及根据业务需求调整告警阈值。掌握这些技能后,读者能够有效识别模型老化问题,及时采取措施(如模型重训)以维持模型的高效运行。
线上监控与数据漂移
模型上线第一天是准的, 3 个月后可能就废了。 因为现实世界在变, 而你的模型还在用 3 个月前的数据规律。
这一章讲两件事:
- 性能监控: 线上模型真的准吗?
- 数据漂移检测: 进来的数据分布变了没?
模型为什么会"老化"
- 用户偏好变化 (iPhone 卖得少了)
- 经济环境变化 (疫情期间销量暴增)
- 竞争对手上新功能
- 数据 pipeline bug 导致特征分布改变
- 政策/法规变化
任何一个原因都让历史训练数据"过期"。
监控 3 件套:Latency / Throughput / Error
# 加在 FastAPI 中
from prometheus_client import Counter, Histogram, Gauge
PREDICT_COUNT = Counter("model_predict_total", "Predictions served", ["model_version", "label"])
PREDICT_LATENCY = Histogram("model_predict_latency_seconds", "Prediction latency", ["model_version"])
INFLIGHT = Gauge("model_inflight_requests", "In-flight requests", ["model_version"])
@app.post("/predict")
def predict(inp: Input):
INFLIGHT.labels(model_version="v1").inc()
start = time.time()
try:
pred = model.predict([inp.features])[0]
PREDICT_COUNT.labels(model_version="v1", label=str(pred)).inc()
return {"prediction": int(pred)}
except Exception as e:
PREDICT_COUNT.labels(model_version="v1", label="error").inc()
raise
finally:
PREDICT_LATENCY.labels(model_version="v1").observe(time.time() - start)
INFLIGHT.labels(model_version="v1").dec()
Grafana 看板监控 3 个图:
- 延迟 P99 < 100ms (SLA)
- QPS (每秒请求数) 趋势
- 错误率 < 0.1%
性能监控的难题: 延迟反馈
真正难的不是"服务挂没挂", 而是"模型准不准"。因为:
- 真实 label 通常要几小时/几天才知道 (用户有没有点击, 贷款有没有违约)
- 没有 label = 没法算 accuracy
3 个解决方案:
- 代理指标 (Proxy Metric): 用"模型输出分布"代替"准确率"。比如"预测为正类的比例突然从 30% 涨到 50%", 警告
- 业务指标: 转化率、点击率、AUC 都能算, 拼多多、抖音都在用
- 人工抽样: 每天抽 1000 条人工标, 计算真实准确率
数据漂移:特征分布变了
训练时用户平均年龄 30, 现在用户平均年龄 50 — 你的模型没学过中年人, 准确率必然下降。这就是数据漂移。
3 类漂移:
- Covariate shift: 输入分布 P(X) 变了
- Label shift: 输出分布 P(Y) 变了
- Concept drift: 关系 P(Y|X) 变了 (最难发现)
检测 1:KS 检验 (适合连续特征)
Kolmogorov-Smirnov 检验比较"训练分布"和"线上分布":
from scipy.stats import ks_2samp
def detect_drift_ks(train_data, live_data, threshold=0.05):
"""返回每个特征的 p 值, p<0.05 说明漂移"""
results = {}
for col in train_data.columns:
stat, p_value = ks_2samp(train_data[col].dropna(), live_data[col].dropna())
results[col] = {"ks_stat": stat, "p_value": p_value, "drift": p_value < threshold}
return pd.DataFrame(results).T
# 实战
train = pd.read_parquet("training_features.parquet")
live = get_last_week_features() # 线上最近 7 天
drift_report = detect_drift_ks(train, live)
print(drift_report[drift_report["drift"]])
# ks_stat p_value drift
# user_age 0.18 0.001 True <- 漂移!
# user_income 0.05 0.42 False
KS 检验对连续特征最敏感, 但对类别特征不好用 (用 PSI)。
检测 2:PSI (Population Stability Index)
PSI 是风控模型最常用的漂移指标:
PSI = Σ (实际占比 - 期望占比) × ln(实际占比 / 期望占比)
| PSI | 解读 |
|---|---|
| < 0.1 | 无漂移 |
| 0.1 - 0.25 | 轻微漂移, 关注 |
| > 0.25 | 严重漂移, 需重训 |
import numpy as np
def calculate_psi(expected, actual, bins=10):
"""PSI: 比较 expected (训练) 和 actual (线上) 分布"""
breakpoints = np.quantile(expected, np.linspace(0, 1, bins + 1))
expected_counts = np.histogram(expected, breakpoints)[0]
actual_counts = np.histogram(actual, breakpoints)[0]
# 避免除零
expected_pct = (expected_counts + 1) / (expected_counts.sum() + len(expected_counts))
actual_pct = (actual_counts + 1) / (actual_counts.sum() + len(actual_counts))
psi = np.sum((actual_pct - expected_pct) * np.log(actual_pct / expected_pct))
return psi
psi = calculate_psi(train["user_age"], live["user_age"])
print(f"PSI: {psi:.3f}")
# 0.35 > 0.25 → 严重漂移, 需重训
检测 3:卡方检验 (类别特征)
类别特征用卡方检验:
from scipy.stats import chi2_contingency
def detect_drift_chi2(train_cats, live_cats, threshold=0.05):
results = {}
for col in train_cats.columns:
# 构造列联表
train_dist = train_cats[col].value_counts(normalize=True)
live_dist = live_cats[col].value_counts(normalize=True)
all_categories = set(train_dist.index) | set(live_dist.index)
# 补齐
observed = pd.DataFrame({
"train": [train_dist.get(c, 0) for c in all_categories],
"live": [live_dist.get(c, 0) for c in all_categories]
})
chi2, p, _, _ = chi2_contingency(observed.T)
results[col] = {"chi2": chi2, "p_value": p, "drift": p < threshold}
return pd.DataFrame(results).T
监控流水线 (Evidently AI)
手写 KS/PSI 麻烦, 用 evidently 一键生成报告:
# pip install evidently
from evidently.report import Report
from evidently.metrics import DataDriftTable, DatasetDriftMetric
# 准备 reference (训练) 和 current (线上)
report = Report(metrics=[DataDriftTable(), DatasetDriftMetric()])
report.run(reference_data=train, current_data=live)
# 1. HTML 可视化
report.save_html("drift_report.html")
# 2. 字典输出
result = report.as_dict()
drift_score = result["metrics"][0]["result"]["drift_by_columns"]
print(drift_score)
# {'user_age': {'drift_detected': True, 'drift_score': 0.21}, ...}
evidently 还支持:
- 目标漂移 (Target Drift)
- 模型性能 (Regression / Classification Performance)
- 数据质量 (Data Quality)
完整监控栈
[线上模型] → 日志 (input, output, latency) → Kafka → 离线分析 (Spark/Flink)
↓
KS / PSI / 卡方检验 (Evidently / whylogs)
↓
漂移告警 → Slack / 钉钉 / PagerDuty
↓
触发重训 pipeline (Airflow)
告警阈值建议
| 指标 | 黄警 | 红警 |
|---|---|---|
| P99 延迟 | > 200ms | > 500ms |
| 错误率 | > 1% | > 5% |
| PSI (任一特征) | > 0.1 | > 0.25 |
| 输出分布偏移 | > 5% | > 10% |
| 转化率下降 | > 5% | > 10% |
5 个工程实践
- 每周自动跑一次漂移报告, 不要等出问题
- 同时监控输入和输出分布, 任何一个变了都要查
- 保留近 30 天线上数据, 用于事后分析
- 漂移阈值根据业务调, 金融领域 PSI > 0.1 就该重训
- 告警去重: 5 分钟内同类告警不重复推
小结
- 模型会"老化": 用户偏好、季节、竞品都在变
- 监控 3 件套: 延迟 / 错误 / 性能 (P99 / error / accuracy)
- 数据漂移: KS (连续) / PSI (连续+风控) / 卡方 (类别)
- Evidently AI 一键生成漂移报告
- 漂移阈值: PSI 0.1 黄 / 0.25 红
- 重训 pipeline: 漂移告警 → Airflow 触发 → MLflow 记录新版本
练习思考
- 用 Evidently 对你前 30 天和最近 30 天的用户行为数据做漂移报告, 哪些特征漂移最严重?
- 金融领域 PSI > 0.1 就重训, 互联网产品可以宽松到 0.2, 为什么?
- 没有真实 label 时, 怎么估算"模型准确率"? 除了抽样, 还有其他办法吗?
章末小测验
检验你对《线上监控与数据漂移》的掌握程度。
1
KS 检验适合检测什么类型特征的漂移?
2
数据漂移的 3 大类是?
讨论区(0)
加载评论中...