我试图找出在我的两个数据集上实现联合分区的最佳方法,以消除与连接相关的混乱。我正在处理两个数据帧A和B,其中A包含最少的用户日期,包括与之交互的事件ID字段,B包含有关事件的详细信息。我正在尝试加入3个字段:dayevent_typeevent_id。A和B需要从磁盘中读取,因为它们将被外部客户端持续写入和读取。

我正在进行的项目的主要目标是能够快速:

  1. event_type
  2. 筛选将原始事件详细信息加入用户ID

我知道,为了实现#1,我可能需要在event_type上分区我的拼花文件,以便目录结构实现更容易的过滤。为了实现#2,我应该尝试通过对两个数据帧的键进行共同分区,尽可能减少混洗。

我使用的数据由3天的事件数据组成(每种事件类型约1200万行),目标是在1-3年的数据中有效地工作。

为了改进我的加入,我首先对event_type进行过滤,以缩小两个数据帧上的数据范围。然后我在dayevent_id上执行实际的连接。这自然会导致混乱,因为没有协同分区,所以我尝试使用哈希分区来解决这个问题。

我读到repartition在指定的列上实现了哈希分区。我将数据帧保存到磁盘上,还包括一个partitionBy('day', 'event_type'),以便在过滤/分组操作上获得更好的性能。

A\
  .repartition('day', 'event_id')\
  .write
  .partitionBy('day', 'event_type')\
  .mode('overwrite')\
  .parquet('/path/to/A')

B\
  .repartition('day', 'event_id')\
  .write\
  .partitionBy('day', 'event_type')\
  .mode('overwrite')\
  .parquet('/path/to/B')
