跑完WordCount程序后,想在Hadoop集群上在熟练几个程序,毕竟辛苦搭建好的集群,选择了简单的两个例子:数据去重+数据排序。这里记录下程序及运行中发生的一些问题及解决方案。


前提准备:1、Ubuntu16.0系统+eclipse;

2、Ubuntu server版本搭建的分布式集群系统(1台master,2台slaves);

3、eclipse所在的Ubuntu系统应该和集群系统配置SSH免密登录。

参考例程序: 点击打开链接

实例1:数据去重

描述:在原始数据中出现次数超过一次的数据在输出文件中只出现一次

方法:哪个不能重复哪个设置成Key

原始数据:

1、原始数据

file1

2012-3-1 a

2012-3-2 b

2012-3-3 c

2012-3-4 d

2012-3-5 a

2012-3-6 b

2012-3-7 c

2012-3-3 c

file2

2012-3-1 b

2012-3-2 a

2012-3-3 b

2012-3-4 d

2012-3-5 a

2012-3-6 c

2012-3-7 d

2012-3-3 c

数据输出:

2012-3-1 a

2012-3-1 b

2012-3-2 a

2012-3-2 b

2012-3-3 b

2012-3-3 c

2012-3-4 d

2012-3-5 a

2012-3-6 b

2012-3-6 c

2012-3-7 c

2012-3-7 d



设计思路:Hadoop的Map类中可以自动排序,统计key值相同的数据项的数目,并且存放在value值中,交由Reduce类处理。这里可以通过Map程序处理数据,然后交由Reduce程序,输出的时候只输出Key值,value值置空即可。


package example2_re;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericoptionsParser;

public class SJQC {

    //map将输入中的value复制到输出数据的key上,并直接输出

    public static class Map extends Mapper<Object,Text,Text>{
        private static Text line=new Text();  //读入每行数据
        //实现map函数
        public void map(Object key,Text value,Context context) 
                       throws IOException,InterruptedException{
            line=value;
            context.write(line,new Text(""));
        }

    }

   

    //reduce将输入中的key复制到输出数据的key上,并直接输出
    public static class Reduce extends Reducer<Text,Text>{
        //实现reduce函数
        public void reduce(Text key,Iterable<Text> values,Context context)
                throws IOException,InterruptedException{
            context.write(key,new Text(""));
        }
   }

   

    public static void main(String[] args) throws Exception{
        
      Configuration conf = new Configuration();
     
     Job job = new Job(conf,"Data Deduplication");
     job.setJarByClass(SJQC.class);

     

     //设置Map、Combine和Reduce处理类

     job.setMapperClass(Map.class);
     job.setCombinerClass(Reduce.class);
     job.setReducerClass(Reduce.class);

     

     //设置输出类型

     job.setoutputKeyClass(Text.class);
     job.setoutputValueClass(Text.class);

     

     //设置输入和输出目录
     System.out.println("11111");
     FileInputFormat.addInputPaths(job,"hdfs://192.168.42.130:9000/input/example2_re");
     FileOutputFormat.setoutputPath(job,new Path("hdfs://192.168.42.130:9000/output/example2_re"));
     System.exit(job.waitForCompletion(true) ? 0 : 1);
     System.out.println("00000");
     }

}

实例2:数据排序

描述:对原始数据进行排序,利用map自带排序

原始数据:

file1

2

32

654

32

15

756

65223


file2

5956

22

650

92


file3

26

54

6


输出结果:(有重复数据)

1 2

2 6

3 15

4 22

5 26

6 32

7 32

8 54

9 92

10 650

11 654

12 756

13 5956

14 65223

设计思路:应用MapReduce进行自动处理,Reduce输出阶段将Key当做value值输出,输出次数由value-list中的元素个数决定;key值用全局变量linenum来代替,从1开始每次递增,表示位次


package example3_DataSort;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericoptionsParser;


public class DataSort {

  //map将输入中的value化成IntWritable类型,作为输出的key

   public static class Map extends Mapper<Object,IntWritable,IntWritable>{

       private static IntWritable data=new IntWritable();

       //实现map函数

       public void map(Object key,Context context)
               throws IOException,InterruptedException{

           String line=value.toString();
           data.set(Integer.parseInt(line));
           context.write(data,new IntWritable(1));
       }
   }

 
   //reduce将输入中的key复制到输出数据的key上,
   //然后根据输入的value-list中元素的个数决定key的输出次数
   //用全局linenum来代表key的位次

   public static class Reduce extends

