我试图找出在我的两个数据集上实现联合分区的最佳方法,以消除与连接相关的混乱。我正在处理两个数据帧A和B,其中A包含最少的用户日期,包括与之交互的事件ID字段,B包含有关事件的详细信息。我正在尝试加入3个字段:day
、event_type
和event_id
。A和B需要从磁盘中读取,因为它们将被外部客户端持续写入和读取。
我正在进行的项目的主要目标是能够快速:
- 按
event_type
- 筛选将原始事件详细信息加入用户ID
我知道,为了实现#1,我可能需要在event_type
上分区我的拼花文件,以便目录结构实现更容易的过滤。为了实现#2,我应该尝试通过对两个数据帧的键进行共同分区,尽可能减少混洗。
我使用的数据由3天的事件数据组成(每种事件类型约1200万行),目标是在1-3年的数据中有效地工作。
为了改进我的加入,我首先对event_type
进行过滤,以缩小两个数据帧上的数据范围。然后我在day
和event_id
上执行实际的连接。这自然会导致混乱,因为没有协同分区,所以我尝试使用哈希分区来解决这个问题。
我读到repartition
在指定的列上实现了哈希分区。我将数据帧保存到磁盘上,还包括一个partitionBy('day', 'event_type')
,以便在过滤/分组操作上获得更好的性能。
A\ .repartition('day', 'event_id')\ .write .partitionBy('day', 'event_type')\ .mode('overwrite')\ .parquet('/path/to/A') B\ .repartition('day', 'event_id')\ .write\ .partitionBy('day', 'event_type')\ .mode('overwrite')\ .parquet('/path/to/B') ... ... A = spark.read.parquet('/path/to/A') B = spark.read.parquet('/path/to/B') A.filter(col('event_type') == 'X')\ .join(B.filter(col('event_type) == 'X'), on=['day', event_id'], how='inner')\ .show()
当我执行此操作时,我仍然会在计划中看到洗牌交换,以及洗牌写入,每次大约占用5-10GB。我还看到,执行者计算时间更长,大约为21-41秒,这在3天的数据中可能看起来不多,但在年度数据中可能会爆炸。
我想知道我能做这件事的更好方法是什么,或者在处理数据帧时是否有可能消除混叠?这个问题的答案似乎表明,这可能是可能的,但不是一个好主意?
我甚至不确定同时使用repartition
和partitionBy
是正确的方法。当我从磁盘重新读取拼花地板文件时,使用repartition()
的初始分区是否完全保留?我已经读到,情况可能并非如此——总的来说,可用的信息要么相互冲突,要么没有明确的来源。
谢谢你抽出时间来帮忙。