现有一批药物数据,首先识别该药物的色谱数据和对应的活性数据的波峰,再对比波峰去寻找药物成份。

任务完成情况:

# 加载 Excel 数据

加载指定 Excel 数据,第 0,2 列是获取活性数据的时间(第一次给的数据时间中存在非 num 值 '--',需要剔除),第 1 列是第一次活性数据,第 3 列是第二次活性数据,第 4 列是获取色谱数据对应的时间,第 5 列是色谱数据(色谱数据普遍波动范围比较大,在读取后要进行归一化处理)。

class DataExcelLoader:
    def __init__(self, file_path):
        """
        初始化加载Excel数据
        :param file_path: excel文件名字(str)
        """
        self.df = pd.read_excel(file_path)
    def load_data(self, iloc_id=1, scaler_bool=False):
        """
        按列返回Excel数据
        :param scaler_bool: 是否对数据进行归一化。默认(False)
        :param iloc_id: 0: huo_xing_time, 1: huo_xing_data_1, 2: huo_xing_time,
                        3: huo_xing_data_2, 4: se_pu_time, 5: se_pu_data,
        :return: 返回指定列数据
        """
        scaler_arg = None
        if iloc_id in [0, 2]:
            # huo_xing_time,最早数据中有以下问题,后面没有,可以忽略
            # 修改源数据之前先复制一份
            huo_xing_time = self.df.iloc[:, 0].copy()
            # 把值为 '--' 替换为 None
            huo_xing_time[huo_xing_time == '--'] = np.nan
            # 去除 None 值
            df_data = huo_xing_time.dropna().values
        elif iloc_id in [1, 3, 4, 5]:
            df_data = self.df.iloc[:, iloc_id].dropna().values
            if scaler_bool:
                df_data, scaler_arg = scale_data(df_data)
        else:
            raise ValueError("iloc_id 输入不合法")
        return df_data, scaler_arg

# 查看数据分布

为了错位显示不同的活性数据,在 y 轴上进行了偏移。

def draw_all_data(time_error=0):
    huo_xing_time_data, _ = df_excel.load_data(0, scaler_bool=False)
    se_pu_time_data, _ = df_excel.load_data(4, scaler_bool=False)
    df_data_1, _ = df_excel.load_data(1, scaler_bool=False)
    df_data_3, _ = df_excel.load_data(3, scaler_bool=False)
    df_data_5, _ = df_excel.load_data(5, scaler_bool=True)
    
    plt.figure(figsize=(85, 15))
    plt.plot(huo_xing_time_data, df_data_1 + 0.5, marker='o', label='huoxing_data_1')
    plt.plot(huo_xing_time_data, df_data_3, marker='o', color='red', label='huoxing_data_2')
    plt.legend()
    # data_two.xlsx 色谱时间要统一减去 1.77,
    plt.plot(se_pu_time_data - time_error, df_data_5 - 1, color='blue', marker='o', label='se_pu_data')

数据1整体分布

数据从上往下依次为活性数据 1,活性数据 2,色谱数据。

📌色谱数据比活性数据密集,通常是 3~4 个色谱数据对应 1 个活性数据。为了与活性数据对应,可以采用滑动平均的方法。

# 去噪

尝试了以下两种方法,没有多大效果。后续需要进一步对比效果。

# 对数据进行去噪方法 1:滚动平均值
def rolling_data(df, column, windows_rolling=3):
    data = df.iloc[:, column].rolling(windows_rolling).mean().dropna().values
    return data
# 滚动窗口大小默认为 3
windows_rolling = 3
column = 1 # df 从 0 开始
rolling_huo_xing_data_1 = rolling_data(df, column, windows_rolling)
# 对数据进行去噪方法 2:傅里叶变换
import numpy as np
from scipy.fft import fft, ifft
def fft_data(data_list):
    # 进行傅立叶变换
    spectrum = fft(data_list)
    # 将高频噪声设置为零
    threshold = 3  # 阈值可以根据数据特性调整
    spectrum[np.abs(spectrum) < threshold] = 0
    # 进行傅立叶反变换
    filtered_data = ifft(spectrum)
    # 获取实部,丢弃虚部
    filtered_data_r = filtered_data.real
    return filtered_data_r
filtered_data = fft_data(huo_xing_data_1)

# 用统计学方法寻找活性数据的异常点

我们默认前一分钟数据点为正常点,可以先计算前一分钟的相邻数据之间的差值来确定前后数据的波动阈值,如果前后数据之差大于这个阈值,我们判定后面的数据为异常点。在这里我们设置异常点的阈值为前一分钟的相邻数据之间差值的平均值加减 3 倍其标准差。

# 先获取前 1 分钟的索引
def get_normal_index(data, minute=1):
    for index, t in enumerate(data):
        if t > minute:
            return index
# 再通过索引去获取对应的数据
def get_anomalies_series(data_list, normal_index=30):
    # 计算前 30 个数据点之间的差异
    differences = np.abs(data_list[1:normal_index] - data_list[0:normal_index-1])
    # differences = np.abs(data.iloc[1:30, index].values - data.iloc[0:29, index].values)
    # 计算差异的平均值和标准差
    mean_diff = np.mean(differences)
    max_diff = np.max(differences)
    std_diff = np.std(differences)
    # 设置异常点的阈值为平均值加减 2 倍标准差
    threshold_upper = mean_diff + 3 * std_diff
    # threshold_lower = mean_diff - 2 * std_diff
    # threshold_upper = max_diff
    anomalies_list = []
    # 从 normal_index 开始往后遍历,检测异常点
    for i in range(normal_index, len(data_list)):
        # 计算当前点与前一个点之间的差异
        current_diff = np.abs(data_list[i] - data_list[i-1])
        # 如果差异超出阈值范围,则标记为异常
        if current_diff > threshold_upper:
            # anomalies_series[i] = data_list[i]
            anomalies_list.append(i)
    return anomalies_list