           Reducer<IntWritable,IntWritable>{
       private static IntWritable linenum = new IntWritable(1);

       //实现reduce函数

       public void reduce(IntWritable key,Iterable<IntWritable> values,InterruptedException{

           for(IntWritable val:values){
               context.write(linenum,key);
               linenum = new IntWritable(linenum.get()+1);
           }
}

}

   public static void main(String[] args) throws Exception{

       Configuration conf = new Configuration();

      // conf.set("mapred.job.tracker","192.168.42.130:9001");

    Job job = new Job(conf,"Data Sort");
    job.setJarByClass(DataSort.class);

    //设置Map和Reduce处理类

    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);

    
    //设置输出类型

    job.setoutputKeyClass(IntWritable.class);
    job.setoutputValueClass(IntWritable.class);

    //设置输入和输出目录
    FileInputFormat.addInputPaths(job,"hdfs://192.168.42.130:9000/input/example3_DataSort");
    FileOutputFormat.setoutputPath(job,new Path("hdfs://192.168.42.130:9000/output/example3_DataSort"));
    System.exit(job.waitForCompletion(true) ? 0 : 1);

   }
}


运行中遇到的问题:

1、eclipse控制台看不到日志信息,提示log4j的warning,如下:

log4j:WARN No appenders Could be foundforlogger(org.apache.Hadoop.metrics2.lib.MutableMetricsFactory).

log4j:WARN Please initialize the log4j system properly.

log4j:WARN hadoop See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.


说明没有配置log4j.properties文件。这虽然不影响程序的正常运行,但是看不到log日志,不能及时发现错误位置。 解决方法: 把Hadoop2.7.3的安装目录下面的/etc/hadoop/目录下面的log4j.properties文件拷贝放到MapReduce工程的src目录下面。但是每个工程都需要拷贝,暂时没有找到一次配置的方法。

Ubuntu+eclipse,进行Hadoop集群操作实例数据去重+数据排序的更多相关文章

  1. iOS:用于填充异步提取数据的设计模式

    我正在开发一个从Web获取数据并将其显示给用户的应用程序.假设数据是餐馆的评论,并且在一个视图上显示一个评论.用户可以向左或向右滑动以转到上一个/下一个评论.数据是异步提取的.这是问题陈述–假设已经提取了5条评论,并且用户正在查看当前的第3条评论.现在,第6次审核被提取,我想将其显示为用户的第4次审核.我的模型类应该如何通知视图控制器?除上述3之外的其他建议值得欢迎!

  2. ios – 1天后firebase crashlytics报告中没有数据

    解决方法对于那些仍然有问题的人.检查您的podfile中是否还有pod’Firebase/Crash’.当我删除旧的Firebase崩溃报告时,我的问题已修复.

  3. 将AWS DynamoDB表中的数据加载到iOS上的UITableView

    我的iOS应用程序中使用Swift编写的一个屏幕是UITableView.在这个UITableView中,我想从AWSDynamoDB表中加载名为Books的数据.目前,这是我在故事板上的原型单元格:在表格中我有3个属性:“名称”,“价格”和“ISBN”.我想要的是扫描“书籍”表,并过滤结果,因此结果的“ISBN”属性将包含数字“9”.在我筛选结果后,我想将它们应用到UITableView,因此“

  4. ios – 未为测试目标生成核心数据类

    我使用CoreData的自动生成的类.除测试目标外,我的项目还有3个目标.对于每个目标,正确生成CoreData类,我通过检查DerivedData文件夹进行验证.但是,尽管在核心数据模型文件中打勾,但不会为测试目标生成类.当我尝试引用测试目标中的一个CoreData类时,这会导致“未声明的标识符”和“使用未声明的类型”错误.我该如何解决这个问题?

  5. ios – NSURLCache和数据保护

    我正在尝试保护存储在NSURLCache中的敏感数据.我的应用程序文件和CoreDatasqlite文件设置为NSFileProtectionComplete.但是,我无法将NSURLCache文件数据保护级别更改为NSFileProtectionCompleteUntilFirstUserAuthentication以外的任何其他级别.这会在设备锁定时暴露缓存中的任何敏感数据.我需要缓存响应,以

  6. ios – 领域:如何获取数据库的当前大小

    是否有RealmAPI方法使用RealmSwift作为数据存储来获取我的RealmSwift应用程序的当前数据库大小?

  7. 核心数据 – 核心数据NSPersistentStore问题

    我正在开发一个分阶段推出的应用程序.对于每个sprint,都有数据库更改,因此已实现核心数据迁移.到目前为止,我们已经有3个阶段发布.每当连续升级时,应用程序运行正常.但每当我尝试从版本1升级到版本3时,都会发生’无法添加持久存储’错误.有人可以帮我解决这个问题吗?

  8. iOS Swift在哪里存储用户登录数据或OAuth令牌?

    事情并不像在用户手机上存储登录数据的最佳做法那样清晰.有人建议将userID=123和loggedIn=true类型数据等数据存储在NSUSerDefaults数据中.然而根据我的理解,根据这篇文章https://www.andyibanez.com/nsuserdefaults-not-for-sensitive-data/,这些数据可以很容易地被操作所以问题是:当用户浏览各种屏幕时,持久登录数

  9. ios – Swift – 使用字典数组从字典访问数据时出错

    我有一个非常简单的例子,说明我想做什么基本上,我有一个字典,其值包含[String:String]字典数组.我把数据填入其中,但当我去访问数据时,我收到此错误:Cannotsubscriptavalueoftype‘[([String:String])]?’withanindexoftype‘Int’请让我知道我做错了什么.解决方法您的常量数组是可选的.订阅字典总是返回一个可选项.你必须打开它.更

  10. ios – 在iphone xcode中存储纬度经度的最佳和最精确的数据类型是什么?

    我正在构建一个基于地理定位的应用程序,我想知道哪个数据类型最适合存储lat/long我使用doubleValue但我认为我们可以更精确地像10个小数位.解决方法double是iOS本身使用的值.iOS使用CLLocationdegrees来表示lat/long值,它是double的typedef.IMO,使用double/CLLocationdegrees将是最佳选择.

