ML 学习站
跳到正文

时序异常检测

滚动 z-score, STL 残差, Isolation Forest 实战。

35 分钟4 / 41,500
加载中...

时序异常检测旨在识别数据中与历史模式不同的异常点,应用于服务器流量、信用卡交易、心电图及制造业设备等领域。核心概念包括点异常和上下文异常,前者指单个时间点的异常,后者指在特定上下文中出现的异常。读者将掌握多种检测方法,包括简单的滚动 z-score、适用于季节性数据的 STL 分解与残差分析、基于 Prophet 的预测区间方法、无需假设数据分布的 Isolation Forest 以及针对局部异常的 LOF 方法。此外,还将了解多方法融合策略以降低误报率,并学习如何评估检测效果,重点关注高召回率(Recall)和合理的精确率(Precision)。学完后,读者能够根据不同场景选择合适的检测方法,并应用于实时数据流处理和工业级监控系统中。

时序异常检测

异常检测 (Anomaly Detection) 找出"和过去不一样"的点。应用场景: 服务器流量突增、信用卡交易欺诈、心电图异常、制造业设备故障。

异常的两类

  1. 点异常 (Point Anomaly): 某个时间点偏离正常 (CPU 突然 100%)
  2. 上下文异常 (Contextual Anomaly): 数值本身正常, 但在当前上下文中异常 (冬天 30°C 是异常, 夏天 30°C 是正常)

方法 1:滚动 z-score (最简单)

基本思路: 滑动窗口内, 偏离均值超过 N 个标准差就是异常。

import numpy as np
import pandas as pd

def rolling_zscore_anomaly(y, window=30, threshold=3):
    """window: 滑动窗口大小, threshold: z 分数阈值"""
    s = pd.Series(y)
    rolling_mean = s.rolling(window=window, min_periods=1).mean()
    rolling_std = s.rolling(window=window, min_periods=1).std()
    z = (s - rolling_mean) / rolling_std
    return z.abs() > threshold, z

# 模拟数据
np.random.seed(42)
y = np.concatenate([
    np.random.normal(100, 10, 200),
    [300, 50, 280],  # 3 个异常点
    np.random.normal(100, 10, 100)
])
is_anomaly, z = rolling_zscore_anomaly(y, window=30, threshold=3)
print(np.where(is_anomaly)[0])  # 异常点索引

简单但只适合平稳序列, 不能处理季节性 (双11 销量高不是异常)。

方法 2:STL 分解 + 残差 (推荐,处理季节性)

把时序拆成 Trend + Seasonal + Residual, 残差超出阈值就是异常:

from statsmodels.tsa.seasonal import STL
import matplotlib.pyplot as plt

# 假设是带周季节性的销售数据
stl = STL(series, period=7, robust=True)  # robust=True 自动压制异常
result = stl.fit()

# 3 个分量
trend = result.trend
seasonal = result.seasonal
residual = result.resid

# 异常: 残差偏离均值太远
threshold = 3 * residual.std()
is_anomaly = residual.abs() > threshold

# 画图
fig = result.plot()
for i in is_anomaly[is_anomaly].index:
    plt.axvline(i, color='red', alpha=0.3)
plt.show()

robust=True 让分解过程对异常值鲁棒,不会让异常点"污染"季节性估计。

方法 3:Prophet 自带异常检测

Prophet 的 yhat_lower / yhat_upper 给出了预测区间, 实际值超出区间就是异常:

from prophet import Prophet

model = Prophet(interval_width=0.99)  # 99% 置信区间
model.fit(df)

forecast = model.predict(df)  # 用历史数据做预测
df["yhat"] = forecast["yhat"]
df["yhat_lower"] = forecast["yhat_lower"]
df["yhat_upper"] = forecast["yhat_upper"]

# 异常: 实际值超出区间
df["is_anomaly"] = (df["y"] < df["yhat_lower"]) | (df["y"] > df["yhat_upper"])
print(df[df["is_anomaly"]][["ds", "y", "yhat_lower", "yhat_upper"]])

适合"有规律可循"的时序。

