传智播客-第3天

传智播客-第3天


2024年1月14日发(作者:)

第5章分布式计算利器——MapReduce(一)以下是本章学习要点:MapReduce原理★★★MapReduce执行过程★★数据类型与格式★★★Writable接口与序列化机制★★★MapReduce是Hadoop的核心组成,是专用于进行数据计算的。本章主要研究如何使用MapReduce完成Hadoop中的计算任务。读者应该重点掌握实现MapReduce算法的步骤,掌握map、reduce函数的特点、如何写函数。我们的开发环境还是在宿主机中运行的eclipse,运行程序的时候经常会出现java内存不足的情况,请参考图5-1修改虚拟机内存参数:图5-1在图5-1中,把DefaultVMArguments修改为-Xms64m-Xmx128m。、Reduce是什么

中的map和reduce如果我们把MapReduce拆开看,就是两个单词map和reduce。在本书中,map翻译为“映射”,reduce翻译为“归约”。这两个单词在有的编程语言中属于内置的两个函数。我们以Python语言举例,该语言中map、reduce属于内置函数。先看一下map函数的用法,如图5-2图5-2在图5-2中,第一行表示定义一个列表变量,有5个元素。第二行是调用了内置的map函数,该函数接收两个参数,第一个是匿名函数;第二个是刚才定义的列表变量。匿名函数有一个形参是x,返回值就是x+3的结果,相当于返回对参数+3返回。m表示map运行后的记过。第三行是打印输出m。第四行是输出的内容。从例子中可以看到,列表元素有map函数是把第一个形参函数作用于每一个列表元素。5个,那么匿名函数就会被调用5次,每次调用把元素作为匿名函数的形参传入,最终结果是还是含有5个元素的列表。下面再看一下reduce函数,如图5-3图5-3在上图中,第一句是加载函数所在的包,类似于java中的import语句。第二句是定义一个含有5个元素的列表变量。第三句是调用reduce函数。这个函数与map函数类似,也是接收两个参数,第一个参数是定义的匿名函数,第二个参数是刚才定义的列表变量。匿名函数有两个形参x和y,返回值就是x+y运算的结果,s表示运算后的结果。第四行语句表示打印输出s。第五行是输出的结果。从例子中可以看出,匿名函数的形参有两个,分别是x和y,取值来自于第二个形参a

中的每个元素。通过匿名函数对a中的元素进行聚合,把5个元素通过加法运算聚合成一个结果。这就是reduce函数。注意:以上是函数式语言Python的语法,允许函数作为函数的形参,这在java中是不允许的。读者对此不必过分关心,只需要知道map、reduce函数在其他语言中也是存在的,并不是Hadoop的专利。中的Map和Reduce在Hadoop中,map函数位于内置类中,reduce函数位于内置类uce.我们要做的就是覆盖map函数和reduce函Reducer中。数。对于Hadoop的map函数和reduce函数,处理的数据是键值对,也就是说map函数接收的数据是键值对,两个参数;输出的也是键值对,两个参数;reduce函数接收的参数和输出的结果也是键值对。现在再看一下Mapper类,有四个泛型,分别是KEYIN、VALUEIN、KEYOUT、VALUEOUT,前面两个KEYIN、后面两个KEYOUT、VALUEIN指的是map函数输入的参数key、value的类型;VALUEOUT指的是map函数输出的key、value的类型。map函数定义如图5-4图5-4在图5-4中,输入参数key、value的类型就是KEYIN、VALUEIN,每一个键值对都会调用一次map函数。在这里,map函数没有处理输入的key、value,直接通过(…)方法输出了,输出的key、value的类型就是KEYOUT、VALUEOUT。这是默认实现,通常是需要我们根据业务逻辑覆盖的。看一下Reducer类,也有四个泛型,同理,分别指的是reduce函数输入的key、value类型,和输出的key、value类型。看一下reduce函数定义,如图5-5

