本教程探讨pyspark在本地模式下读取大量小型parquet文件时遇到的性能瓶颈。核心问题在于“小文件问题”导致的任务调度和i/o开销。文章将解释spark的懒加载机制为何在此场景下表现异常,并提供通过文件合并(repartition)来优化数据存储结构,从而显著提升读取效率的专业解决方案。
在使用PySpark处理数据时,开发者常期望其具备高效的分布式处理能力。然而,当面临大量(例如1300个)、但每个文件体积较小(例如8MB)的Parquet文件集合时,即使在本地模式下,也可能遇到令人意外的加载速度缓慢问题。本节将详细描述这种现象及其背后的机制。
考虑以下PySpark代码片段,它尝试读取一个由分区Parquet文件组成的目录:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType # 示例schema类型
# 初始化SparkSession
conf = pyspark.SparkConf().set('spark.driver.memory', '3g')
spark = (
SparkSession.builder
.master("local[10]") # 在本地模式下使用10个线程
.config(conf=conf)
.appName("Spark Local")
.getOrCreate()
)
# 示例:假设已知Schema,或者从单个文件推断
# 实际场景中,如果所有文件Schema一致,可提前定义或从一个文件推断
# 例如:
# schema = StructType([
# StructField("column1", StringType(), True),
# StructField("column2", IntegerType(), True)
# ])
# 或者像问题中那样从一个文件推断:
df_sample = spark.read.parquet(r"C:\Project Data\Data-0.parquet")
schema = df_sample.schema
print("Schema successfully inferred from sample file.")
df_sample.printSchema()
# 尝试读取所有文件
# 假设文件路径模式为 "C:\Project Data\Data-*.parquet"
print("Attempting to read all partitioned parquet files using specified schema...")
df = spark.read.format("parquet") \
.schema(schema) \
.load(r"C:\Project Data\Data-*.parquet")
# 此时,即使没有立即触发Action,用户也可能观察到长时间的等待和内存消耗增加
# 例如,尝试执行一个Action:
# print(f"Total records: {df.count()}") # 这将触发实际计算
# df.show(5) # 或者显示前几行在执行 spark.read.load() 这一行时,用户可能会观察到程序长时间无响应,并且系统内存占用缓慢增长,这与Spark的“懒加载”(lazy evaluation)特性似乎相悖。通常认为,Spark仅在遇到Action操作时才会真正执行计算,而读取操作本身应该很快完成,仅加载元数据。
Spark的懒加载机制意味着转换(Transformation)操作(如map, filter, read)不会立即执行,而是构建一个逻辑执行计划。只有当遇到行动(Action)操作(如count, show, write)时,Spark才会根据执行计划进行实际计算。
然而,对于spark.read.parquet()这类操作,即使是懒加载,也需要进行一系列的预处理:
在处理大量小文件时,上述第一点尤其耗时。S
park必须对每一个小文件执行文件系统操作和元数据读取,这会产生巨大的I/O和CPU开销,即使每个文件很小。这解释了为什么在执行 load() 操作时,即使没有立即触发Action,也会感觉到显著的延迟和内存增长(可能是Spark驱动程序或执行器内部缓存文件元数据)。
此外,在本地模式下,master("local[10]") 指定了10个线程。但实际的并行度仍然受限于物理CPU核心数。如果机器只有2个物理核心,那么即使指定10个线程,也无法达到真正的10倍并行加速,反而可能因为线程切换的开销而降低效率。
导致上述性能问题的根本原因在于分布式系统中的“小文件问题”(Small File Problem)。
在Hadoop和Spark等分布式计算环境中,数据通常被分割成较大的块(例如HDFS默认块大小为128MB或256MB)进行存储和处理。每个数据块对应一个或多个任务。当处理大量远小于块大小的文件时,会引发一系列效率问题:
解决“小文件问题”最有效的方法是将大量小文件合并成少数几个大文件。这可以通过PySpark的重分区(repartition)和写入操作来实现。
步骤1:读取现有小文件(首次操作可能仍然较慢)
虽然读取小文件集合本身会耗时,但这是进行合并的前提。
# 假设df_raw是您通过上述慢速方式读取的DataFrame
# 这一步仍然会慢,但它将作为一次性的数据加载和转换过程
df_raw = spark.read.format("parquet") \
.schema(schema) \
.load(r"C:\Project Data\Data-*.parquet")
print(f"Successfully loaded initial DataFrame from small files.")
# df_raw.count() # 可以选择在这里触发count来获取总记录数步骤2:重分区并写入为合并的大文件
repartition() 转换操作可以将DataFrame的数据重新分布到指定数量的分区中。然后,通过 write 操作将这些分区写入为新的Parquet文件。
# 确定目标分区数
# 假设原始数据总大小为 1300 * 8MB = 10400MB (约10.4GB)
# 目标文件大小为 128MB/文件,则所需分区数约为 10400MB / 128MB = 81.25
# 我们可以选择一个合适的整数,例如 80 或 100
target_partitions = 80 # 根据总数据量和期望的文件大小进行调整
# 对DataFrame进行重分区,并将结果写入新的Parquet目录
# 这将生成大约 target_partitions 个较大的Parquet文件
print(f"Repartitioning data into {target_partitions} files and writing to new location...")
output_path = r"C:\Project Data\Consolidated_Data" # 新的存储路径
df_raw.repartition(target_partitions) \
.write \
.mode("overwrite")
# node
# app
# 懒加载
# session
# 性能瓶颈
# 内存占用
# 为什么
# red
# 分布式
# count
# Filter
# 线程
# map
# hadoop
# spark
# hdfs
# 性能优化
# 加载
# 文件系统
# 模式下
# 长时间
# 文件合并
# 可以选择
# 这将
# 慢速
# 这是
# 大文件
相关栏目:
【
Google疑问12 】
【
Facebook疑问10 】
【
网络优化76771 】
【
技术知识130152 】
【
IDC云计算60162 】
【
营销推广131313 】
【
AI优化88182 】
【
百度推广37138 】
【
网站推荐60173 】
【
精选阅读31334 】
相关推荐:
如何用正则表达式精确匹配“start”到“end”之间最多含一个换行符的文本段
Go语言中slice追加操作的底层共享机制解析
Win11怎么压缩文件 Win11自带压缩解压功能使用【教程】
Mac如何将HEIC图片格式转为JPG_Mac批量转换图片【指南】
Windows蓝屏错误0x0000002C怎么解决_系统IO异常排查方法
如何在Golang中写入XML文件_生成符合规范的XML数据
Win10怎么关闭自动更新错误重启 Win10策略禁止失败补丁强制重启【防护】
如何使用Golang实现基本类型比较_Golang比较操作符使用方法
php嵌入式需要什么环境_搭建php+linux嵌入式开发环境【详解】
Win10怎么卸载鲁大师_Win10彻底卸载鲁大师方法【步骤】
Go 中 defer 语句在 goroutine 内部不返回时不会执行
php下载安装包太大怎么下载_分卷压缩下载方法【教程】
Win11怎么开启游戏模式_Windows11优化游戏帧数设置指南
Python对象比较排序规则_集合使用说明【指导】
php怎么下载安装并配置环境变量_命令行调用PHP技巧【技巧】
Python字符串操作教程_切片拼接与格式化详解
Win11如何设置鼠标灵敏度_Win11鼠标灵敏度调整教程【攻略】
如何在 Go 中创建包含 map 的 slice(嵌套数据结构)
Python深度学习实战教程_神经网络模型构建与训练
c++中如何进行二进制文件读写_c++ read与write函数用法
如何使用Golang构建简易投票统计功能_Golang投票数据汇总与展示示例
Win11怎么开启远程桌面_Win11系统远程桌面启用开关
PowerShell怎么创建复杂的XML结构
Win10如何卸载WindowsDefender_Win10卸载Defender教程【方法】
Win11开始菜单打不开_修复Windows 11点击开始图标无响应【教程】
如何在 Pandas 中按元素交集合并两列字符串
Python数据抓取合法性_合规说明【指导】
Win10怎么设置开机密码_Windows10账户登录密码设置与取消
Python对象生命周期管理_创建销毁说明【指导】
Python模块的__name__属性如何由导入方式决定?
php8.4如何配置ssl证书_php8.4https访问配置指南【教程】
MAC如何隐藏文件夹及文件_MAC终端命令隐藏与第三方工具加密【教程】
Win10怎样卸载iTunes_Win10卸载iTunes步骤【步骤】
如何高效识别并拦截拼接式恶意域名 spam
Mac怎么查看活动监视器_理解Mac进程和资源占用【指南】
Python函数缓存机制_lru_cache解析【指导】
如何使用Golang进行HTTP服务性能测试_测量吞吐量和延迟
windows系统如何安装cab更新补丁_windows手动安装更新包教程
如何在Golang中使用container/heap实现堆_Golang container/heap最小堆方法
PHP的FastAdmin架构适合二次开发吗_特点分析【介绍】
c# 在ASP.NET Core中管理和取消后台任务
Mac版Final Cut Pro入门_Mac视频剪辑基础操作【教程】
Win11怎么关闭用户账户控制UAC_Windows11更改通知设置等级
c++ atoi和atof函数用法_c++字符数组转数字
Win11怎么关闭自动调节屏幕亮度_Windows11禁用内容自适应亮度控制
Windows10如何更改桌面背景_Win10个性化幻灯片放映设置
Win11任务栏怎么放到顶部_Win11修改任务栏位置方法【详细】
Win10电脑怎么设置休眠快捷键_Windows10电源按钮功能定义
VSC怎样在VSC中调试PHPAPI_接口调试技巧【详解】
c++中如何对数组进行排序_c++数组排序算法汇总
2025-12-08
致胜网络推广营销网专注海外推广十年,是谷歌推广.Facebook广告全球合作伙伴,我们精英化的技术团队为企业提供谷歌海外推广+外贸网站建设+网站维护运营+Google SEO优化+社交营销为您提供一站式海外营销服务。