从 DAG 到声明式:一场范式转变
数据工程师最熟悉的工作模式是什么?写一个 DAG。定义任务 A、任务 B、任务 C,然后画出它们之间的依赖关系。Airflow 是这样,Dagster 是这样,大多数编排工具都是这样。
但问题来了:你真正关心的是数据本身,还是执行 DAG?
Apache Spark 4.1 给出了一个新答案:Spark Declarative Pipelines(SDP)。你只需要声明"我要什么表、表的内容怎么来",剩下的——依赖推断、执行顺序、并行化、错误处理、增量更新——全部交给框架。
这不是一个小功能。这是 Spark 生态对数据管道开发方式的根本性重新思考。
三分钟快速体验
安装只需一行:
pip install pyspark[pipelines]
写一个最简单的管道:
from pyspark import pipelines as dp
@dp.materialized_view
def daily_sales():
return spark.table("orders").groupBy("date").agg({"amount": "sum"})
运行:
spark-pipelines run
没有 saveAsTable()。没有 start()。没有 awaitTermination()。你只是描述了"我想要一个按日期汇总的销售表",SDP 负责让它存在。
核心概念
Flow:数据流动的最小单元
Flow 是 SDP 的基本构建块。每个 Flow 描述了一个完整的数据流动过程:从哪里读、怎么转换、写到哪里。
SDP 有两种 Flow 语义:
- Streaming Flow → 输出到 Streaming Table(增量处理)
- Batch Flow → 输出到 Materialized View 或 Temporary View
Dataset:你真正关心的东西
Dataset 是 Flow 的输出,也是管道中可查询的对象。SDP 提供三种 Dataset 类型:
Streaming Table —— 持续增量更新的表,适合从 Kafka 等消息系统摄入数据:
@dp.table
def raw_events():
return (
spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "events")
.load()
)
Materialized View —— 预计算的批处理表,完整刷新:
@dp.materialized_view
def hourly_metrics():
return (
spark.table("raw_events")
.groupBy(window("timestamp", "1 hour"))
.agg(count("*").alias("event_count"))
)
Temporary View —— 管道执行期间的中间结果,不持久化,但让依赖图更清晰:
@dp.temporary_view
def cleaned_events():
return spark.table("raw_events").filter("event_type IS NOT NULL")
依赖自动推断
这是 SDP 最优雅的设计之一。你不需要显式声明 “hourly_metrics 依赖 raw_events”——SDP 分析你的查询逻辑,发现 spark.table("raw_events") 的调用,自动构建依赖图。
raw_events (Streaming Table)
↓
cleaned_events (Temporary View)
↓
hourly_metrics (Materialized View)
SQL 原生支持
SDP 不仅支持 Python,还原生支持 SQL。同样的管道可以这样写:
CREATE STREAMING TABLE raw_events
AS SELECT * FROM STREAM kafka_source;
CREATE TEMPORARY VIEW cleaned_events
AS SELECT * FROM raw_events WHERE event_type IS NOT NULL;
CREATE MATERIALIZED VIEW hourly_metrics
AS SELECT window(timestamp, '1 hour'), count(*) AS event_count
FROM cleaned_events
GROUP BY 1;
对于以 SQL 为主的团队,这意味着零学习成本。
批流混合:一个图搞定
传统做法中,批处理和流处理是两套独立的管道。SDP 允许你在同一个依赖图中混合使用:
# 流式摄入
@dp.table
def orders():
return spark.readStream.format("kafka")...
# 批处理聚合(读取上面的流表)
@dp.materialized_view
def daily_summary():
return spark.table("orders").groupBy("date").count()
SDP 自动管理触发器、调度和检查点,你不需要关心底层的 Structured Streaming 机制。
多 Flow 写入同一目标
一个常见场景:你有多个数据源需要写入同一张表。SDP 通过 append_flow 优雅地解决:
dp.create_streaming_table("all_orders")
@dp.append_flow(target="all_orders")
def us_orders():
return spark.readStream.table("orders_us")
@dp.append_flow(target="all_orders")
def eu_orders():
return spark.readStream.table("orders_eu")
工程化:项目结构和 CLI
SDP 提供了完整的项目结构和命令行工具:
# 初始化项目
spark-pipelines init --name my_pipeline
# 验证管道(不读写数据)
spark-pipelines dry-run
# 执行管道
spark-pipelines run
项目通过 spark-pipeline.yml 配置:
name: my_pipeline
libraries:
- glob:
include: transformations/**
catalog: my_catalog
database: my_db
configuration:
spark.sql.shuffle.partitions: "1000"
dry-run 特别值得一提——它能在不读写任何数据的情况下捕获语法错误、分析错误和循环依赖,这对 CI/CD 集成非常友好。
与编排工具的关系
SDP 不是要替代 Airflow 或 Dagster。它专注于 Spark 层面的数据转换和依赖管理。在实际生产中,一个典型的架构是:
Airflow/Dagster(顶层编排)
├── 触发 SDP 管道(数据转换)
├── 调用外部 API
├── 发送通知
└── 其他非 Spark 任务
SDP 处理数据转换的重活,编排工具处理端到端的工作流。
我的看法
作为 Spark PMC 成员,我认为 SDP 解决了几个长期存在的痛点:
降低入门门槛。新手不需要理解 Structured Streaming 的 checkpoint、trigger、outputMode 等概念就能写出可靠的流式管道。
减少样板代码。不再需要
writeStream.format().option().start().awaitTermination()这样的仪式性代码。统一批流。同一套声明式 API,同一个依赖图,batch 和 streaming 不再是两个世界。
AI 友好。声明式的 Flow 本质上是函数,可以被测试、被调用、被 AI 编程助手理解和生成。这对 AI 辅助数据工程意义重大。
SDP 的设计思路源自 Databricks 在生产环境中验证过的 Delta Live Tables(DLT)模式,现在被带入了开源 Spark。这意味着整个社区都能受益于这些经过大规模验证的最佳实践。
上手试试
pip install pyspark[pipelines]
spark-pipelines init --name hello_sdp
cd hello_sdp
spark-pipelines run
更多内容请参阅官方编程指南:Spark Declarative Pipelines Programming Guide
Spark Declarative Pipelines 在 Apache Spark 4.1 中引入,相关设计文档见 SPARK-51727。