Python数据清洗流水线分为读取、校验、转换、输出四阶段:结构化读取并提取元数据;字段级校验生成问题报告;按YAML配置动态执行转换;原子化输出+变更留痕+日志追踪,强调元数据传递、错误隔离与配置解耦。
Python可以通过分阶段、可复用的函数组合,配合配置驱动和日志追踪,构建稳定可控的自动化数据清洗流水线。核心不是写一个大脚本,而是把清洗逻辑拆成“读取→校验→转换→输出”四个可独立测试、按需启用的阶段。
避免直接用 pandas.read_csv() 硬编码路径。改用统一入口函数,自动识别文件类型、编码、分隔符,并提取表名、时间戳、版本等元数据,存入上下文字典(如 ctx = {"source": "sales_202505.csv", "dt": "2025-05-01", "schema": {...}})。
pathlib.Path.suffix 判断不直接删数据,而是生成清洗报告(DataFrame 或 dict),记录每列的空值率、异常值数量、格式错误位置、业务规则冲突行索引。例如:
r"^1[3-9]\d{9}$" 校验,标出不匹配的行号pd.to_datetime(..., errors="coerce") 转换后检查 NaT 比例结果存为 ctx["report"],后续阶段可据此决定是否中断流程或启用修复策略。
把清洗动作参数化:哪些列要标准化(如大小写、去空格)、哪些缺失值用均值/前向填充/固定值补、哪些枚举字段映射别名。配置存在 YAML 文件中,例如:
columns:
user_name: {strip: true, upper: false}
status: {map: {"A": "active", "I": "inactive"}}
amount: {fillna: "median"}
清洗函数根据该配置动态调用对应方法,不写死逻辑,方便 QA 修改规则而不动代码。
清洗后数据不直接覆盖原文件。而是:
— 输出清洗后 CSV/Parquet 到 output/cleaned/,带时间戳命名
— 同步生成 diff/ 目录,存每列的变更统计(如 “phone: 12 行格式修正,3 行设为空”)
— 记录完整执行日志(含输入哈希、各阶段耗时、报告摘要),便于回溯和监控
shutil.copy2() 保留原始文件修改时间try/except + logging.error,失败时仍输出当前中间结果供排查
基本上就这些。流程本身不复杂,但容易忽略元数据传递、错误隔离和配置解耦——这三点才是让清洗流水线真正可维护的关键。