图5-5在图5-5中,reduce函数的形参key、value的类型是KEYIN、VALUEIN。要注意这里的value是存在于le中的,这是一个迭代器,用于集合遍历的,意味着values是一个集合。reduce函数默认实现是把每个value和对应的key,通过调用(…)输出了,这里输出的类型是KEYOUT、VALUEOUT。通常我们会根据业务逻辑覆盖reduce函数的实现。现在读者会有几个问题,输入的内容在哪里,输入内容如何解析成键值对,map函数与reduce函数如何联系在一起,输出到哪里等等?请继续向下看。5.2.分析MapReduce执行过程MapReduce运行的时候,会通过Mapper运行的任务读取HDFS中的数据文件,然后调用自己的方法,处理数据,最后输出。Reducer任务会接收Mapper任务输出的数据,作为自己的输入数据,调用自己的方法,最后输出到HDFS的文件中。整个流程如图5-6图任务的执行过程每个Mapper任务是一个java进程,它会读取HDFS中的文件,解析成很多的键值对,经过我们覆盖的map方法处理后,转换为很多的键值对再输出。整个Mapper任务的处理过程又可以分为以下几个阶段,如图5-7

图5-7在图5-7中,把Mapper任务的运行过程分为六个阶段。第一阶段是把输入文件按照一定的标准分片(InputSplit),每个输入片的大小是固定的。默认情况下,输入片(InputSplit)的大小与数据块(Block)的大小是相同的。如果数据块(Block)的大小是默认值64MB,输入文件有两个,一个是32MB,一个是72MB。那么小的文件是一个输入片,大文件会分为两个数据块,那么是两个输入片。一共产生三个输入片。每一个输入片由一个Mapper进程处理。这里的三个输入片,会有三个Mapper进程处理。第二阶段是对输入片中的记录按照一定的规则解析成键值对。有个默认规则是把每一行文本内容解析成键值对。“键”是每一行的起始位置(单位是字节),“值”是本行的文本内容。第三阶段是调用Mapper类中的map方法。第二阶段中解析出来的每一个键值对,调用一次map方法。如果有1000个键值对,就会调用1000次map方法。每一次调用map方法会输出零个或者多个键值对。第四阶段是按照一定的规则对第三阶段输出的键值对进行分区。比较是基于键进行的。比如我们的键表示省份(如北京、上海、山东等),那么就可以按照不同省份进行分区,同一个省份的键值对划分到一个区中。默认是只有一个区。分区的数量就是Reducer任务运行的数量。默认只有一个Reducer任务。第五阶段是对每个分区中的键值对进行排序。首先,按照键进行排序,对于键相同的键值对,按照值进行排序。比如三个键值对<2,2>、<1,3>、<2,1>,键和值分别是整数。那么排序后的结果是<1,3>、<2,1>、<2,2>。如果有第六阶段,那么进入第六阶段;如果没有,直接输出到本地的linux文件中。第六阶段是对数据进行归约处理,也就是reduce处理。键相等的键值对会调用一次reduce方法。经过这一阶段,数据量会减少。归约后的数据输出到本地的linxu文件中。本阶段默认是没有的,需要用户自己增加这一阶段的代码。r任务的执行过程每个Reducer任务是一个java进程。Reducer任务接收Mapper任务的输出,归约处理后写入到HDFS中,可以分为如图5-8所示的几个阶段图5-8第一阶段是Reducer任务会主动从Mapper任务复制其输出的键值对。Mapper任务可能会有很多,因此Reducer会复制多个Mapper的输出。第二阶段是把复制到Reducer本地数据,全部进行合并,即把分散的数据合并成一个大的数据。再对合并后的数据排序。第三阶段是对排序后的键值对调用reduce方法。键相等的键值对调用一次reduce方法,每次调用会产生零个或者多个键值对。最后把这些输出的键值对写入到HDFS文件中。