求得对应异常索引值后,展示效果(以活性数据 1 为例),发现所得到的异常点并不是想要的完整波峰。

yi_chang

对异常索引进行波峰完善,完善策略如下:

  • 异常点位于波谷左侧,先往前完善波谷左侧,再往后寻找到波谷,然后从波谷向后完善波谷右侧数据
  • 异常点位于波谷右侧,先往后完善波谷右侧,再往前寻找到波谷,然后从波谷向前完善波谷左侧数据

yi_chang

我们可以发现,通过这种方法可以找到大部分波谷,但仍有大部分波谷没有被检测出来,也就是会有异常点被遗漏。

# 用深度学习方法寻找活性数据的异常点

# 加载数据集

加载数据集,先把数据转换为时序数据的样式,再创建 DataLoader 加载数据集,划分训练集(10%-20%)、验证集(10%-15%)、测试集(剩余)。

def create_sequences(features, label_length, sequence_length):
    """
    划分数据为输入序列,根据sequence_length条数据往后预测label_length条数据
    :param features: 要处理的数据(list)
    :param label_length: 往后预测的长度(int)
    :param sequence_length: 依据数据的长度(int)
    :return: 所有前面数据(list), 所有预测的数据(list)
    """
    feature_sequences = []
    label_sequences = []
    for i in range(len(features) - sequence_length - label_length + 1):
        feature_sequences.append(features[i:i + sequence_length].reshape(-1, 1))
        label_sequences.append(features[i + sequence_length:i + sequence_length + label_length])
    
    feature_sequences = torch.Tensor(np.array(feature_sequences))
    label_sequences = torch.Tensor(np.array(label_sequences))
    return feature_sequences, label_sequences
# 举个例子
X_train, y_train = create_sequences(train_data, label_length, seq_length)
# 创建 DataLoader
train_data = TensorDataset(X_train, y_train)
train_loader = DataLoader(dataset=train_data, batch_size=batch_size, shuffle=True)

# 构建、训练模型

构建、训练模型。这里尝试了以下几种模型,并通过调参对比了影响不同模型的效果,点击对应模型查看具体实验效果与对应结论。

# 预测、寻找异常点

我们选择一个较好的模型(整体来看 TCN 效果不错),来先预测数据趋势,跟真实数据进行比较,若超过阈值,则判定为异常点。

这里的阈值,有两种判定标准:

  1. 统计所有测试集的预测值和真实值的差值,取其均值 + 1.5 倍标准差作为阈值。
  2. 统计所有验证集的预测值和真实值的差值,取其最大值作为阈值(前提条件:验证集中没有波动很大的异常数据)。
def find_threshold(actual, predictions, option="diff"):
    differences = np.abs(actual - predictions)
    if option == "diff":
        # 计算差异的平均值和标准差
        mean_diff = np.mean(differences)
        std_diff = np.std(differences)
        threshold = mean_diff + 1.5 * std_diff
    elif option == "max":
        threshold = np.max(differences)
    else:
        raise ValueError("option 输入不合法")
    return threshold
def detect_anomalies(actual, predictions, threshold):
    """
    Detect anomalies in test data using thresholding.
    """
    anomalies_list = []
    for i, predict in enumerate(predictions):
        current_diff = predict - actual[i]
        # 如果差异超出阈值范围,则标记为异常
        if current_diff > threshold:
            # anomalies_series[i] = data_list[i]
            anomalies_list.append(i)
    return anomalies_list

对上面两种阈值的选择,对比查看异同,为了统一寻找标准,把色谱数据倒立一下🤸。

根据测试集的均值 + 1.5 标准差得到的异常值如下:

活性数据 1 活性数据 2 色谱数据
iloc1_anomalies iloc3_anomalies iloc5_anomalies

根据验证集最大值得到的异常值如下:

活性数据 1 活性数据 2 色谱数据
iloc1_anomalies iloc3_anomalies iloc5_anomalies

理想情况下第二种的选择是比较合适的,但验证数据并不是很理想,存在部分噪声,导致阈值选择会偏差理想值,不如根据总体情况去选择阈值。可以看到第一种情况找到的异常值更多,也更符合我们的预期。

虽然异常点已经找到,但并不是所有都是我们想要的,如果在相同时刻中都存在异常点,则保留,否则丢弃,简单说一下思路,首先要把上述异常点进行完善成一个完整波谷,以活性数据 1 为标准,获取每一个波谷的谷底时间点,然后遍历活性数据 2 和色谱数据,看活性数据 2 和色谱数据中对应的时间点是否属于异常点,👬哥俩都有则保留,否则丢弃。

筛选前的图像:

all_anomalies

筛选后:

common_anomalies

找出了大部分异常点,但还需要改善。

# 迁移学习

将当前训练好的模型应用到其他实验数据上。

预测效果如下:

活性数据 1 活性数据 2 色谱数据
iloc1_predict iloc3_predict iloc5_predict

效果还可以。

# 波形匹配

类似于时序数据的相似度比较✍️