别再死记硬背了!用Spark电影评分分析实战,手把手教你Join操作与数据清洗(附完整Scala代码)
用Spark实战电影评分分析从数据清洗到Join操作的完整指南电影评分数据中隐藏着无数观众偏好的秘密。想象一下你手头有两份数据一份记录了数百万用户对电影的评分另一份包含了电影的基本信息。如何从中挖掘出平均分高于4.0的热门电影这正是Spark大显身手的舞台。本文将带你从零开始用Scala编写一个完整的Spark分析流程重点解析Join操作的核心技巧与性能优化。1. 环境准备与数据理解在开始编码前我们需要准备好Spark环境和理解数据集结构。假设你已安装Spark 3.x和Scala 2.12以下是项目依赖的sbt配置libraryDependencies Seq( org.apache.spark %% spark-core % 3.3.0, org.apache.spark %% spark-sql % 3.3.0 )1.1 数据集结构解析我们使用的两个核心数据集评分数据(ratings.dat)格式用户ID::电影ID::评分::时间戳示例1::122::5::838985046电影数据(movies.dat)格式电影ID::电影名称::类型示例122::The Dark Knight (2008)::Action|Crime|Drama提示实际项目中建议先用小样本数据测试代码逻辑再扩展到全量数据。2. 数据加载与初步清洗2.1 创建SparkSession任何Spark应用的起点都是创建SparkSessionimport org.apache.spark.sql.SparkSession val spark SparkSession.builder() .appName(MovieRatingAnalysis) .master(local[*]) // 生产环境替换为集群地址 .getOrCreate() import spark.implicits._2.2 加载原始数据使用Spark的DataFrame API加载数据比RDD更简洁val ratingsDF spark.read .option(sep, ::) .csv(data/ratings.dat) .toDF(userId, movieId, rating, timestamp) val moviesDF spark.read .option(sep, ::) .csv(data/movies.dat) .toDF(movieId, title, genres)2.3 数据类型转换原始数据都是字符串类型需要转换为适当类型val cleanRatings ratingsDF.select( $userId.cast(integer), $movieId.cast(integer), $rating.cast(double), $timestamp.cast(long) ) val cleanMovies moviesDF.select( $movieId.cast(integer), $title, $genres )3. 核心分析计算电影平均分3.1 分组聚合计算计算每部电影的平均评分是分析的关键步骤val avgRatings cleanRatings .groupBy(movieId) .agg(avg(rating).alias(avgRating)) .filter($avgRating 4.0) // 筛选高评分电影3.2 性能优化技巧当数据量很大时groupBy操作可能成为性能瓶颈。以下是两种优化方案预分区在groupBy前按movieId重新分区cleanRatings.repartition($movieId).groupBy(movieId)...使用reduceByKeyRDD APIval ratingsRDD cleanRatings.rdd.map(r (r.getInt(1), (r.getDouble(2), 1)) ) val avgRatingsRDD ratingsRDD.reduceByKey((a, b) (a._1 b._1, a._2 b._2) ).mapValues{ case (sum, count) sum / count }4. Join操作实战与优化4.1 基本Join实现将平均分与电影信息关联val popularMovies avgRatings.join( cleanMovies, avgRatings(movieId) cleanMovies(movieId), inner ).select(title, avgRating)4.2 Join性能陷阱与解决方案Spark Join操作常见性能问题及对策问题类型表现解决方案数据倾斜少数task执行时间极长使用salting技术或广播小表笛卡尔积数据量爆炸式增长严格检查join条件网络IO高shuffle数据量大合理设置分区数广播Join示例当电影表较小时import org.apache.spark.sql.functions.broadcast val popularMovies avgRatings.join( broadcast(cleanMovies), avgRatings(movieId) cleanMovies(movieId) )5. 结果展示与存储5.1 格式化输出对结果进行排序和格式化val finalResult popularMovies .orderBy($avgRating.desc) .select( $title, format_number($avgRating, 1).alias(rating) )5.2 多种存储方式根据需求选择存储格式CSV格式finalResult.write .option(header, true) .csv(output/popular_movies)Parquet格式列式存储适合后续分析finalResult.write.parquet(output/popular_movies_parquet)直接打印开发调试用finalResult.show(10, truncate false)6. 生产环境注意事项在实际项目中部署此类分析作业时有几个关键点需要特别注意资源分配根据数据量调整executor内存和核心数失败处理设置合理的重试机制和checkpoint监控通过Spark UI跟踪任务进度和资源使用一个典型的spark-submit命令示例spark-submit \ --class com.example.MovieAnalysis \ --master yarn \ --executor-memory 8G \ --num-executors 10 \ your-application.jar \ input/ratings.dat \ input/movies.dat \ output/results7. 扩展思路掌握了基础分析后可以尝试以下进阶分析时间维度分析按周/月分析评分趋势用户分群识别高价值用户的评分偏好类型关联分析不同类型电影的平均分差异推荐系统基于协同过滤的简单推荐实现以下是计算各类型电影平均分的示例val genresRatings cleanMovies .withColumn(genre, explode(split($genres, \\|))) .join(cleanRatings, movieId) .groupBy(genre) .agg(avg(rating).alias(avgRating)) .orderBy($avgRating.desc)