...
...
A = spark.read.parquet('/path/to/A')
B = spark.read.parquet('/path/to/B')
A.filter(col('event_type') == 'X')\
  .join(B.filter(col('event_type) == 'X'), on=['day', event_id'], how='inner')\
  .show()

当我执行此操作时,我仍然会在计划中看到洗牌交换,以及洗牌写入,每次大约占用5-10GB。我还看到,执行者计算时间更长,大约为21-41秒,这在3天的数据中可能看起来不多,但在年度数据中可能会爆炸。

我想知道我能做这件事的更好方法是什么,或者在处理数据帧时是否有可能消除混叠?这个问题的答案似乎表明,这可能是可能的,但不是一个好主意?

我甚至不确定同时使用repartitionpartitionBy是正确的方法。当我从磁盘重新读取拼花地板文件时,使用repartition()的初始分区是否完全保留?我已经读到,情况可能并非如此——总的来说,可用的信息要么相互冲突,要么没有明确的来源。

谢谢你抽出时间来帮忙。

HashPartitioning数据帧以在PySpark中的连接期间实现联合分区的更多相关文章

  1. pyspark自定义UDAF函数调用报错问题解决

    这篇文章主要为大家介绍了pyspark自定义UDAF函数调用报错问题解决,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

  2. CDH(Cloudera分布式Hadoop)到CDP(Cloudera数据平台)迁移Spark 1x-3x查询

    我们目前正在进行从CDH迁移到CDPwrtspark的可行性研究。将Spark1.x作业转换为Spark2.4.5。如果是,那么1x-2x的重构是自动的,或者应该按照cloudera中给出的步骤手动完成https://docs.cloudera.com/cdp-private-cloud-upgrade/latest/upgrade-cdh/topics/cdp-one-workload-migration-spark16-to-spark24.html如果没有,我们可以在从CDH迁移到CDP时直接从sp

  3. 尝试在同一pyspark结构化流作业中写入和读取增量表。可以'看不到数据

    PySpark作业是否可以在增量表中写入,并在同一代码中从该表中读取?使用以下命令运行代码时不会出错。我正在尝试将我要刷新到Kafka的数据可视化到Delta表中,以确保数据流动良好,底层组件也工作良好。即使向我的主题发送了流量,我也可以看到一个空表。任何形式的帮助都会有帮助。

  4. regexp_extract返回不期望的结果

    测试代码:返回:正如你从上面看到的,标题中的2个给出了错误的结果。1013BriarLane是正确的,因为它没有返回任何内容,因为1013不是年份这是我的代码:在标题子字符串中获取正确的年份

  5. 如何将连续列(0-1)分组为相等大小?斯卡拉火花

    我有一个数据帧列,希望将其拆分为大小相等的桶。此列中的值在0-1之间浮动。大多数数据是倾斜的,因此大多数值都在0.90和1之间。铲斗10:所有1铲斗2-9:任何值>;0和<;1铲斗1:所有0例子:continous_number_colBucket0.00120.95711001这应该是当我groupBy时的样子。

  6. Delta Live表完全刷新时不允许架构更改

    我有一个简单的DeltaLiveTables管道,它将多个csv文件从cloudFiles流式读取到发布到蜂巢元存储的Delta表中。我有两个要求使我的情况更加复杂/独特:由于csv文件的格式,我需要将skipRows参数用于autoLoader。这需要使用Databricks运行时的预览通道。source我需要将表columnMapping.mode属性设置为name,因为csv数据的列名中包含Delta/Parquet本机不允许的字符。一旦发生,即使在常规运行中,也会发生相同的错误。如有任何帮助,将不

  7. 当我在windows10上安装spark并在命令提示符下运行sparkshell时,它显示闪烁

    当我在windows10上安装spark并在命令提示符下运行sparkshell时,它显示闪烁,当按下控件c时,它会显示终止批处理作业(Y/N),尝试了许多方法但都不起作用,请参阅官方spark文档](https://i.stack.imgur.com/S1NrS.png)

  8. 在Spark Web UI中检查FAIR调度程序的池统计信息的位置

    我看到我的Spark应用程序正在使用FAIR调度程序:但我无法确认它是否使用了我设置的两个池。下面是我在PySpark中实现的线程函数我以为“阶段”菜单应该显示游泳池信息,但我没有看到。这是否意味着游泳池设置不正确,还是我看错了地方?我在EMR6.9.0之上使用PySpark3.3.0

  9. HashPartitioning数据帧以在PySpark中的连接期间实现联合分区

    为了实现#2,我应该尝试通过对两个数据帧的键进行共同分区,尽可能减少混洗。为了改进我的加入,我首先对event_type进行过滤,以缩小两个数据帧上的数据范围。然后我在day和event_id上执行实际的连接。我读到repartition在指定的列上实现了哈希分区。我将数据帧保存到磁盘上,还包括一个partitionBy,以便在过滤/分组操作上获得更好的性能。我甚至不确定同时使用repartition和partitionBy是正确的方法。当我从磁盘重新读取拼花地板文件时,使用repartition()的初

  10. Pyspark:异常:Java网关进程在发送驱动程序的端口号之前退出

    但问题从未得到解决.请帮忙!

随机推荐

  1. 如何扩展ATmega324PB微控制器的以下宏寄存器?

    我目前正在学习嵌入式,我有以下练习:展开以下宏寄存器:如果有人解决了这个问题,我将不胜感激,以便将来参考

  2. Python将ONNX运行时设置为返回张量而不是numpy数组

    在python中,我正在加载预定义的模型:然后我加载一些数据并运行它:到目前为止,它仍在正常工作,但我希望它默认返回Tensor列表,而不是numpy数组。我对ONNX和PyTorch都是新手,我觉得这是我在这里缺少的基本内容。这将使转换中的一些开销相同。

  3. 在macOS上的终端中使用Shell查找文件中的单词

    我有一个文本文件,其中有一行:我需要找到ID并将其提取到变量中。我想出了一个RexEx模式:但它似乎对我尝试过的任何东西都不起作用:grep、sed——不管怎样。我的一个尝试是:我为这样一个看似愚蠢的问题感到抱歉,但我在互联网上找不到任何东西:我在SO和SE上读了几十个类似的问题,并在谷歌上搜索了几个教程,但仍然无法找到答案。欢迎提供任何指导!

  4. react-chartjs-2甜甜圈图中只有标题未更新

    我正在使用react-chartjs-2在我的网站中实现甜甜圈图。下面是我用来呈现图表的代码。我将甜甜圈图的详细信息从父组件传递到子组件,所有道具都正确传递。当我在beforeDraw函数外部记录props.title时,它会记录正确的值,但当我在beforeDraw函数内部记录props.title时,它将记录标题的前一个值,从而呈现标题的前值。我在这里做错了什么?

  5. 如何在tkinter中使用Python生成器函数?

    生成器函数承诺使某些代码更易于编写。但我并不总是知道如何使用它们。假设我有一个斐波那契生成器函数fib(),我想要一个显示第一个结果的tkinter应用程序。当我点击“下一步”按钮时,它会显示第二个数字,依此类推。我如何构建应用程序来实现这一点?我可能需要在线程中运行生成器。但如何将其连接回GUI?

  6. 如何为每次提交将存储库历史记录拆分为一行?

    我正在尝试获取存储库的历史记录,但结果仅以单行文本的形式返回给我。

  7. 尝试在颤振项目上初始化Firebase时出错

    当尝试在我的颤振项目上初始化firebase时,我收到了这个错误有人知道我能做什么吗?应用程序分级Gradle插件Gradle项目颤振相关性我已经将firebase设置为Google文档已经在另一个模拟器上尝试过,已经尝试过创建一个全新的模拟器,已经在不同的设备上尝试过了,已经尝试了特定版本的firebase,已经尝试添加但没有任何效果,已经在youtube上看到了关于它的每一个视频,该应用程序在android和iOS两个平台上都抛出了这个错误

  8. 在unix中基于当前日期添加新列

    我试图在unix中基于时间戳列在最后一个单元格中添加一个状态列。我不确定如何继续。

  9. 麦克斯·蒙特利。我一直得到UncaughtReferenceError:当我在终端中写入node-v时,节点未定义

    如果这是您应该知道的,请确认:我已将所有shell更改为默认为zsh。当我在终端中写入node-v时,我一直收到“UncaughtReferenceError:nodeisnotdefined”。但它显示节点已安装。我是个新手,在这方面经验不足。

  10. 如何在前端单击按钮时调用后端中的函数?

    那么如何在后端添加一个新的端点,点击按钮调用这个函数。

返回
顶部