专业的编程技术博客社区

网站首页 > 博客文章 正文

写Parquet的同时提高Spark作业性能300%

baijin 2024-11-08 10:23:18 博客文章 8 ℃ 0 评论


不久前,我正在运行一个Spark ETL,该ETL从AWS S3提取数据进行了一些转换和清理,并将转换后的数据以Parquet格式写回到AWS S3。 JSON Gzip格式的数据量约为350GB。 我的工作节点总共有48个核心,内存为280 GB。

运行该作业后,我注意到该作业持续进行了超过24小时,并且之间有多次失败。 该错误消息曾经是—执行器在120000ms之后超时。 这项工作正在处理大量数据,并且数据写入阶段停留在某个地方。

看到工作日志后,我看到工作人员被卡在乱堆的数据中,大量数据被溢出到磁盘上。

2018–06–27 12:21:42,671 INFO [UnsafeExternalSorter] — Thread 168 spilling sort data of 3.1 GB to disk (0 time so far)

寻找根本原因

我取消了工作,但在决定后决定检查正在生成的查询计划。

dataframe.explain()

这是为这项工作制定的计划。

== Physical Plan ==
Project [acquire_campaign#60036, … 274 more fields]
+- SortMergeJoin [custom__city#60100], [area_name#33572], LeftOuter
:- Sort [custom__city#60100 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(custom__city#60100, 200)
: +- Project [acquire_campaign#0 AS acquire_campaign#60036, … 274 more fields]
: +- FileScan json [acquire_campaign#0,… 18 more fields] Batched: false, Format: JSON, Location: InMemoryFileIndex[s3a://xxx…, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<acquire_campaign:string,acquire_source:string,advertising_id:string,android_id:string,app:…
+- *Sort [area_name#33572 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(area_name#33572, 200)
+- InMemoryTableScan [final_city#33571, area_name#33572]
+- InMemoryRelation [final_city#33571, area_name#33572], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
+- *Scan RedshiftRelation((select final_city, lower(trim(city_name)) area_name …)) [final_city#33571,area_name#33572] ReadSchema: struct<final_city:string,area_name:string>

那里!! 我终于看到光了。

问题似乎是正在发生的联接Join。 我的原始数据与从Redshift中拉出的一个小表连接在一起,看起来好像都是在对数据进行改组(Exchange哈希分区),最后发生了SortMergeJoin,这非常昂贵,因为排序后的数据在内存中,从而很快填充了内存。 磁盘溢出。 因此前进的道路很明确。 我必须消除导致磁盘溢出的混洗。

解决方案

在阅读了有关联接如何工作以及Catalyst优化器如何优化查询的一些知识之后,我意识到redshift表非常小。 我决定在加入时使用广播提示。 这是使用广播提示后的查询计划-

== Physical Plan ==
Project [acquire_campaign#26550 … 274 more fields]
+- BroadcastHashJoin [custom__city#26614], [area_name#86], LeftOuter, BuildRight
:- Project [acquire_campaign#0 AS acquire_campaign#26550, … 274 more fields]
: +- FileScan json [acquire_campaign#0,… 18 more fields] Batched: false, Format: JSON, Location: InMemoryFileIndex[s3a://xxx…, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<acquire_campaign:string,app:…
+- BroadcastExchange HashedRelationBroadcastMode(List(input[1, string, true]))
+- InMemoryTableScan [final_city#85, area_name#86]
+- InMemoryRelation [final_city#85, area_name#86], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
+- *Scan RedshiftRelation((select final_city, lower(trim(city_name)) area_name …)) [final_city#85,area_name#86] ReadSchema: struct<final_city:string,area_name:string>

看哪 哈希交换完成后,用一个BroadcastExchange替换,如果表的大小很小,则该替换并不重要。 最终,该连接以BroadcastHashJoin的形式进行。 这比SortMergeJoin轻量得多。

最终,在将内核数增加到200之后,我运行了该作业,并看到该作业在2.3小时内完成,磁盘溢出最少,而以前则需要24个小时以上。 当考虑到增加的资源时,将近300%的改善。

尤里卡! 甜美成功的气味令人陶醉。 :D

有关更多帖子,请关注我的出版物:-https://medium.com/polar-tropics

(本文翻译自Shitij Goyal的文章《Improving Spark job performance while writing Parquet by 300%》,参考:https://medium.com/polar-tropics/improving-spark-job-performance-while-writing-parquet-by-300-40ccf487a6a5)

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表