Spark 导航
临时文件、中间文件、缓存数据,都会存储到 spark.local.dir
中
spark.local.dir
配置到 SDD 或访问高效的存储系统
磁盘复用 :
一旦某个计算环节出错,就会触发失败重试。失败重试的触发点是距离最新的 Shuffle 的中间文件
当 RDD4 的计算任务失败时,会从 RDD4 向前回溯,回溯到 RDD3 (RDD2 输出的中间文件 ) ,并重新开始计算
ReuseExchange 是 Spark SQL 优化一种 : 相同或相似的物理计划能共享 Shuffle 中间文件
ReuseExchange 机制的触发条件:
统计不同用户的 PV(Page Views,页面浏览量)、UV(Unique Views,网站独立访客),并把两项统计结果合并:
//版本1:分别计算PV、UV,然后合并
// Data schema (userId: String, accessTime: Timestamp, page: String)
val filePath: String = _
val df: DataFrame = spark.read.parquet(filePath)val dfPV: DataFrame = df.groupBy("userId").agg(count("page").alias("value"))
val dfUV: DataFrame = df.groupBy("userId").agg(countDistinct("page").alias("value"))val resultDF: DataFrame = dfPV.Union(dfUV)
// Result样例
| userId | metrics | value |
| user0 | PV | 25 |
| user0 | UV | 12 |
文件扫描/Shuffle 两次 :
以 userId 为分区 ,调用 repartition :
//版本2:分别计算PV、UV,然后合并
// Data schema (userId: String, accessTime: Timestamp, page: String)
val filePath: String = _
val df: DataFrame = spark.read.parquet(filePath).repartition($"userId")val dfPV: DataFrame = df.groupBy("userId").agg(count("page").alias("value"))
val dfUV: DataFrame = df.groupBy("userId").agg(countDistinct("page").alias("value"))val resultDF: DataFrame = dfPV.Union(dfUV)
// Result样例
| userId | metrics | value |
| user0 | PV | 25 |
| user0 | UV | 12 |
ReuseExchange :