ML 学习站
跳到正文

线上监控与数据漂移

性能监控、数据漂移检测、PSI / KS 检验, 报警链路。

40 分钟3 / 32,095
加载中...

线上监控与数据漂移是确保机器学习模型长期有效性的关键环节。模型在部署后可能会因现实世界变化而失效,因此需要持续监控其性能和输入数据分布。核心概念包括性能监控、数据漂移检测和延迟反馈。性能监控关注延迟(P99)、吞吐量(QPS)和错误率;数据漂移检测则通过KS检验(连续特征)、PSI(风控模型常用)和卡方检验(类别特征)来识别输入或输出分布的变化。读者将学会如何设置监控指标、使用Evidently AI等工具生成漂移报告,以及根据业务需求调整告警阈值。掌握这些技能后,读者能够有效识别模型老化问题,及时采取措施(如模型重训)以维持模型的高效运行。

线上监控与数据漂移

模型上线第一天是准的, 3 个月后可能就废了。 因为现实世界在变, 而你的模型还在用 3 个月前的数据规律。

这一章讲两件事:

  1. 性能监控: 线上模型真的准吗?
  2. 数据漂移检测: 进来的数据分布变了没?

模型为什么会"老化"

  • 用户偏好变化 (iPhone 卖得少了)
  • 经济环境变化 (疫情期间销量暴增)
  • 竞争对手上新功能
  • 数据 pipeline bug 导致特征分布改变
  • 政策/法规变化

任何一个原因都让历史训练数据"过期"

监控 3 件套:Latency / Throughput / Error

# 加在 FastAPI 中
from prometheus_client import Counter, Histogram, Gauge

PREDICT_COUNT = Counter("model_predict_total", "Predictions served", ["model_version", "label"])
PREDICT_LATENCY = Histogram("model_predict_latency_seconds", "Prediction latency", ["model_version"])
INFLIGHT = Gauge("model_inflight_requests", "In-flight requests", ["model_version"])

@app.post("/predict")
def predict(inp: Input):
    INFLIGHT.labels(model_version="v1").inc()
    start = time.time()
    try:
        pred = model.predict([inp.features])[0]
        PREDICT_COUNT.labels(model_version="v1", label=str(pred)).inc()
        return {"prediction": int(pred)}
    except Exception as e:
        PREDICT_COUNT.labels(model_version="v1", label="error").inc()
        raise
    finally:
        PREDICT_LATENCY.labels(model_version="v1").observe(time.time() - start)
        INFLIGHT.labels(model_version="v1").dec()

Grafana 看板监控 3 个图:

  • 延迟 P99 < 100ms (SLA)
  • QPS (每秒请求数) 趋势
  • 错误率 < 0.1%

性能监控的难题: 延迟反馈

真正难的不是"服务挂没挂", 而是"模型准不准"。因为:

  • 真实 label 通常要几小时/几天才知道 (用户有没有点击, 贷款有没有违约)
  • 没有 label = 没法算 accuracy

3 个解决方案:

  1. 代理指标 (Proxy Metric): 用"模型输出分布"代替"准确率"。比如"预测为正类的比例突然从 30% 涨到 50%", 警告
  2. 业务指标: 转化率、点击率、AUC 都能算, 拼多多、抖音都在用
  3. 人工抽样: 每天抽 1000 条人工标, 计算真实准确率

数据漂移:特征分布变了

训练时用户平均年龄 30, 现在用户平均年龄 50 — 你的模型没学过中年人, 准确率必然下降。这就是数据漂移

3 类漂移:

  • Covariate shift: 输入分布 P(X) 变了
  • Label shift: 输出分布 P(Y) 变了
  • Concept drift: 关系 P(Y|X) 变了 (最难发现)

检测 1:KS 检验 (适合连续特征)

Kolmogorov-Smirnov 检验比较"训练分布"和"线上分布":

from scipy.stats import ks_2samp

def detect_drift_ks(train_data, live_data, threshold=0.05):
    """返回每个特征的 p 值, p<0.05 说明漂移"""
    results = {}
    for col in train_data.columns:
        stat, p_value = ks_2samp(train_data[col].dropna(), live_data[col].dropna())
        results[col] = {"ks_stat": stat, "p_value": p_value, "drift": p_value < threshold}
    return pd.DataFrame(results).T
# 实战
train = pd.read_parquet("training_features.parquet")
live = get_last_week_features()  # 线上最近 7 天

drift_report = detect_drift_ks(train, live)
print(drift_report[drift_report["drift"]])
#                  ks_stat   p_value  drift
# user_age           0.18     0.001   True    <- 漂移!
# user_income        0.05     0.42    False

KS 检验对连续特征最敏感, 但对类别特征不好用 (用 PSI)。

检测 2:PSI (Population Stability Index)

PSI 是风控模型最常用的漂移指标:

