2023年7月10日发(作者:)
pythonhadoop教程_PythonAPI操作Hadoophdfs详解1:安装由于是windows环境(linux其实也⼀样),只要有pip或者setup_install安装起来都是很⽅便的pip install hdfs2:Client——创建集群连接from hdfs import *其他参数说明:(url, root=None, proxy=None, timeout=None, session=None)url:ip:端⼝root:制定的hdfs根⽬录proxy:制定登陆的⽤户⾝份timeout:设置的超时时间(“/”)[u’home’,u’input’, u’output’, u’tmp’]3:dir——查看⽀持的⽅法dir(client)4:status——获取路径的具体信息其他参数:status(hdfs_path, strict=True)hdfs_path:就是hdfs路径strict:设置为True时,如果hdfs_path路径不存在就会抛出异常,如果设置为False,如果路径为不存在,则返回None5:list——获取指定路径的⼦⽬录信息(“/”)[u’home’,u’input’, u’output’, u’tmp’]其他参数:list(hdfs_path, status=False)status:为True时,也返回⼦⽬录的状态信息,默认为Flase6:makedirs——创建⽬录rs(“/123”)其他参数:makedirs(hdfs_path, permission=None)permission:设置权限rs(“/test”,permission=777)7: rename—重命名(“/123″,”/test”)8:delete—删除(“/test”)其他参数:delete(hdfs_path, recursive=False)recursive:删除⽂件和其⼦⽬录,设置为False如果不存在,则会抛出异常,默认为False9:upload——上传数据(“/test”,”F:[PPT]Google Protocol ”);其他参数:upload(hdfs_path, local_path, overwrite=False, n_threads=1, temp_dir=None,chunk_size=65536,progress=None, cleanup=True, **kwargs)overwrite:是否是覆盖性上传⽂件n_threads:启动的线程数⽬temp_dir:当overwrite=true时,远程⽂件⼀旦存在,则会在上传完之后进⾏交换chunk_size:⽂件上传的⼤⼩区间progress:回调函数来跟踪进度,为每⼀chunk_size字节。它将传递两个参数,⽂件上传的路径和传输的字节数。⼀旦完成,-1将作为第⼆个参数cleanup:如果在上传任何⽂件时发⽣错误,则删除该⽂件10:download——下载ad(“/test/”,”/home”)11:read——读取⽂件(“/test/[PPT]Google Protocol ”) as reader:print ()其他参数:read(*args, **kwds)hdfs_path:hdfs路径offset:设置开始的字节位置length:读取的长度(字节为单位)buffer_size:⽤于传输数据的字节的缓冲区的⼤⼩。默认值设置在HDFS配置。encoding:制定编码chunk_size:如果设置为正数,上下⽂管理器将返回⼀个发⽣器产⽣的每⼀chunk_size字节⽽不是⼀个类似⽂件的对象delimiter:如果设置,上下⽂管理器将返回⼀个发⽣器产⽣每次遇到分隔符。此参数要求指定的编码。progress:回调函数来跟踪进度,为每⼀chunk_size字节(不可⽤,如果块⼤⼩不是指定)。它将传递两个参数,⽂件上传的路径和传输的字节数。称为⼀次与- 1作为第⼆个参数。问题:ror: Permission denied: user=, access=WRITE, inode=”/test”:root:supergroup:drwxr-xr-x解决办法是:在配置⽂件中加⼊-:#!/usr/local/bin/pythonimport sysfor line in :ss = ().split(' ')for s in ss:if ()!= "":print "%st%s"% (s, 1):#!/usr/local/bin/pythonimport syscurrent_word = Nonecount_pool = []sum = 0for line in :word, val = ().split('t')if current_word== None:current_word = wordif current_word!= word:for count in count_pool:sum += countprint "%st%s"% (current_word, sum)current_word = wordcount_pool = []sum = 0count_(int(val))for count in count_pool:sum += countprint "%st%s"% (current_word, str(sum)):HADOOP_CMD="/data/hadoop-2.7.0/bin/hadoop"STREAM_JAR_PATH="/data/hadoop-2.7.0/share/hadoop/tools/lib/"INPUT_FILE_PATH_1="/The_Man_of_"OUTPUT_PATH="/output"$HADOOP_CMD fs -rmr-skipTrash $OUTPUT_PATH# Step 1.$HADOOP_CMD jar$STREAM_JAR_PATH -input $INPUT_FILE_PATH_1 -output $OUTPUT_PATH -mapper"python " -reducer "" -file ./ -file ./⽬的:通过python模拟mr,计算每年的最⾼⽓温。1. 查看数据⽂件,需要截取年份和⽓温,⽣成key-value对。[tianyc@TeletekHbase python]$ 9999999N9+9999999N9+0500001N9+0500001N9+2. 编写map,打印key-value对[tianyc@TeletekHbase python]$ cat rt reimport sysfor line in :val=()(year,temp)=(val[15:19],val[40:45])print "%st%s" % (year,temp)[tianyc@TeletekHbase python]$ cat |python 1950 +00001950 +00221950 -00111949 +01111949 +00783. 将结果排序[tianyc@TeletekHbase python]$ cat |python |sort1949 +00781949 +01111950 +00001950 -00111950 +00224. 编写redurce,对map中间结果进⾏处理,⽣成最终结果[tianyc@TeletekHbase python]$ cat rt sys(last_key,max_val)=(None,0)for line in :(key,val)=().split('t')if last_key and last_key!=key:print '%st%s' % (last_key, max_val)(last_key, max_val)=(key,int(val))else:(last_key, max_val)=(key,max(max_val,int(val)))if last_key:print '%st%s' % (last_key, max_val)5. 执⾏。[tianyc@TeletekHbase python]$ cat |python |sort|python 1949 1111950 22使⽤python语⾔进⾏MapReduce程序开发主要分为两个步骤,⼀是编写程序,⼆是⽤Hadoop Streaming命令提交任务。还是以词频统计为例⼀、程序开发1、Mapperfor line in :filelds = (' ')for item in fileds:print item+' '+'1'2、Reducerimport sysresult={}for line in :kvs = ().split(' ')k = kvs[0]v = kvs[1]if k in result:result[k]+=1else:result[k] = 1for k,v in ():print k+' '+v....写完发现其实只⽤map就可以处理了…reduce只⽤cat就好了3、运⾏脚本1)Streaming简介Hadoop的MapReduce和HDFS均采⽤Java进⾏实现,默认提供Java编程接⼝,⽤户通过这些编程接⼝,可以定义map、reduce函数等等。但是如果希望使⽤其他语⾔编写map、reduce函数怎么办呢?Hadoop提供了⼀个框架Streaming,Streaming的原理是⽤Java实现⼀个包装⽤户程序的MapReduce程序,该程序负责调⽤hadoop提供的Java编程接⼝。2)运⾏命令/.../bin/hadoop streaming-input /..../input-output /..../output-mapper ""-reducer ""-file -file -D ="wordcount"-D = "1"3)Streaming常⽤命令(1)-input(2)-output(3)-mapper:指定mapper可执⾏程序或Java类,必须指定且唯⼀。(4)-reducer:指定reducer可执⾏程序或Java类,必须指定且唯⼀。(5)-file, -cacheFile, -cacheArchive:分别⽤于向计算节点分发本地⽂件、HDFS⽂件和HDFS压缩⽂件,具体使⽤⽅法参考⽂件分发与打包。(6)numReduceTasks:指定reducer的个数,如果设置-numReduceTasks 0或者-reducer NONE则没有reducer程序,mapper的输出直接作为整个作业的输出。(7)-jobconf | -D NAME=VALUE:指定作业参数,NAME是参数名,VALUE是参数值,可以指定的参数参考。-jobconf =’My Job Name’设置作业名-jobconf ty=VERY_HIGH | HIGH | NORMAL | LOW | VERY_LOW设置作业优先级-jobconf ty=M设置同时最多运⾏M个map任务-jobconf ty=N设置同时最多运⾏N个reduce任务-jobconf 设置map任务个数-jobconf 设置reduce任务个数-jobconf 设置map的输出是否压缩-jobconf 设置map的输出压缩⽅式-jobconf ss 设置reduce的输出是否压缩-jobconf 设置reduce的输出压缩⽅式-jobconf tor 设置map输出分隔符例⼦:-D tor=: 以冒号进⾏分隔-D =2 指定在第⼆个冒号处进⾏分隔,也就是第⼆个冒号之前的作为key,之后的作为value(8)-combiner:指定combiner Java类,对应的Java类⽂件打包成jar⽂件后⽤-file分发。(9)-partitioner:指定partitioner Java类,Streaming提供了⼀些实⽤的partitioner实现,参考KeyBasedFiledPartitoner和IntHashPartitioner。(10)-inputformat, -outputformat:指定inputformat和outputformat Java类,⽤于读取输⼊数据和写⼊输出数据,分别要实现InputFormat和OutputFormat接⼝。如果不指定,默认使⽤TextInputFormat和TextOutputFormat。(11)cmdenv NAME=VALUE:给mapper和reducer程序传递额外的环境变量,NAME是变量名,VALUE是变量值。(12)-mapdebug, -reducedebug:分别指定mapper和reducer程序失败时运⾏的debug程序。(13)-verbose:指定输出详细信息,例如分发哪些⽂件,实际作业配置参数值等,可以⽤于调试。以上这篇Python API 操作Hadoop hdfs详解就是⼩编分享给⼤家的全部内容了,希望能给⼤家⼀个参考。
发布者:admin,转转请注明出处:http://www.yc00.com/web/1688930692a184707.html
评论列表(0条)