Airflow 中用 PythonOperator 处理 XML 的核心是封装可序列化、无副作用、带异常处理的 Python 函数,使用 ElementTree 或 lxml 解析,通过 op_kwargs 传参,利用 XCom 传递结果,并注意环境依赖与路径可见性。
用 Airflow 的 PythonOperator 处理 XML 文件,核心是把解析、转换或校验 XML 的逻辑封装成一个 Python 函数,再交给 operator 执行。关键在于函数要可序列化、无副作用、能处理路径和异常。
这个函数应接收必要的参数(如文件路径、目标字段),使用标准库 xml.etree.ElementTree 或第三方库(如 lxml)解析,返回结构化结果(字典、列表等),便于下游任务使用。
ElementTree(无需额外安装),对简单 XML 足够;若需 XPath 2.0、命名空间或大文件流式处理,选 lxml
**context 获取 execution_date 或 dag_run.conf 动态拼接文件路径ParseError、FileNotFoundError 等异常,并用 logging 记录,否则任务会静默失败将 XML 处理函数作为 python_callable 传入,用 op_kwargs 传递参数(如 input_path、required_tags),避免闭包或 lambda —— 它们无法被 Airflow 序列化。
awscli 或 boto3 下载到本地临时路径,处理完再清理XCom 提取特定键:{{ ti.xcom_pull(task_ids='parse_xml')['ids'] }}
do_xcom_push=True(默认开启),确保返回值能被下游读取不同业务需求对应不同处理模式,函数内部逻辑需适配:
root.iter('item'),用 findtext() 取文本,get() 取属性,组装为
字典列表assert 或自定义异常抛出,触发任务失败json.dumps() 或 pandas.DataFrame().to_csv() 写入指定路径,供后续任务读取Airflow worker 的 Python 环境必须安装所需 XML 库(如 lxml),且文件路径需对 worker 可见 —— 本地路径只适用于 LocalExecutor;KubernetesExecutor 或 CeleryExecutor 需挂载共享存储(如 NFS、S3FS)或预下载。
iterparse() 流式解析,避免内存溢出