PSI = Σ (实际占比 - 期望占比) × ln(实际占比 / 期望占比)
PSI解读
< 0.1无漂移
0.1 - 0.25轻微漂移, 关注
> 0.25严重漂移, 需重训
import numpy as np

def calculate_psi(expected, actual, bins=10):
    """PSI: 比较 expected (训练) 和 actual (线上) 分布"""
    breakpoints = np.quantile(expected, np.linspace(0, 1, bins + 1))
    
    expected_counts = np.histogram(expected, breakpoints)[0]
    actual_counts = np.histogram(actual, breakpoints)[0]
    
    # 避免除零
    expected_pct = (expected_counts + 1) / (expected_counts.sum() + len(expected_counts))
    actual_pct = (actual_counts + 1) / (actual_counts.sum() + len(actual_counts))
    
    psi = np.sum((actual_pct - expected_pct) * np.log(actual_pct / expected_pct))
    return psi

psi = calculate_psi(train["user_age"], live["user_age"])
print(f"PSI: {psi:.3f}")
# 0.35 > 0.25 → 严重漂移, 需重训

检测 3:卡方检验 (类别特征)

类别特征用卡方检验:

from scipy.stats import chi2_contingency

def detect_drift_chi2(train_cats, live_cats, threshold=0.05):
    results = {}
    for col in train_cats.columns:
        # 构造列联表
        train_dist = train_cats[col].value_counts(normalize=True)
        live_dist = live_cats[col].value_counts(normalize=True)
        all_categories = set(train_dist.index) | set(live_dist.index)
        # 补齐
        observed = pd.DataFrame({
            "train": [train_dist.get(c, 0) for c in all_categories],
            "live":  [live_dist.get(c, 0) for c in all_categories]
        })
        chi2, p, _, _ = chi2_contingency(observed.T)
        results[col] = {"chi2": chi2, "p_value": p, "drift": p < threshold}
    return pd.DataFrame(results).T

监控流水线 (Evidently AI)

手写 KS/PSI 麻烦, 用 evidently 一键生成报告:

# pip install evidently
from evidently.report import Report
from evidently.metrics import DataDriftTable, DatasetDriftMetric

# 准备 reference (训练) 和 current (线上)
report = Report(metrics=[DataDriftTable(), DatasetDriftMetric()])
report.run(reference_data=train, current_data=live)

# 1. HTML 可视化
report.save_html("drift_report.html")

# 2. 字典输出
result = report.as_dict()
drift_score = result["metrics"][0]["result"]["drift_by_columns"]
print(drift_score)
# {'user_age': {'drift_detected': True, 'drift_score': 0.21}, ...}

evidently 还支持:

  • 目标漂移 (Target Drift)
  • 模型性能 (Regression / Classification Performance)
  • 数据质量 (Data Quality)

完整监控栈

[线上模型] → 日志 (input, output, latency) → Kafka → 离线分析 (Spark/Flink)
                                                  ↓
                            KS / PSI / 卡方检验 (Evidently / whylogs)
                                                  ↓
                            漂移告警 → Slack / 钉钉 / PagerDuty
                                                  ↓
                            触发重训 pipeline (Airflow)

告警阈值建议

指标黄警红警
P99 延迟> 200ms> 500ms
错误率> 1%> 5%
PSI (任一特征)> 0.1> 0.25
输出分布偏移> 5%> 10%
转化率下降> 5%> 10%

5 个工程实践

  1. 每周自动跑一次漂移报告, 不要等出问题
  2. 同时监控输入和输出分布, 任何一个变了都要查
  3. 保留近 30 天线上数据, 用于事后分析
  4. 漂移阈值根据业务调, 金融领域 PSI > 0.1 就该重训
  5. 告警去重: 5 分钟内同类告警不重复推

小结

  • 模型会"老化": 用户偏好、季节、竞品都在变
  • 监控 3 件套: 延迟 / 错误 / 性能 (P99 / error / accuracy)
  • 数据漂移: KS (连续) / PSI (连续+风控) / 卡方 (类别)
  • Evidently AI 一键生成漂移报告
  • 漂移阈值: PSI 0.1 黄 / 0.25 红
  • 重训 pipeline: 漂移告警 → Airflow 触发 → MLflow 记录新版本

练习思考

  1. 用 Evidently 对你前 30 天和最近 30 天的用户行为数据做漂移报告, 哪些特征漂移最严重?
  2. 金融领域 PSI > 0.1 就重训, 互联网产品可以宽松到 0.2, 为什么?
  3. 没有真实 label 时, 怎么估算"模型准确率"? 除了抽样, 还有其他办法吗?

章末小测验

检验你对《线上监控与数据漂移》的掌握程度。

1

KS 检验适合检测什么类型特征的漂移?

2

数据漂移的 3 大类是?

讨论区(0)

加载评论中...