我有一个使用Apache Spark的 Java程序.该程序中最有趣的部分如下所示:
long seed = System.nanoTime();

JavaRDD<AnnotatedDocument> annotated = documents
    .mapPartitionsWithIndex(new InitialAnnotater(seed),true);
annotated.cache();

for (int iter = 0; iter < 2000; iter++) {
    GlobalCounts counts = annotated
        .mapPartitions(new GlobalCounter())
        .reduce((a,b) -> a.sum(b)); // update overall counts (*)

    seed = System.nanoTime();

    // copy overall counts which CountChanger uses to compute a stochastic thing (**)
    annotated = annotated
        .mapPartitionsWithIndex(new CountChanger(counts,seed),true); 
    annotated.cache();

    // adding these lines causes constant time complexity like i want
    //List<AnnotatedDocument> ll = annotated.collect();
    //annotated = sc.parallelize(ll,8); 
}

因此,实际上,行(**)会生成带有表单的RDD

documents
    .mapPartitionsWithIndex(initial)
    .mapPartitionsWithIndex(nextIter)
    .mapPartitionsWithIndex(nextIter)
    .mapPartitionsWithIndex(nextIter)
    ... 2000 more

确实是一个很长的地图链.此外,由于需要更新计数,因此行(*)会在每次迭代时强制计算(非惰性).

我遇到的问题是,我得到的时间复杂度随着每次迭代而线性增加,因此总体上呈二次方式:

我认为这是因为Spark试图“记住”链中的每个RDD,以及容错算法或导致其增长的任何因素.但是,我真的不知道.

我真正想做的是在每次迭代时告诉Spark“崩溃”RDD,以便只有最后一个保存在内存中并继续工作.我认为这应该导致每次迭代的时间不变.这可能吗?还有其他解决方案吗?

谢谢!

解决方法

尝试使用rdd.checkpoint.这将把RDD保存到hdfs并清除沿袭.

每次转换RDD时,都会增加谱系,Spark必须跟踪可用内容和必须重新计算的内容.处理DAG是昂贵的,并且大型DAG倾向于非常快地杀死性能.通过“检查点”,您可以指示Spark计算并保存生成的RDD,并丢弃其创建方式的信息.这使得它类似于简单地保存RDD并将其读回,从而最大限度地减少DAG操作.

在旁注中,由于您遇到了这个问题,因此最好知道union也会通过添加步骤来影响RDD性能,并且由于沿袭信息的方式,也可能抛出StackOverflowError. See this post

This link有更多细节和漂亮的图表,主题也提到了in this SO post.

