用Python搞定FEMTO-ST轴承数据集的预处理(附完整代码与避坑指南)
用Python搞定FEMTO-ST轴承数据集的预处理附完整代码与避坑指南轴承健康监测领域的研究者一定对IEEE PHM 2012挑战赛数据集不陌生。这个由FEMTO-ST研究所发布的经典数据集包含了多种工况下的轴承全寿命周期振动与温度数据已成为预测性维护算法开发的黄金标准。但初次接触这个数据集时很多工程师都会被其复杂的文件结构和悬殊的采样频率差异所困扰——振动信号以25.6kHz高速采集而温度数据仅有10Hz的更新频率如何高效地统一处理这些异构数据本文将手把手带你用Python构建完整的数据预处理流水线。与常见的Matlab方案不同我们完全基于Python生态的Pandas、NumPy和Dask等工具实现从原始CSV文件到规整时间序列的转换特别针对以下痛点提供解决方案批量文件处理自动识别并合并分散在多个目录中的CSV文件采样率转换解决振动信号(25.6kHz)与温度数据(10Hz)的频率对齐问题内存优化使用分块处理技术应对大型振动数据集时间戳重建精确还原被截断的采集时间信息1. 数据集架构解析FEMTO-ST数据集包含三种运行工况每种工况下又分为训练集(Learning_set)、测试集(Test_set)和完整寿命集(Full_Test_set)。其目录结构通常如下PHM2012/ ├── Learning_set/ │ ├── Bearing1_1/ │ │ ├── acc_00001.csv │ │ ├── temp_00001.csv │ │ └── ... │ └── ... ├── Full_Test_set/ │ ├── Bearing1_5/ │ │ ├── acc_00001.csv │ │ └── ... └── Test_set/ ├── Bearing1_3/ │ ├── acc_00001.csv │ └── ...关键数据特征需要特别注意数据类型采样频率每次采集点数文件命名规则振动信号25.6kHz2560点/文件acc_xxxxx.csv温度数据10Hz600点/文件temp_xxxxx.csv注意温度传感器实际采样间隔为0.1秒但每分钟才会保存一次数据到CSV文件2. 环境配置与依赖安装推荐使用Python 3.8环境主要依赖库包括pip install pandas numpy dask scipy tqdm matplotlib对于超大规模数据处理建议额外安装pip install dask[complete] # 支持并行计算核心库的作用说明Pandas提供DataFrame结构和高效IO操作Dask实现内存友好的大数据分块处理Scipy用于信号重采样和滤波tqdm显示处理进度条3. 高效数据加载策略原始数据集包含数千个CSV文件直接逐个读取会非常低效。我们设计了一个自动化的批量加载方案from pathlib import Path import pandas as pd def load_bearing_data(root_path, bearing_id): 加载指定轴承的所有数据 bearing_path Path(root_path) / fBearing{bearing_id} # 振动数据加载 acc_files sorted(bearing_path.glob(acc_*.csv)) acc_dfs [pd.read_csv(f, headerNone, names[vertical, horizontal]) for f in tqdm(acc_files, descLoading vibration)] # 温度数据加载 temp_files sorted(bearing_path.glob(temp_*.csv)) temp_df pd.concat([pd.read_csv(f, headerNone, names[temperature]) for f in temp_files]) return { vibration: pd.concat(acc_dfs), temperature: temp_df }这个基础版本存在两个明显问题一次性加载所有振动数据可能导致内存溢出丢失了原始数据的时间信息改进后的内存安全版本import dask.dataframe as dd def safe_load_vibration(files): 分块加载振动数据 ddf dd.read_csv( files, headerNone, names[vertical, horizontal], blocksize256e6 # 每块256MB ) return ddf.compute() # 触发实际计算4. 时间戳重建与对齐原始CSV文件没有显式时间戳需要根据采集规则重建振动信号每10秒采集一次每次采集0.1秒2560个点温度数据每分钟采集600个点每秒10个点振动信号时间戳生成算法import numpy as np def build_vibration_timestamps(n_samples, fs25600): 构建振动信号的时间戳数组 n_chunks n_samples // 2560 timestamps [] for i in range(n_chunks): start_time i * 10 # 每10秒一个采集窗口 chunk_time start_time np.arange(2560)/fs timestamps.extend(chunk_time) return np.array(timestamps)温度数据的时间戳处理更为简单def build_temp_timestamps(n_samples, fs10): 构建温度数据的时间戳 return np.arange(n_samples) / fs5. 采样率统一与特征提取要将两种不同采样率的数据对齐有两种主流方案方案一振动信号降采样from scipy import signal def downsample_vibration(vib_data, orig_fs25600, target_fs100): 将振动信号降采样到目标频率 ratio int(orig_fs / target_fs) resampled signal.resample_poly( vib_data, up1, downratio, axis0 ) return resampled方案二温度信号插值from scipy import interpolate def interpolate_temperature(temp_data, temp_time, target_time): 温度数据线性插值 f interpolate.interp1d( temp_time, temp_data, kindlinear, fill_valueextrapolate ) return f(target_time)实际工程中更推荐同时计算振动信号的时域特征减少数据量def extract_vibration_features(vib_data, window_size2560): 计算振动信号的特征指标 n_windows len(vib_data) // window_size features [] for i in range(n_windows): window vib_data[i*window_size : (i1)*window_size] features.append({ rms: np.sqrt(np.mean(window**2)), kurtosis: pd.Series(window).kurtosis(), peak: np.max(np.abs(window)) }) return pd.DataFrame(features)6. 完整处理流程示例将上述模块组合成端到端的处理流水线def process_bearing_data(root_path, bearing_id): # 1. 数据加载 raw_data load_bearing_data(root_path, bearing_id) # 2. 时间戳重建 vib_time build_vibration_timestamps(len(raw_data[vibration])) temp_time build_temp_timestamps(len(raw_data[temperature])) # 3. 特征提取 vib_features extract_vibration_features(raw_data[vibration]) vib_features[time] np.arange(len(vib_features)) * 10 # 每10秒一个特征 # 4. 数据对齐 common_time np.arange(0, max(vib_features[time].max(), temp_time.max()), 1) aligned_data pd.DataFrame({ time: common_time, temp: interpolate_temperature( raw_data[temperature].values.flatten(), temp_time, common_time ) }) # 合并振动特征 aligned_data aligned_data.merge( vib_features, howleft, left_ontime, right_ontime ) return aligned_data.interpolate() # 填补空缺值7. 性能优化技巧处理大规模轴承数据时这些技巧可以显著提升效率并行处理使用Dask或Joblib并行处理不轴承的数据from joblib import Parallel, delayed def process_all_bearings(root_path, bearing_ids, n_jobs4): return Parallel(n_jobsn_jobs)( delayed(process_bearing_data)(root_path, bid) for bid in bearing_ids )内存映射对于超大型振动数据使用numpy.memmapvib_mmap np.memmap(vibration.bin, dtypefloat32, moder, shape(n_samples, 2))预处理缓存将中间结果保存为Parquet格式aligned_data.to_parquet(fbearing_{bearing_id}.parquet)8. 常见问题与解决方案Q1时间戳出现错位怎么办检查原始数据的采集间隔设置特别是确认振动数据的fs25600参数是否正确验证温度数据是否确实每分钟保存一次Q2内存不足如何处理振动数据采用分块处理策略先统计总文件数和每个文件的行数预分配内存空间分批读取和填充数据Q3如何验证数据加载的正确性绘制初始阶段的信号时序图plt.figure(figsize(12,4)) plt.plot(raw_data[vibration][vertical][:5000]) plt.plot(raw_data[temperature][:50]*1000) # 缩放显示 plt.legend([Vibration, Temperature(x1000)])实际项目中我们发现在Windows系统上直接使用Pandas读取大量小文件时性能会比Linux系统下降30-40%。一个实用的workaround是先用Python内置的csv模块快速扫描文件结构再批量读取内容。