在整个MapReduce程序的开发过程中,我们最大的工作量是覆盖map函数和覆盖reduce函数。5.2.3.键值对的编号在对Mapper任务、Reducer任务的分析过程中,会看到很多阶段都出现了键值对,读者容易混淆,所以这里对键值对进行编号,方便大家理解键值对的变化情况。如图5-9图5-9在图5-9中,对于Mapper任务输入的键值对,定义为key1和value1。在map方法中处理后,输出的键值对,定义为key2和value2。reduce方法接收key2和value2,处理后,输出key3和value3。在下文讨论键值对时,可能把key1和value1简写为,key2和value2简写为,key3和value3简写为。5.2.4.举例:单词计数该业务要求统计指定文件中的所有单词的出现次数。下面看一下源文件的内容,如图5-9-1图5-9-1内容很简单,两行文本,每行的单词中间使用空格区分。分析思路:最直观的想法是使用数据结构Map。解析文件中出现的每个单词,用单词作为key,出现次数作为value。这个思路没有问题,但是在大数据环境下就不行了。我们需要使用MapReduce来做。根据Mapper任务和Reducer任务的运行阶段,我们知道在Mapper任务的第二阶段是把文件的每一行转化成键值对,那么第三阶段的map方法就能取得每一行文本内容,我们可以在map方法统计本行文本中单词出现的次数,把每个单词的出现次数作为新的键值对输出。在Reducer任务的第二阶段会对Mapper任务输出的键值对按照键进行排序,键相等的键值对会调用一次reduce方法。在这里,“键”就是单词,“值”就是出现次数。因此可以在reduce方法中对单词的不同行中的所有出现次数相加,结果就是该单词的总的出现次数。最后把这个结果输出。看一下如何覆盖map方法staticclassMyMapperextendsMapper{

//key2//value2//key//value表示该行中的单词表示单词在该行中的出现次数表示文本行的起始位置表示文本行finalTextkey2=newText();finalIntWritablevalue2=newIntWritable(1);protectedvoidmap(LongWritablekey,Textvalue,Contextcontext)ption,InterruptedException{finalString[]splited=ng().split("");for(Stringword:splited){(word);//把key2、value2写入到context中(key2,value2);}};}上面代码中,注意Mapper类的泛型不是java的基本类型,而是Hadoop的数据类型LongWritable、Text、IntWritable。读者可以简单的等价为java的类long、String、int。下文会有专门讲解Hadoop的数据类型。代码中Mapper类的泛型依次是。map方法的第二个形参是行文本内容,是我们关心的。核心代码是把行文本内容按照空格拆分,把每个单词作为新的键,数值1作为新的值,写入到上下文context中。在这里,因为输出的是每个单词,所以出现次数是常量1。如果一行文本中包括两个hello,会输出两次。再来看一下如何覆盖reduce方法staticclassMyReducerextendsReducer{//value3表示单词出现的总次数finalIntWritablevalue3=newIntWritable(0);/***key*values*context*/protectedvoidreduce(Textkey,levalues,Contextcontext)ption,InterruptedException{intsum=0;for(IntWritablecount:values){sum+=();}//执行到这里,sum表示该单词出现的总次数//key3表示单词,是最后输出的keyfinalTextkey3=key;//value3表示单词出现的总次数,是最后输出的value表示单词表示map方法输出的1的集合上下文对象

(sum);(key3,value3);};}上面代码中,Reducer类的四个泛型依次是,要注意reduce方法的第二个参数是le类型,迭代的是v2。也就是k2相同的v2都可以迭代出来。以上就是我们覆盖的map方法和reduce方法。现在要把我们的代码运行起来,需要写驱动代码,如下/***驱动代码*/publicstaticvoidmain(String[]args)throwsIOException,InterruptedException,ClassNotFoundException{//输入路径finalStringINPUT_PATH="hdfs://hadoop0:9000/input";//输出路径,必须是不存在的finalStringOUTPUT_PATH="hdfs://hadoop0:9000/output";//创建一个job对象,封装运行时需要的所有信息finalJobjob=newJob(newConfiguration(),"WordCountApp");//如果需要打成jar运行,需要下面这句ByClass();//告诉job执行作业时输入文件的路径utPaths(job,INPUT_PATH);//设置把输入文件处理成键值对的类utFormatClass();//设置自定义的Mapper类perClass();//设置map方法输出的k2、v2的类型OutputKeyClass();OutputValueClass();//设置对k2分区的类titionerClass();//设置运行的Reducer任务的数量ReduceTasks(1);//设置自定义的Reducer类ucerClass();//设置reduce方法输出的k3、v3的类型putKeyClass();putValueClass();//告诉job执行作业时的输出路径putPath(job,newPath(OUTPUT_PATH));//指明输出的k3类型

putKeyClass();//指明输出的v3类型putValueClass();//让作业运行,直到运行结束,程序退出rCompletion(true);}--------------------------------------------------------------------------------------------------------------------------------在以上代码中,我们创建了一个job对象,这个对象封装了我们的任务,可以提交到Hadoop独立运行。最后一句rCompletion(true),表示把job对象提交给Hadoop运行,直到作业运行结束后才可以。以上代码的运行方式有两种,一种是在宿主机的eclipse环境中运行,一种是打成jar包在linux中运行。第一种运行方式要求宿主机能够访问linux,并且对于输入路径和输出路径中的主机名hadoop0,要在宿主机的hosts文件中有绑定,笔者的hosts文件位于C:WINDOWSsystem32driversetc文件夹。第二种运行方式,需要把代码打成jar包,在linux下执行命令运行,如下图图5-10运行结束后,文件路径在hdfs://hadoop0:9000/output/part-r-00000。我们看一下输出结果,如图5-10-1图的数据类型5.3.1.序列化5.3.2.基本数据类型序列化是干什么用的?本质上讲,就是数据保存到java虚拟机之外,然后又被读到java虚拟机内.如果仅仅是保存,不管是否能读进java虚拟机的话,就不关心序列化问题了。正是因为需要被读进java虚拟机,所以必须识别写出、读入的格式、字符顺序等问题。因此序

列化也就是比较重视的事情了。拿密码来打比方。序列化就像加密,反序列化就像解密。Hadoop作为分布式存储系统必然涉及到序列化问题。在前面的例子中,我们看到Mapper、Reducer类中都使用了Hadoop自己的数据类型LongWritable、IntWritable、Text。这些数据类型都有一个共同的特点,就是实现了le接口。我们看一下这个接口的源码,如图5-11图5-11从图5-11中可以看到Writable接口只有两个方法,一个是writer方法,一个是readFields方法。前者是把对象的属性序列化到DataOutput中去,后者是从DataInput把数据反序列化到对象的属性中。java中的基本类型有char、byte、boolean、short、int、float、double共7中基本类型,除了char,都有对应的Writable类型。对于int和long除了IntWritable、LongWritable外,还有对应的VintWritable、VlongWritable。除此类型之外,还有字符串类型Text、字节数组类型BytesWritable、空类型NullWritable、对象类型ObjectWritable。以上这些类型构成了mapreduce运算的基本类型。这些类型都实现了接口WritableComparable,如图5-12图5-12从图5-12中可以看到,这个接口仅仅多了Comparable接口。实现able接口的目的是为了调用equals方法进行比较。我们看一下LongWritable类的源码,如图5-13

图5-13从图5-13中可以看到,该类实现了WritableComparable接口,内部有个long类型的属性value,在readFields方法中从in中把long类型的值读出来,赋给value,这是“反序列化”过程;在write方法中把value写入到out中,这是“序列化”过程。读者可以想一下:自己是否可以封装一个复杂的类型哪?除了基本类型外,还有集合类型ArrayWritable、TwoDArrayWritable、MapWritable、SortedMapWritable。5.3.3.集合数据类型ceFile上传文件时,如果文件的size小于block的size,那么每个文件就会占用一个block(不是64MB,而是文件实际大小)。如果有非常多的小文件需要上传,那么就需要非常多的block。每一个block在NameNode的内存中都会有一个描述信息,这样会占用过多的NameNode内存。SequenceFile可以把大量小文件一起放到一个block中。在存储相同数量的文件时,可以明显减少block的数量。假设有3个小文件,那么会产生3个block,那么4个文件存储后对应的block如图5-14:

图5-14如果使用SequenceFile存储后,只会产生一个block,如图5-15:图5-15可以看出,同样多的小文件,但是占用的block明显少了。这就是SequenceFile的作用。另外,SequenceFile还可以压缩存储的内容,进一步减少文件体积。5.4.输入文件格式化类InoutFormat类InputFomat是负责把HDFS中的文件经过一系列处理变成map函数的输入部分的。这个类做了三件事情:验证输入信息的合法性,包括输入路径是否存在等;把HDFS中的文件按照一定规则拆分成InputSplit,每个InputSplit由一个Mapper执行;提供RecordReader,把InputSplit中的每一行解析出来供map函数处理;我们看一下这个类的源码,如图5-16

图5-16从图5-16中可以看到,该类只有两个方法的声明,方法getSplits的作用是把输入文件划分为很多的输入分片,方法createRecordReader的作用是输入分片的记录读取器。这些方法的实现都在子类中。putFormatInputFormat有个子类是FileInputFormat,这是在我们的例子中见到的,我们看一下该类对getSplits方法的实现,如图5-17。

图5-17在图5-17中,第247行计算minSize,是供后面计算使用的,其中getFormatMinSplitSize()方法的值是1,getMinSplitSize(job)方法的值由配置参数指定,默认值是1,所以minSize的默认值就是1。第248行计算maxSize,是供后面计算使用的,值由配置参数指定,默认值是long的最大值。第252行files列表中存放的是输入文件,可能有多个。从第253行开始,循环处理每一个输入文件。第254行是获得文件路径,第256行是获得文件长度,第257行是获得文件块位置。如果文件非空,并且文件允许被分割为输入块,那么就进入第258行的条件判断中。第259行是读取文件块size,默认是64MB,第260行是计算输入块size,我们看一下computeSplitSize方法,如图5-18图5-18从图5-18中可以看出,输入块size由三个因素决定,分别是minSize、maxSize、blockSize。根据前面的数值,可以得知,输入分片的默认size是文件块size。我们回到图5-17中继续分析,在第263至268行的循环体中,是对文件按照输入分片size进行切分。总结一下上面的分析,如果输入文件有3个,那么产生的输入分片的情况如表5-1所示文件大小产生的输入片

输入文件1输入文件2输入文件3表5-163MB64MB65MB1个1个2个注:参数、、采用默认值注意:每一个输入分片启动一个Mapper任务。源码在JobInProcess中,如图5-19图5-19通过以上分析,我们知道很多的输入文件是如何划分成很多的输入分片的。那么每个输入分片中的记录又是如何处理的哪?我们继续分析。putFormat该类中有个很重要的方法是实现InputFormat中的createRecordReader,如图5-20图5-20在图5-20中,该方法直接返回一个实例化的LineRecordReader类,我们看一下这个类,如图5-21。

图5-21在图5-21中,可以看到该类的几个属性,其中start、pos、end表示文件中字节的位置,key和value表示从记录中解析出的键和值,in是一个行内容的读取器。继续分析其中的initialize方法,如图5-22图5-22initialize(…)方法是该类的初始化方法,在调用其他方法前先调用该方法,并且只调用一次。从图5-22中可以看到,该类对类FileSplit的对象split进行了分析,属性start表示split的起始位置,属性end表示split的结束位置,属性in表示split的阅读器。下面查看方法nextKeyValue的源码,如图5-23

图5-23在图5-23中,key的值是pos的值,那么这个pos的值来自第97行的ne(…)方法的返回值。类LineReader的方法readLine是读取每一行的内容,把内容存放到第一个参数value中,返回值表示读取的字节数。从这里可以看到,类LineRecordReader的属性key表示InputSplit中读取的字节位置,value表示读取的文本行的内容。看一下图5-24图5-24在图5-24中,方法getCurrentKey()返回的是key的值,方法getCurrentValue()返回的是value的值。综合以上的分析来看,该类中的getCurrentKeyValue()会被不断的调用,每次被调用后,会同时调用getCurrentKey()和getCurrentValue()。5.5.输出格式化类tputFormat

该类是对类FileSystem操作执行输出的,会对运算的结果先写入到一个临时文件夹中,待运算结束后,再移动到最终的输出目录中。那么,输出的内容具体是什么格式?这是由TextOutputFormat类负责的。tputFormat该类专门输出普通文本文件的,如图5-25图5-25在图5-25中,文本输出的时候使用UTF-8编码,次第47行的代码可以看出,划分行的符号是“n”。从第65行的构造方法可以看出,输出的键值对的默认分隔符是制表符“t”。由此不难理解,为什么输出文件中是一行行的内容,为什么键值对使用制表符分隔了。5.6.本章小结5.7.思考题


发布者:admin,转转请注明出处:http://www.yc00.com/news/1705221604a1399983.html

相关推荐

发表回复

评论列表(0条)

  • 暂无评论

联系我们

400-800-8888

在线咨询: QQ交谈

邮件:admin@example.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信