Java Apache Spark:长转换链导致二次时间的更多相关文章

  1. Ubuntu安装Spark

    安装Python$sudoapt-getinstallpython3安装完毕后,使用如下命令查看python3的具体版本:$/usr/bin/python3-VPython3.5.2安装scala:$sudoapt-getinstallscalascala-docscala-library查看安装的scala的版本信息:$scala-versionScalacoderunnerversion2.1

  2. 基于CentOS的Hadoop和Spark分布式集群搭建过程

    IP地址:192.168.106.128(主节点);192.168.106.129(从节点);192.168.106.130(从节点)。

  3. Ubuntu下Spark单机版Standalone安装

    选择完毕,点击>DownloadSpark

  4. centos7.2(linux)+spark2.1.0安装

    介绍下spark在Linux上的安装.操作系统是centos,centos其实是纯净版的Linux.(1)版本spark版本选2.x以上.2.1.0是去年中旬刚出来的.centos7.264(2)下载spark下载地址,百度.(3)安装解压,放到/usr路径下,本人的是:配置spark安装路径,打开.bash_profile.这个文件里面要预先配置javapath.回到spark目录,spark-

  5. 基于CentOS6.4环境编译Spark-2.1.0源码

    1写在前面的话有些小伙伴可能会问:Spark官网不是已经提供了Spark针对不同版本的安装包了吗,我们为什么还需要对Spark源码进行编译呢?针对如上列出的两点的个人觉得比较好的最佳实践:根据生产上运行的Hadoop版本编译出Spark的安装包修改Spark源码之后,重新编译Spark所以:个人觉得如果想更好的学习和使用Spark,那么第一步就是要会根据Spark源码编译出安装包。

  6. Ubuntu系统搭建单机Spark注意事项

    对于Spark而言,如果大家只是想摸一下、熟悉熟悉而已,可以搭建单机的Spark,大致步骤如下(我使用VMWare下的Ubuntu14.04,暂不考虑安全问题,在root下运行):1、安装Ubuntu14.04,注意装好后需要禁用防火墙(ufwdisable),安装SSH服务器,启用root用户2、下载安装JDK-1.8、scala2.11.8(需要和spark的jar版本配合下,这个其实不是很必

  7. Ubuntu 16.04 SPARK 开发环境搭建-- 伪分布版 与新建一个Spark版本的WordCount

    Ubuntu16.04SPARK开发环境搭建这里首先是基于Hadoop已经装好了情况下,安装SPARK.具体Hadoop安装参考:点击打开链接如果你没安装JDK请安装,你在安装Hadoop时候也必须安装JDK这里也稍微写点初始工作:1.安装JDK,下载jdk-8u111-linux-x64.tar.gz,解压到/opt/jdk1.8.0_111下载地址:http://www.Oracle.com/

  8. Ubuntu安装Spark和Hadoop集群

    一、JDK安装手动解压JDK的压缩包,然后设置环境变量1.1在/usr/目录下创建java目录root@ubuntu:~#mkdir/usr/javaroot@ubuntu:~#cd/usr/java1.2下载jdk,然后解压http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.htmlroot

  9. Pydoop在AWS的Ubuntu上Segmentation fault 错误处理

    问题描述平台用Python开发,使用pydoop对HDFS文件进行操作,当迁移到AWS时,出现SegmentationFault,通过排查,确定问题是因为调用了Pydoop对hdfs的初始化过程中出现的问题。

  10. Ubuntu 下安装sparklyr 并连接远程spark集群

    安装sparklyr1.通过devtools包实现sparklyr包的安装:出现问题:安装不上,因为Ubuntu中需要安装一些包2.在本地安装spark3.设置JAVA,SPRK,SPRK_VERSION环境变量4.连接本地spark5.连接远程spark注意:之前如果设置了环境变量的话,就直接使用以上语句就行,否则的话得在函数里添加环境变量的参数。

随机推荐

  1. 基于EJB技术的商务预订系统的开发

    用EJB结构开发的应用程序是可伸缩的、事务型的、多用户安全的。总的来说,EJB是一个组件事务监控的标准服务器端的组件模型。基于EJB技术的系统结构模型EJB结构是一个服务端组件结构,是一个层次性结构,其结构模型如图1所示。图2:商务预订系统的构架EntityBean是为了现实世界的对象建造的模型,这些对象通常是数据库的一些持久记录。

  2. Java利用POI实现导入导出Excel表格

    这篇文章主要为大家详细介绍了Java利用POI实现导入导出Excel表格,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

  3. Mybatis分页插件PageHelper手写实现示例

    这篇文章主要为大家介绍了Mybatis分页插件PageHelper手写实现示例,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

  4. (jsp/html)网页上嵌入播放器(常用播放器代码整理)

    网页上嵌入播放器,只要在HTML上添加以上代码就OK了,下面整理了一些常用的播放器代码,总有一款适合你,感兴趣的朋友可以参考下哈,希望对你有所帮助

  5. Java 阻塞队列BlockingQueue详解

    本文详细介绍了BlockingQueue家庭中的所有成员,包括他们各自的功能以及常见使用场景,通过实例代码介绍了Java 阻塞队列BlockingQueue的相关知识,需要的朋友可以参考下

  6. Java异常Exception详细讲解

    异常就是不正常,比如当我们身体出现了异常我们会根据身体情况选择喝开水、吃药、看病、等 异常处理方法。 java异常处理机制是我们java语言使用异常处理机制为程序提供了错误处理的能力,程序出现的错误,程序可以安全的退出,以保证程序正常的运行等

  7. Java Bean 作用域及它的几种类型介绍

    这篇文章主要介绍了Java Bean作用域及它的几种类型介绍,Spring框架作为一个管理Bean的IoC容器,那么Bean自然是Spring中的重要资源了,那Bean的作用域又是什么,接下来我们一起进入文章详细学习吧

  8. 面试突击之跨域问题的解决方案详解

    跨域问题本质是浏览器的一种保护机制,它的初衷是为了保证用户的安全,防止恶意网站窃取数据。那怎么解决这个问题呢?接下来我们一起来看

  9. Mybatis-Plus接口BaseMapper与Services使用详解

    这篇文章主要为大家介绍了Mybatis-Plus接口BaseMapper与Services使用详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

  10. mybatis-plus雪花算法增强idworker的实现

    今天聊聊在mybatis-plus中引入分布式ID生成框架idworker,进一步增强实现生成分布式唯一ID,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

返回
顶部