博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
ChainMapper ,ChainReducer,多个Job串行
阅读量:6814 次
发布时间:2019-06-26

本文共 3040 字,大约阅读时间需要 10 分钟。

情况:

在进行高级的数据处理时,你会发现你的程序不能放在一个的MapReduce job之中了。Hadoop支持将多个MapReduce Job串成一条链来形成一个更大的MapReduce Job。同时你会发现Hadoop数据处理过程中通常包括多个数据源,我们将探索一些join技术同时处理多个数据源。

1.将具有复杂依赖关系的多个MapReduce Job串联起来。

情况:有三个Job,分别成为Job1,Job2,Job3,这三个Job的关系是Job1、Job2可以同时运行,但Job3必须等待Job1、Job2都完成后才能运行。

解决方案:Hadoop提供解决这样复杂依赖关系的类,Job以及JobControl(mapred包内,新API还相当不完善。)

使用Job的addDependingJob()函数来添加依赖关系,例如:Job1.addDependingJob(Job2)表示Job2不完成,Job1则不会开始。

2.串联一个Job之上的预处理和后处理Mapper步骤。

情况:有许多的数据处理工作包括针对一条记录的预处理和后处理,例如进行文档信息检索的时候,我们需要首先去除掉a,the等无太大意义的词汇,然后再转换单词格式(finish,finished等不同格式统一转换为finish)然后再进行处理步骤。

解决方案:Hadoop提供ChainMapper和ChainReducer提供这样的功能。详见《Hadoop in Action》

 
The ChainMapper class allows to use multiple Mapper classes within a single Map task. 

The ChainReducer class allows to chain multiple Mapper classes after a Reducer within the Reducer task.

  

 通过ChainMapper可以将多个map类合并成一个map任务。

下面个这个例子没什么实际意思,但是很好的演示了ChainMapper的作用。

源文件

100 tom 90
101 mary 85
102 kate 60

map00的结果,过滤掉100的记录

101 mary 85
102 kate 60

map01的结果,过滤掉101的记录

102 kate 60

reduce结果

102 kate 60

 package org.myorg;

 

import  java.io.IOException;
import  java.util. * ;
import  java.lang.String;
import  org.apache.hadoop.fs.Path;
import  org.apache.hadoop.conf. * ;
import  org.apache.hadoop.io. * ;
import  org.apache.hadoop.mapred. * ;
import  org.apache.hadoop.util. * ;
import  org.apache.hadoop.mapred.lib. * ;
public   class  WordCount
{
     public   static   class  Map00  extends  MapReduceBase  implements  Mapper
    {
         public   void  map(Text key, Text value, OutputCollector output, Reporter reporter)  throws  IOException
        {
            Text ft  =   new  Text(“ 100 ″);
             if ( ! key.equals(ft))
            {
                output.collect(key, value);
            }
        }
    }
     public   static   class  Map01  extends  MapReduceBase  implements  Mapper
    {
         public   void  map(Text key, Text value, OutputCollector output, Reporter reporter)  throws  IOException
        {
            Text ft  =   new  Text(“ 101 ″);
             if ( ! key.equals(ft))
            {
                output.collect(key, value);
            }
        }
    }
     public   static   class  Reduce  extends  MapReduceBase  implements  Reducer
    {
         public   void  reduce(Text key, Iterator values, OutputCollector output, Reporter reporter)  throws  IOException
        {
             while (values.hasNext())
            {
                output.collect(key, values.next());
            }
        }
    }
     public   static   void  main(String[] args)  throws  Exception
    {
        JobConf conf  =   new  JobConf(WordCount. class );
        conf.setJobName(“wordcount00″);
        conf.setInputFormat(KeyValueTextInputFormat. class );
        conf.setOutputFormat(TextOutputFormat. class );
        ChainMapper cm  =   new  ChainMapper();
        JobConf mapAConf  =   new  JobConf( false );
        cm.addMapper(conf, Map00. class , Text. class , Text. class , Text. class , Text. class ,  true , mapAConf);
        JobConf mapBConf  =   new  JobConf( false );
        cm.addMapper(conf, Map01. class , Text. class , Text. class , Text. class , Text. class ,  true , mapBConf);
        conf.setReducerClass(Reduce. class );
        conf00.setOutputKeyClass(Text. class );
        conf00.setOutputValueClass(Text. class );
        FileInputFormat.setInputPaths(conf,  new  Path(args[ 0 ]));
        FileOutputFormat.setOutputPath(conf,  new  Path(args[ 1 ]));
        JobClient.runJob(conf);
    }
}

转载于:https://www.cnblogs.com/zhanghuijunjava/archive/2013/04/30/3036573.html

你可能感兴趣的文章
spark启动简单脚本
查看>>
centos6.5中安装htop进程管理监控工具
查看>>
juniper基本配置命令 自用
查看>>
hadoop学习笔记之--- HDFS原理学习
查看>>
ThinkPHP 学习笔记(四) ThinkPHP的配置
查看>>
win32 UNICODE 支持
查看>>
MySQL+DRBD+Corosync+Pacemaker CentOS6.5版
查看>>
在CentOS 6.5上安装和配Xen
查看>>
重载类的 new,delete,new[],delete[] 运算符成员函数
查看>>
Express 3.x升级到4.x 优缺点
查看>>
我的友情链接
查看>>
inittab文件丢失恢复
查看>>
ocjp 51-60
查看>>
我的友情链接
查看>>
windows下的任务不能自动执行的解决办法
查看>>
VACL配置说明
查看>>
shell防DDOS
查看>>
go语言 学习笔记1
查看>>
一键包安装lamp或lnmp环境
查看>>
网络提速(最短路)
查看>>