本文针对 spark 在本地模式下读取 CSV 文件并写入 Iceberg 表时,读取阶段能够并行执行,而写入阶段却只能单核运行的问题,提供了详细的解决方案。通过调整 Spark 配置,例如禁用动态资源分配、显式设置 executor 数量和资源,以及优化 AWS CLI 配置,可以有效提升写入性能,实现并行写入。
在使用 Spark 处理大规模数据时,我们经常会遇到读取速度快,但写入速度慢的问题。尤其是在本地模式下,虽然 Spark 能够利用多核并行读取数据,但在写入分区时,却可能退化为单核运行,导致整体性能瓶颈。本文将针对这一问题,提供详细的解决方案和优化建议。
理解问题根源
在本地模式下,Spark 默认只有一个 Executor,即本地 jvm 实例。虽然读取操作可以利用多线程并行执行,但写入操作往往受限于单个 Executor 的资源限制。动态资源分配(spark.dynamicAllocation.enabled)在这种情况下可能不会起到预期效果,甚至可能将所有任务分配给单个 Executor。
解决方案:显式配置 Executor 资源
避免依赖动态资源分配,而是显式地配置 Executor 的数量和资源,可以有效解决单核写入的问题。
-
禁用动态资源分配:
移除 spark.dynamicAllocation.enabled = true 的配置。
-
显式设置 Executor 数量和资源:
在提交 Spark 应用时,通过命令行参数设置 Executor 的数量、内存和 CPU 核心数。例如:
spark-submit --master yarn --deploy-mode cluster --num-executors 4 --executor-memory 1G --executor-cores 1 --driver-memory 2G --driver-cores 1 ...
- –num-executors: 设置 Executor 的数量。
- –executor-memory: 设置每个 Executor 的内存大小。
- –executor-cores: 设置每个 Executor 的 CPU 核心数。
注意:需要根据集群的资源情况和数据规模,合理调整这些参数。可以通过 Yarn Resource Manager ui 监控 CPU 核心的利用率,进行微调。
-
监控 Spark history Server UI:
在写入操作开始时,通过 Spark History Server UI 检查 Executor 的数量和 Task 的数量,确保任务能够均匀分配到各个 Executor 上。
优化 AWS CLI 配置
如果数据写入目标是 AWS S3,还可以通过优化 AWS CLI 的配置,提升写入性能。
-
增加并行度:
通过设置 max_concurrent_requests 和 max_queue_size 参数,增加并发请求的数量和任务队列的大小。
[default] s3 = max_concurrent_requests = 20 max_queue_size = 1000
-
调整分片上传参数:
通过设置 multipart_threshold 和 multipart_chunksize 参数,优化分片上传的策略。
[default] s3 = multipart_threshold = 64MB multipart_chunksize = 16MB
-
限制带宽(可选):
如果需要限制上传带宽,可以通过设置 max_bandwidth 参数实现。
[default] s3 = max_bandwidth = 100MB/s
注意:这些参数需要在 AWS CLI 的配置文件中进行设置,通常位于 ~/.aws/config。
总结
通过显式配置 Executor 资源和优化 AWS CLI 配置,可以有效解决 Spark 在本地模式下写入分区时仅使用单核的问题,显著提升写入性能。在实际应用中,需要根据具体的硬件环境和数据规模,进行参数调优,以达到最佳效果。此外,监控 Spark History Server UI 和 Yarn Resource Manager UI,可以帮助我们更好地了解任务的执行情况,并及时发现潜在的问题。