概述 - 目标:以Delta Lake实现事务写入、幂等合并与版本化时光回溯,保障批流一致与故障恢复。 - 适用:事件事实表、实时更新、回滚修复、审计与重放场景。 核心与实战 - 写入与更新(PySpark): ``` from pyspark.sql import SparkSession spark = SparkSession.builder.appName("delta").getOrCreate() df = spark.createDataFrame([ (1, "BUY", 10.5), (2, "BUY", 20.0) ], ["id", "type", "amount"]) df.write.format("delta").mode("append").save("/delta/events") upserts = spark.createDataFrame([ (1, "BUY", 11.0), (3, "REFUND", -5.0) ], ["id", "type", "amount"]) from delta.tables import DeltaTable dt = DeltaTable.forPath(spark, "/delta/events") dt.alias("t").merge(upserts.alias("s"), "t.id = s.id") \ .whenMatchedUpdateAll() \ .whenNotMatchedInsertAll() \ .execute() ``` - SQL时光回溯: ``` -- 读取历史版本 SELECT * FROM delta."/delta/events" VERSION AS OF 0; -- 或按时间点读取 SELECT * FROM delta."/delta/events" TIMESTAMP AS OF "2025-11-25T12:00:00Z"; ``` 示例 - VACUUM与保留: ``` -- 移除过期文件(默认7天),谨慎使用 VACUUM delta."/delta/events" RETAIN 168 HOURS; ``` - 事务日志检查: ``` DESCRIBE HISTORY delta."/delta/events"; ``` 验证与监控 - 一致性与幂等: - 验证`MERGE`结果与幂等性;在重复写入时保持最终值正确。 - 版本与回滚: - 使用`VERSION AS OF`读取历史并与当前对比,支持回滚修复。 - 资源与存储: - 观察小文件与合并任务;调优批量与`autoOptimize`选项(如Databricks设置)。 常见误区 - 过早`VACUUM`导致时光回溯不可用;需在保留期内避免删除必要文件。 - 未使用`MERGE`而用`overwrite`破坏历史;推荐幂等合并。 - 小文件过多影响性能;需批量写入与合并优化。 结语 - Delta Lake通过ACID与时光回溯提供强一致与可审计能力,适合批流一体与生产级数据治理。

发表评论 取消回复