随机推荐

  1. crontab发送一个月份的电子邮件

    ubuntu14.04邮件服务器:Postfixroot收到来自crontab的十几封电子邮件.这些邮件包含PHP警告.>我已经解决了这些警告的原因.>我已修复每个cronjobs不发送电子邮件(输出发送到>/dev/null2>&1)>我删除了之前的所有电子邮件/var/mail/root/var/spool/mail/root但我仍然每小时收到十几封电子邮件.这些电子邮件来自cronjobs,

  2. 模拟两个ubuntu服务器计算机之间的慢速连接

    我想模拟以下场景:假设我有4台ubuntu服务器机器A,B,C和D.我想在机器A和机器C之间减少20%的网络带宽,在A和B之间减少10%.使用网络模拟/限制工具来做到这一点?

  3. ubuntu-12.04 – 如何在ubuntu 12.04中卸载从源安装的redis?

    我从源代码在Ubuntu12.04上安装了redis-server.但在某些时候它无法完全安装,最后一次makeinstallcmd失败.然后我刚刚通过apt包安装.现在我很困惑哪个安装正在运行哪个conf文件?实际上我想卸载/删除通过源安装的所有内容,只是想安装一个包.转到源代码树并尝试以下命令:如果这不起作用,您可以列出软件自行安装所需的步骤:

  4. ubuntu – “apt-get source”无法找到包但“apt-get install”和“apt-get cache”可以找到它

    我正在尝试下载软件包的源代码,但是当我运行时它无法找到.但是当我运行apt-cache搜索squid3时,它会找到它.它也适用于apt-getinstallsquid3.我使用的是Ubuntu11.04服务器,这是我的/etc/apt/sources.list我已经多次更新了.我尝试了很多不同的debs,并没有发现任何其他地方的错误.这里的问题是你的二进制包(deb)与你的源包(deb-src)不

  5. ubuntu – 有没有办法检测nginx何时完成正常关闭?

    &&touchrestarted),因为即使Nginx没有完成其关闭,touch命令也会立即执行.有没有好办法呢?这样的事情怎么样?因此,pgrep将查找任何Nginx进程,而while循环将让它坐在那里直到它们全部消失.你可以改变一些有用的东西,比如睡1;/etc/init.d/Nginx停止,以便它会休眠一秒钟,然后尝试使用init.d脚本停止Nginx.你也可以在某处放置一个计数器,这样你就可以在需要太长时间时发出轰击信号.

  6. ubuntu – 如何将所有外发电子邮件从postfix重定向到单个地址进行测试

    我正在为基于Web的应用程序设置测试服务器,该应用程序发送一些电子邮件通知.有时候测试是使用真实的客户数据进行的,因此我需要保证服务器在我们测试时无法向真实客户发送电子邮件.我想要的是配置postfix,以便它接收任何外发电子邮件并将其重定向到一个电子邮件地址,而不是传递到真正的目的地.我正在运行ubuntu服务器9.10.先感谢您设置本地用户以接收所有被困邮件:你需要在main.cf中添加:然后

  7. ubuntu – vagrant无法连接到虚拟框

    当我使用基本的Vagrantfile,只配置了两条线:我看到我的虚拟框打开,但是我的流氓日志多次显示此行直到超时:然后,超时后的一段时间,虚拟框框终于要求我登录,但是太久了!所以我用流氓/流氓记录.然后在我的物理机器上,如果我“流氓ssh”.没有事情发生,直到:怎么了?

  8. ubuntu – Nginx – 转发HTTP AUTH – 用户?

    我和Nginx和Jenkins有些麻烦.我尝试使用Nginx作为Jenkins实例的反向代理,使用HTTP基本身份验证.它到目前为止工作,但我不知道如何传递带有AUTH用户名的标头?}尝试将此指令添加到您的位置块

  9. Debian / Ubuntu – 删除后如何恢复/ var / cache / apt结构?

    我在ubuntu服务器上的空间不足,所以我做了这个命令以节省空间但是现在在尝试使用apt时,我会收到以下错误:等等显然我删除了一些目录结构.有没有办法做apt-getrebuild-var-tree或类似的?

  10. 检查ubuntu上安装的rubygems版本?

    如何查看我的ubuntu盒子上安装的rubygems版本?只是一个想法,列出已安装的软件包和grep为ruby或宝石或其他:)dpkg–get-selections

返回
顶部