方法 4:Isolation Forest (无监督机器学习)

不用假设分布, 直接学"什么是正常":

from sklearn.ensemble import IsolationForest
import numpy as np

# 准备特征: 数值本身 + 滑动均值 + 滑动标准差
df["rolling_mean_7"] = df["y"].rolling(7).mean()
df["rolling_std_7"] = df["y"].rolling(7).std()
features = df[["y", "rolling_mean_7", "rolling_std_7"]].dropna()

# 训练
clf = IsolationForest(contamination=0.05, random_state=42)  # 假设 5% 异常
clf.fit(features)

# 预测: -1 = 异常, 1 = 正常
df["anomaly_score"] = clf.predict(features)
df["anomaly"] = df["anomaly_score"] == -1

contamination 是先验异常比例,设 5% 意味着"我猜 5% 数据是异常"。

方法 5:基于密度 (LOF)

from sklearn.neighbors import LocalOutlierFactor

clf = LocalOutlierFactor(n_neighbors=20, contamination=0.05)
df["anomaly"] = clf.fit_predict(features) == -1

适合局部异常 (某点在自己周围显得异常, 全局看可能正常)。

多方法融合:投票

单一方法容易误报, 工业上一般多方法投票:

# 3 个方法
df["z_anomaly"] = rolling_zscore_anomaly(df["y"])[0]
df["stl_anomaly"] = stl_anomaly  # STL 残差
df["iforest_anomaly"] = df["anomaly"]  # Isolation Forest

# 至少 2 个方法说是异常
df["final_anomaly"] = (df["z_anomaly"].astype(int) + 
                       df["stl_anomaly"].astype(int) + 
                       df["iforest_anomaly"].astype(int)) >= 2

评估:看 Precision / Recall

如果有标注的异常 (人工标的双11、故障时间), 可以评估:

from sklearn.metrics import classification_report

y_true = ...   # 0/1 标签
y_pred = df["final_anomaly"].astype(int)

print(classification_report(y_true, y_pred, target_names=["正常", "异常"]))

目标: Recall 高 (不要漏掉真异常) + Precision 不太低 (不要太多假警报)。

实时异常检测:流式数据

线上系统需要秒级响应:

import redis
from collections import deque

class StreamingAnomalyDetector:
    def __init__(self, window_size=60, threshold=3):
        self.window = deque(maxlen=window_size)
        self.threshold = threshold
    
    def add(self, value: float) -> bool:
        """返回 True 表示检测到异常"""
        if len(self.window) < 10:  # 攒够数据再判断
            self.window.append(value)
            return False
        
        mean = np.mean(self.window)
        std = np.std(self.window)
        z = abs((value - mean) / std) if std > 0 else 0
        
        self.window.append(value)
        return z > self.threshold

用 Redis Streams / Kafka 接数据, 异常时推送到告警系统。

工业级方案

  • Twitter/AnomalyDetection (R 包): 时序异常检测 SOTA, 处理季节性
  • Amazon CloudWatch Anomaly Detection: 云厂商托管服务
  • Datadog / New Relic: 商业 APM 自带异常检测
  • Prometheus + AlertManager: 开源监控 + 告警

小结

  • 时序异常 = 偏离过去规律的点
  • 5 种方法: 滚动 z-score (简单) / STL 残差 (推荐) / Prophet 区间 / Isolation Forest / LOF
  • 多方法投票 降低误报
  • 实时场景用滑动窗口 + 阈值
  • 评估靠 Precision / Recall, 目标 Recall > 80%

练习思考

  1. 用 STL 分解找到你家月度电费的异常 (哪个月份是异常? 为什么?)
  2. 同一份数据, Isolation Forest 和 STL 残差方法找出的异常一样吗? 哪个更合理?
  3. 如果业务说"宁可多报也不要漏报" (漏一个异常损失 100 万), threshold 怎么调?

章末小测验

检验你对《时序异常检测》的掌握程度。

1

PSI (Population Stability Index) > 0.25 通常表示?

2

时序异常检测的 STL 分解方法的核心是?

讨论区(0)

加载评论中...