微信为什么用密码登录不上却可以不用手机号找回微信密码验证码登录,有这种情况吗

shell判断hdfs文件目录是否存在 - 推酷
shell判断hdfs文件目录是否存在
hadoop有提供相应的脚本去验证文件目录是否存在的:
-bash-3.2$ hadoop fs -help
-test -[defsz] &path&: Answer various questions about &path&, with result via exit status.
return 0 if &path& is a directory.
return 0 if &path& exists.
return 0 if &path& is a file.
return 0 if file &path& is greater than zero bytes in size.
return 0 if file &path& is zero bytes in size.
else, return 1.
测试的hdfs目录中:
-bash-3.2$ hadoop fs -ls /user/hive/warehouse/yhd_gmv_month
Found 3 items
drwxr-xr-x
- deploy supergroup
11:15 /user/hive/warehouse/yhd_gmv_month/ds=
drwxr-xr-x
- deploy supergroup
13:02 /user/hive/warehouse/yhd_gmv_month/ds=
drwxr-xr-x
- deploy supergroup
08:09 /user/hive/warehouse/yhd_gmv_month/ds=
检验昨天产生的目录是否产生的shell脚本:
yesterday=$(date -d '-1 day' '+%Y-%m-%d')
hadoop fs -test -e /user/hive/warehouse/yhd_gmv_month/ds=$yesterday
if [ $? -eq 0 ] ;then
echo 'exist'
echo 'Error! Directory is not exist'
验证存在的输出结果如下:
-bash-3.2$ hadoop fs -test -e /user/hive/warehouse/yhd_gmv_month/ds=$yesterday
if [ $? -eq 0 ] ;then
echo 'exist'
echo 'Error! Directory is not exist Or Zero bytes in size'
-bash-3.2$ if [ $? -eq 0 ] ;then
& echo 'exist'
& echo 'Error! Directory is not exist Or Zero bytes in size'
已发表评论数()
请填写推刊名
描述不能大于100个字符!
权限设置: 公开
仅自己可见
正文不准确
标题不准确
排版有问题
主题不准确
没有分页内容
图片无法显示
视频无法显示
与原文不一致你的浏览器禁用了JavaScript, 请开启后刷新浏览器获得更好的体验!
问题对人有帮助,内容完整,我也想知道答案
已知在hdfs某目录下(如hdfs://tmp/englishnovels)有上百部英文小说(txt后缀),想把小说的名字都读取出来。请赐教!
重要提示:提问者不能发表回复,可以通过评论与回答者沟通,沟通后可以通过编辑功能完善问题描述,以便后续其他人能够更容易理解问题.
答案对人有帮助,有参考价值
通过程序读取么?如果是Java的话,可以通过FileSystem的listStatus()方法
当你传入的参数是目录时,返回的FileStatus的对象列表,里面就包含文件路径。
如果还需要文件过滤,可以使用PathFilter对象,设置下过滤规则。
要回复问题请先或
关注: 2 人flume-ng如何根据源文件名输出到HDFS文件名 - 推酷
flume-ng如何根据源文件名输出到HDFS文件名
flume-ng如何根据源文件名输出到HDFS文件名
需求:源中不同的文件,格式和内容不一样,希望采集到hdfs中后,能有对应的文件名,方便后续分析。
flume-ng可以自定义header,所以可以通过header来传递一些变量。而旧版的flume则可能不得不通过逻辑节点来部署不同的端口来曲线完成一些变量约定。
自定义header的方法,至少有5种。
编辑一个header文件,每一行用key=value的形式。在执行flume客户端时用–headerFile header指定header文件。
interceptors的static类型可以指定key,value,随header传递。
编程实现,在header里面添加相应的key/value
如果自己实现Interceptor,可以做类似的动作:
public class LogFormatInterceptor implements Interceptor {
headers = event.getHeaders();
headers.put( “host”, hostname );
参考:http://blog.csdn.net/rjhym/article/details/8450728
flume已有的,如时间。见http://flume.apache.org/FlumeUserGuide.html#hdfs-sink
采用json,header里面可以指定
configure文件:
# Describe/configure the source
a1.sources.r1.type = org.apache.flume.source.http.HTTPSource
a1.sources.r1.port = 9000
#a1.sources.r1.handler = org.apache.flume.http.JSONHandler
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/uri/events/%{field1}
a1.sinks.k1.hdfs.filePrefix = events-
curl -X POST -d ‘[{ &headers& : { &timestamp& : &&, &host& :&random_&, &field1& : &val1& }, &body& : &random_body& }]‘ localhost:9000
本文采用1,2,4 这三种方案,来实现hdfs文件的自定义和可传输。
[zhouhh@Hadoop48 flume1.3.1]$ cat header
filename=game.log
[zhouhh@Hadoop48 flume1.3.1]$ cat conf/testhdfs.conf
syslog-agent.sources = Syslog gamelog
syslog-agent.channels = MemoryChannel-1
syslog-agent.sinks = HDFS-LAB
#syslog-agent.sources.Syslog.type = syslogTcp
syslog-agent.sources.Syslog.type = avro
syslog-agent.sources.Syslog.port = 5140
syslog-agent.sources.Syslog.bind= 0.0.0.0
syslog-agent.sources.Syslog.host= hadoop48
syslog-agent.sources.Syslog.interceptors = i1 i2 i3
syslog-agent.sources.Syslog.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
syslog-agent.sources.Syslog.interceptors.i1.preserveExisting = false
syslog-agent.sources.Syslog.interceptors.i1.hostHeader = hostname
syslog-agent.sources.Syslog.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
syslog-agent.sources.Syslog.interceptors.i3.type = static
syslog-agent.sources.Syslog.interceptors.i3.key = datacenter
syslog-agent.sources.Syslog.interceptors.i3.value = NEW_YORK
syslog-agent.sources.Syslog.channels = MemoryChannel-1
syslog-agent.sources.gamelog.type = syslogTcp
syslog-agent.sources.gamelog.port = 5150
syslog-agent.sources.gamelog.channels = MemoryChannel-1
syslog-agent.sinks.HDFS-LAB.channel = MemoryChannel-1
syslog-agent.sinks.HDFS-LAB.type = hdfs
syslog-agent.sinks.HDFS-LAB.hdfs.path = hdfs://Hadoop48:54310/flume/%{host}
#syslog-agent.sinks.HDFS-LAB.hdfs.filePrefix = Syslog.%{host}
syslog-agent.sinks.HDFS-LAB.hdfs.filePrefix = %{filename}.%{host}.%Y-%m-%d
syslog-agent.sinks.HDFS-LAB.hdfs.rollInterval = 60
#syslog-agent.sinks.HDFS-LAB.hdfs.fileType = SequenceFile
syslog-agent.sinks.HDFS-LAB.hdfs.fileType = DataStream
#syslog-agent.sinks.HDFS-LAB.hdfs.file.writeFormat= Text
syslog-agent.channels.MemoryChannel-1.type = memory
[zhouhh@Hadoop48 flume1.3.1]$ flume-ng agent -n syslog-agent -f testhdfs.conf
13/02/19 18:06:50 INFO hdfs.BucketWriter: Renaming hdfs://Hadoop48:54310/flume/ha48/game.log.ha48..3.tmp to hdfs://Hadoop48:54310/flume/ha48/game.log.ha48..3
13/02/19 18:06:50 INFO hdfs.BucketWriter: Creating hdfs://Hadoop48:54310/flume/ha48/game.log.ha48..4.tmp
13/02/19 18:06:50 INFO hdfs.BucketWriter: Renaming hdfs://Hadoop48:54310/flume/ha48/game.log.ha48..4.tmp to hdfs://Hadoop48:54310/flume/ha48/game.log.ha48..4
13/02/19 18:06:50 INFO hdfs.BucketWriter: Creating hdfs://Hadoop48:54310/flume/ha48/game.log.ha48..5.tmp
13/02/19 18:06:50 INFO hdfs.BucketWriter: Renaming hdfs://Hadoop48:54310/flume/ha48/game.log.ha48..5.tmp to hdfs://Hadoop48:54310/flume/ha48/game.log.ha48..5
13/02/19 18:06:50 INFO hdfs.BucketWriter: Creating hdfs://Hadoop48:54310/flume/ha48/game.log.ha48..6.tmp
13/02/19 18:07:50 INFO hdfs.BucketWriter: Renaming hdfs://Hadoop48:54310/flume/ha48/game.log.ha48..6.tmp to hdfs://Hadoop48:54310/flume/ha48/game.log.ha48..6
[zhouhh@Hadoop48 flume1.3.1]$ fs -cat hdfs://Hadoop48:54310/flume/ha48/game.log.ha48..6
gdm:x:42:42::/var/gdm:/sbin/nologin
sabayon:x:86:86:Sabayon user:/home/sabayon:/sbin/nologin
hbase:x:500:500::/home/hbase:/bin/bash
zhh:x:501:501::/home/zhh:/bin/bash
[zhouhh@Hadoop48 flume1.3.1]$ vi conf/testhdfs.conf
syslog-agent.sinks.HDFS-LAB.hdfs.filePrefix = %{filename}.%{host}.%{datacenter}.%Y-%m-%d
[zhouhh@Hadoop48 flume1.3.1]$ flume-ng agent -n syslog-agent -f testhdfs.conf
13/02/19 18:12:13 INFO hdfs.BucketWriter: Renaming hdfs://Hadoop48:54310/flume/ha48/game.log.ha48.NEW_YORK..2.tmp to hdfs://Hadoop48:54310/flume/ha48/game.log.ha48.NEW_YORK..2
13/02/19 18:12:13 INFO hdfs.BucketWriter: Creating hdfs://Hadoop48:54310/flume/ha48/game.log.ha48.NEW_YORK..3.tmp
13/02/19 18:12:13 INFO hdfs.BucketWriter: Renaming hdfs://Hadoop48:54310/flume/ha48/game.log.ha48.NEW_YORK..3.tmp to hdfs://Hadoop48:54310/flume/ha48/game.log.ha48.NEW_YORK..3
13/02/19 18:12:13 INFO hdfs.BucketWriter: Creating hdfs://Hadoop48:54310/flume/ha48/game.log.ha48.NEW_YORK..4.tmp
13/02/19 18:12:13 INFO hdfs.BucketWriter: Renaming hdfs://Hadoop48:54310/flume/ha48/game.log.ha48.NEW_YORK..4.tmp to hdfs://Hadoop48:54310/flume/ha48/game.log.ha48.NEW_YORK..4
13/02/19 18:12:13 INFO hdfs.BucketWriter: Creating hdfs://Hadoop48:54310/flume/ha48/game.log.ha48.NEW_YORK..5.tmp
这就实现了可以定制来源的hdfs输出文件名。
已发表评论数()
请填写推刊名
描述不能大于100个字符!
权限设置: 公开
仅自己可见
正文不准确
标题不准确
排版有问题
主题不准确
没有分页内容
图片无法显示
视频无法显示
与原文不一致HDFS的文件操作流
阅读:1128次&&&时间: 13:12:57&&
大家可能对本地文件系统中的文件I/O流已经是非常的熟悉了,那么,像HDFS这种分布式的文件I/O流——基于网络I/O流的数据流,又是如何实现的呢?这就是本文的重点之一:HDFS的文件写入流。
&&&& 熟悉HDFS的人可能知道,当我们调用DistributedFileSystem的create方法时,将会返回一个FSDataOutputStream对象,通过这个对象来对文件进行数据的写入。还是贴一张该对象的类图吧!
现在,我们以调用FSDataOutputStream的write(byte[],int,int)为例来看看HDFS是如何完成文件的数据写入操作的。先see一下这个过程的序列图:
从上图中,我们可以很容易的看出,文件的写入操作,关键是是要看DFSOutputSream,所以,下面我将详细的讨论DFSOutputSream类。
&&&& 当创建一个DFSOutputSream实例的时候,它会首先根据设置的一个packet的大小和一个校验块的大小来计算一个packet应该包含多少个校验块(数据块+加校验和)以及这个packet的实际大小。源代码如下:
其中,原始的packet的大小来自配置文件中参数dfs.write.packet.size的设置,校验数据块的大小来自配置文件中参数io.bytes.per.checksum的设置。然后调用ClientProtocol的create远程方法,最后启动线程DataStreamer。整个创建过程的源代码如下:
现在,一切问题就都纠结在DataStream到底是个啥东东,究竟干了些神马勾当才能和DataNode节点联系起来呢?
&&&& 实际上,在DataStream中,它总是不停的从packet队列中取出待发送的packet给DataNode节点,当然在这个过程中,它要不断地向NameNode节点申请Blocks,即:当没有Block或申请的一个Block已满时,它会调用ClientProtocol的addBlock远程方法得到一个LocatedBlock,也就是要知道它应该要把这个Block的packet发送到那些DataNode节点上。当然,HDFS对于Block的副本copy采用的是流水线作业的方式:client把数据Block只传给一个DataNode,这个DataNode收到Block之后,传给下一个DataNode,依次类推,...,最后一个DataNode就不需要下传数据Block了。噢,当DataStream把一个Block的所有Packet传送完毕之后,必须要等待所有的Packet被ack之后才能重新申请新的Block来传送后面的packet。为么么知道,在HDFS中文件是分块存储的,每一个块还有多个备份,同时不同的块的备份被存在不同的机器上,而且,这些组成文件的块也放在不同的数据节点上,那么,HDFS是如何实现文件的读取呢?比如:当客户端准备读取某一个文件的一个数据块时,若这个数据块有多个副本,那么这个客户端应该读取来个副本呢?
&&&& 在上一篇文章中,我讲述了有关HDFS中文件写入流的实现方式与操作过程,所以在本文,我将重点分析HDFS中文件的读取过程。通过对这种分布式文件流的工作过程进行详细的阐述,来让大家能够清楚的了解HDFS是如何实现文件的读取的。
&&&&&&& 用过HDFS的API的人都知道,我们要读取一个文件,需要调用DistributedFileSystem的open(Path,int)方法,该方法会返回一个数据流对象——FSDataInputStream,它实际是自己的子类DFSDataInputStream。好了,我们先来看看它们的类的继承体系结构图吧,以便能够更好地认识它们。
既然DistributedFileSystem的open方法返回的是FSDataInputStream类型的对象,那么我就不打算讲解DFSDataInputStream中的三个方法了,而是将以FSDataInputStream中关于文件读取操作的方法为例展开,通过对它们的处理过程进行详细的阐述,来逐步剖析HDFS读取文件的奥秘。
&&&& 我根据这个数据流的创建创建过程,绘制与之相关的序列图:
从上面的序列图,我们可以发现在创建底层的DFSInputStream时,调用了自身的openInfo()方法,该内部方法主要通过ClientProtocol协议调用远程方法getBlockLocations(),从NameNode获取关于当前打开的文件的前若干个数据块Blocks位置信息,代码如下:
其中,prefetchSize的默认值是十个数据块的大小,当然也可以在配置文件中通过dfs.read.prefect.size的值来设置。
&&&& 其实,FSDataInputStream的read(long,byte[],int,int)和read(byte[],int,int)方法的底层实现是一样的,只不过前者是实现了随机读,后者是顺序读。另外,read(long,byte[],int,int)方法虽然是随机读,当在执行完之后,并不会影响当前文件的读指针,为了让大家有一个直观的认识,我还是贴上一幅该操作的序列图(这里以read(long,byte[],int,int)为例):
对于数据流底层的read(byte[],int,int)方法的实现如下:
public synchronized int read(byte buf[], int off, int len) throws IOException {&&&&& checkOpen();&&&&& if (closed) {&&&&&&& throw new IOException("Stream closed");&&&&& }&&&&& failures = 0;
&&&&& //检查当前文件指针是否超过文件末尾&&&&& if (pos & getFileLength()) {&&&&&&& int retries = 2;&&&&&&& while (retries & 0) {&&&&&&&&& try {
&&&&&&&&&&& if (pos & blockEnd) {//如果当前文件指针不在当前数据块,则定位到当前文件指针所在的数据块&&&&&&&&&& &&& &LOG.debug("the current file pos is not at current block,so start to seek the right block.");&&&&&&&&&&&&& currentNode = blockSeekTo(pos);&&&&&&&&&&& }&&&&&&&&&&& int realLen = Math.min(len, (int) (blockEnd - pos + 1));//计算真正能够读取的数据长度
&&&&&&&&&&& int result = readBuffer(buf, off, realLen);//读文件&&&&&&&&&& &&&&&&&&&&&& if (result &= 0) {&&&&&&&&&&&&& pos +=&&&&&&&&&&& } else {&&&&&&&&&&&&& // got a EOS from reader though we expect more data on it.&&&&&&&&&&&&& throw new IOException("Unexpected EOS from the reader");&&&&&&&&&&& }&&&&&&&&&&& if (stats != null && result != -1) {&&&&&&&&&&&&& stats.incrementBytesRead(result);&&&&&&&&&&& }&&&&&&&&&&&&&&&&&&&& } catch (ChecksumException ce) {&&&&&&&&&&&&&&&&&&&&& &&&&&&&&&& } catch (IOException e) {&&&&&&&&&&& if (retries == 1) {&&&&&&&&&&&&& LOG.warn("DFS Read: " + StringUtils.stringifyException(e));&&&&&&&&&&& }&&&&&&&&&&& blockEnd = -1;&&&&&&&&&&& if (currentNode != null) { addToDeadNodes(currentNode); }&&&&&&&&&&& if (--retries == 0) {&&&&&&&&&&&&&&&&&&&&&&&& }&&&&&&&&& }&&&&&&& }&&&&& }&&&&& return -1;&&& }
private synchronized DatanodeInfo blockSeekTo(long target) throws IOException {&&&&& if (target &= getFileLength()) {&&&&&&& throw new IOException("Attempted to read past end of file");&&&&& }&&&&& if ( blockReader != null ) {&&&&&&& blockReader.close(); &&&&&&& blockReader =&&&&& }&&&& &&&&&& if (s != null) {&&&&&&& s.close();&&&&&&& s =&&&&& }&&&&& //获取文件位置target所在的数据块的位置信息&&&&& LocatedBlock targetBlock = getBlockAt(target);&&&&& assert (target==this.pos) : "Wrong postion " + pos + " expect " +
&&&&& long offsetIntoBlock = target - targetBlock.getStartOffset();//计算文件位置target在该数据块中的起始位置&&&&& DatanodeInfo chosenNode =&&&&& while (s == null) {
&&&&&&& //从数据块的副本中选取一个副本,即该数据块所在的一个数据节点信息
&&&&&&& DNAddrPair retval = chooseDataNode(targetBlock);&&&&&& &&&&&&&& chosenNode = ;&&&&&&& InetSocketAddress targetAddr = retval.&&&&&&& try {&&&&&&&&& s = socketFactory.createSocket();&&&&&&&&& LOG.debug("start to connect to datanode: "+targetAddr);&&&&&&&&& NetUtils.connect(s, targetAddr, socketTimeout);&&&&&&&&& s.setSoTimeout(socketTimeout);&&&&&&&&& Block blk = targetBlock.getBlock();&&&&&&&& &&&&&&&&&& LOG.debug("create a BlockReader for Block["+blk.getBlockId()+"] of file["+src+"].");
&&&&&&&&& //创建一个数据块的读取器
&&&&&&&&& blockReader = BlockReader.newBlockReader(s, src, blk.getBlockId(),blk.getGenerationStamp(),offsetIntoBlock, blk.getNumBytes() - offsetIntoBlock, buffersize, verifyChecksum, clientName);&&&&&&&& &&&&&&&&&& return chosenN&&&&&&&& &&&&&&&& } catch (IOException ex) {&&&&&&&&& // Put chosen node into dead list, continue&&&&&&&&& LOG.debug("Failed to connect to " + targetAddr + ":" &&&&&&&&&&&&&&&&&&& + StringUtils.stringifyException(ex));&&&&&&&&& addToDeadNodes(chosenNode);&&&&&&&&& if (s != null) {&&&&&&&&&&& try {&&&&&&&&&&&&& s.close();&&&&&&&&&&& } catch (IOException iex) {&&&&&&&&&&& }&&&&&&&&&&&&&&&&&&&&&& &&&&&&&&&& }&&&&&&&&& s =&&&&&&& }&&&&& }&&&&& return chosenN&&& }private synchronized int readBuffer(byte buf[], int off, int len)throws IOException {&&&&& IOE&&&&& boolean retryCurrentNode =&&&&&& while (true) {&&&&&&& // retry as many times as seekToNewSource allows.&&&&&&& try {&&&&&&&&& return blockReader.read(buf, off, len);//从数据块读取器中读取len个字节的数据&&&&&&& } catch ( ChecksumException ce ) {&&&&&&&&& LOG.warn("Found Checksum error for " + currentBlock + " from " +&&&&&&&&&&&&&&&&&& currentNode.getName() + " at " + ce.getPos());&&&&&&&& &&&&&&&&&& reportChecksumFailure(src, currentBlock, currentNode);&&&&&&&&& ioe =&&&&&&&&& retryCurrentNode =&&&&&&& } catch ( IOException e ) {&&&&&&&&& if (!retryCurrentNode) {&&&&&&&&&&& LOG.warn("Exception while reading from " + currentBlock +&&&&&&&&&&&&&&&&&&&& " of " + src + " from " + currentNode + ": " +&&&&&&&&&&&&&&&&&&&& StringUtils.stringifyException(e));&&&&&&&&& }&&&&&&&&& ioe =&&&&&&& }&&&&&&& boolean sourceFound =&&&&&&& if (retryCurrentNode) {&&&&&&&&& sourceFound = seekToBlockSource(pos);&&&&&&& } else {&&&&&&&&& addToDeadNodes(currentNode);&&&&&&&&& sourceFound = seekToNewSource(pos);&&&&&&& }&&&&&&& if (!sourceFound) {&&&&&&&&&&&&&&&& }&&&&&&& retryCurrentNode =&&&&& }&&& }
private LocatedBlock getBlockAt(long offset) throws IOException {&&&&& assert (locatedBlocks != null) : "locatedBlocks is null";&&&& &&&&&& // 从缓存中查找文件位置offset所在的数据块地址信息&&&&& int targetBlockIdx = locatedBlocks.findBlock(offset);&&&& &&&&&& if (targetBlockIdx & 0) { //缓存中没有&&&&&&& targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);&&&&&&& // fetch more blocks&&&&&&& LocatedBlocks newB
&&&&&&& //从NameNode获取文件src从offset到offset+prefetchSize的内容所在的数据块的位置信息
&&&&&&& newBlocks = callGetBlockLocations(namenode, src, offset, prefetchSize);&&&&&&& assert (newBlocks != null) : "Could not find target position " +&&
&&&&&& //将新获取的数据块位置信息加入缓存
&&&&&&& locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());&&&&& }&&&& &&&&&& LocatedBlock blk = locatedBlocks.get(targetBlockIdx);&&&& &&&&&& //更新当前文件指针、数据块信息&&&&& this.pos =&&&&& this.blockEnd = blk.getStartOffset() + blk.getBlockSize() - 1;&&&&& this.currentBlock = blk.getBlock();&&&& &&&&&&&&& }从上面的源代码,我们可以看出,对于FSDataInputStream的read(long,byte[],int,int)和read(byte[],int,int)方法的一次调用中并不会主动地跨数据块读取数据,也就是说,在底层DFSInputStream只会在当前数据块内尽可能读取len个字节的数据(当前数据块有多少数据就读多少数据,知道满足len长度,若当前数据块不够len,也不会跳到下个数据块继续读,而是直接返回)。另外,BlockReader主要是用来接收某一个数据节点发送来的数据块的数据,它的实现很简单,有兴趣的话可以阅读它的源代码。
&&& 再来简单的看看readFully(long,byte[],int,int)和readFully(long,byte[])方法,它们的底层实现都是一样的,都会不断的调用read(long,byte[],int,int)方法,直到读取到len长度的字节。值得注意的是,如果整个文件从position开始没有len长度的数据,就会抛出异常。它们的序列图如下:
&&&&& 到这里,我已经全部介绍完了有关HDFS的I/O流的实现,希望对如何提高HDFS文件读写速度感兴趣的盆友有所帮助。在前面的博文中我主要从客户端的角度讲述了HDFS文件写操作的工作流程,但是关于客户端是如何把数据块传送到数据节点,同时数据节点又是如何来接受来自客户端的数据块呢?这就是本文将要讨论的。
&&&&& 上一次在HDFS的文件操作流(1)——写操作(客户端) (见 &)一文中粗略的提到了DataStreamer线程,那么现在我们就来具体的看看客户端是如何传输数据的。先来看看底层文件写入流DFSOutputSream的核心代码:
&/**&&&& * @param b&&&&&&&&&&&&&&& 要写入文件的数据块&&&& * @param offset&&&&&&&& 开始位置&&&& * @param len&&&&&&&&&&&& 数据长度&&&& * @param checksum& 数据块b[offset~offset+len]的校验码&&&& */
protected synchronized void writeChunk(byte[] b, int offset, int len, byte[] checksum)& throws IOException {&&&&& checkOpen();&&&&& isClosed();& &&&&& int cklen = checksum.&&&&& int bytesPerChecksum = this.checksum.getBytesPerChecksum(); &&&&& if (len & bytesPerChecksum) {&&&&&&& throw new IOException("writeChunk() buffer size is " + len +&&&&&&&&&&&&&&&&&&&&&&&&&&&&& " is larger than supported& bytesPerChecksum " +&&&&&&&&&&&&&&&&&&&&&&&&&&&&& bytesPerChecksum);&&&&& }&&&&& if (checksum.length != this.checksum.getChecksumSize()) {&&&&&&& throw new IOException("writeChunk() checksum size is supposed to be " +&&&&&&&&&&&&&&&&&&&&&&&&&&&&& this.checksum.getChecksumSize() + &&&&&&&&&&&&&&&&&&&&&&&&&&&&& " but found to be " + checksum.length);&&&&& }&&&&& synchronized (dataQueue) {& &&&&&&& // If queue is full, then wait till we can create& enough space&&&&&&& while (!closed && dataQueue.size() + ackQueue.size()& & maxPackets) {&&&&&&&&& try {&&&&&&&&&&& dataQueue.wait();&&&&&&&&& } catch (InterruptedException& e) {&&&&&&&&& }&&&&&&& }&&&&&&& isClosed();& &&&&&&& if (currentPacket == null) {&&&&&&&&& currentPacket = new Packet(packetSize, chunksPerPacket,bytesCurBlock);&&&&&&& }&&&&&&& currentPacket.writeChecksum(checksum, 0, cklen);&&&&&&& currentPacket.writeData(b, offset, len);&&&&&&& currentPacket.numChunks++;&&&&&&& bytesCurBlock += len;
&&&&&&& if (currentPacket.numChunks == currentPacket.maxChunks || bytesCurBlock == blockSize) {&&&&&&&&& //当前Block是否已满&&&&&&&&& if (bytesCurBlock == blockSize) {&&&&&&&&&&& currentPacket.lastPacketInBlock =&&&&&&&&&&& bytesCurBlock = 0;&&&&&&&&&&& lastFlushOffset = -1;&&&&&&&&& }&&&&&&&& &&&&&&&&&& LOG.debug("the packet is full,so put it to dataQueue");&&&&&&&&& dataQueue.addLast(currentPacket);&&&&&&&&& dataQueue.notifyAll();&&&&&&&&& currentPacket =&&&&&&&&& if (appendChunk) {&&&&&&&&&&& appendChunk =&&&&&&&&&&& resetChecksumChunk(bytesPerChecksum);&&&&&&&&& }&&&&&&&& &&&&&&&&&& int psize = Math.min((int)(blockSize-bytesCurBlock), writePacketSize);&&&&&&&&& computePacketChunkSize(psize, bytesPerChecksum);&&&&&&& }&&&&&& &&&&& }&&& }
从DFSOutputSream的核心函数writeChunk()我们可以看出,DFSOutputSream先把写入的数据缓存到packet中,当packet满了,或者是当前Block满了,则把packet放入队列dataQueue,等待其它的工作者把该packet发送到目标数据节点上。其实,这个工作者就是DataStreamer,它是DFSOutputSream的一个内部线程类,下面就来看看DataStreamer是如何工作的吧!
private class DataStreamer extends Daemon {&&&&& private volatile boolean closed =&&&&& public void run() {&&&&&&& while (!closed && clientRunning) {&&&&&&&&& if (hasError && response != null) {&&&&&&&&&&& try {&&&&&&&&&&&&& response.close();&&&&&&&&&&&&& response.join();&&&&&&&&&&&&& response =&&&&&&&&&&& } catch (InterruptedException& e) {&&&&&&&&&&& }&&&&&&&&& }&&&&&&&&& Packet one =&&&&&&&&& synchronized (dataQueue) {&&&&&&&&&&& //处理I/O错误&&&&&&&&&&& boolean doSleep = processDatanodeError(hasError, false);&&&&&&&&&&& // wait for a packet to be sent.&&&&&&&&&&& while ((!closed && !hasError && clientRunning && dataQueue.size() == 0) || doSleep) {&&&&&&&&&&&&& try {&&&&&&&&&&&&&&& dataQueue.wait(1000);&&&&&&&&&&&&& } catch (InterruptedException& e) {&&&&&&&&&&&&& }&&&&&&&&&&&&& doSleep =&&&&&&&&&&& }&&&&&&&&&&& if (closed || hasError || dataQueue.size() == 0 || !clientRunning) {&&&&&&&&&&&&&&&&&&&&&&&& }&&&&&&&&&&& try {&&&&&&&&&&&&& //从队列dataQueue中取出一个将要发送的packet&&&&&&&&&&&&& one = dataQueue.getFirst();&&&&&&&&&&&&& long offsetInBlock = one.offsetInB& &&&&&&&&&&&&& // 当前还没有一个可用的数据块&&&&&&&&&&&&& if (blockStream == null) {&&&&&&&&&&&&&&& //向NameNode节点申请一个数据块Block,同时创建一个blockStream&&&&&&&&&&&&&&& nodes = nextBlockOutputStream(src); &&&&&&&&&&&&&&& this.setName("DataStreamer for file " + src +& " block " + block);&&&&&&&&&&&&&&& response = new ResponseProcessor(nodes);&&&&&&&&&&&&&&& response.start();&&&&&&&&&&&&& }&&&&&&&&&&&&& if (offsetInBlock &= blockSize) {&&&&&&&&&&&&&&& throw new IOException("BlockSize " + blockSize +& " is smaller than data size. " +&& " Offset of packet in block " + & offsetInBlock +& " Aborting file " + src);&&&&&&&&&&&&& }&&&&&&&&&&&&& ByteBuffer buf = one.getBuffer();&&&&&&&&&&&& &&&&&&&&&&&&&& // move packet from dataQueue to ackQueue&&&&&&&&&&&&& dataQueue.removeFirst();&&&&&&&&&&&&& dataQueue.notifyAll();&&&&&&&&&&&&& synchronized (ackQueue) {&&&&&&&&&&&&&&& ackQueue.addLast(one);&&&&&&&&&&&&&&& ackQueue.notifyAll();&&&&&&&&&&&&& } &&&&&&&&&&&& &&&&&&&&&&&&&& // write out data to remote datanode&&&&&&&&&&&&& blockStream.write(buf.array(), buf.position(), buf.remaining());&&&&&&&&&&&&& //一个数据块是否已满了
&&&&&&&&&&&&& if (one.lastPacketInBlock) {&&&&&&&&&&&&&&& blockStream.writeInt(0); // indicate end-of-block &&&&&&&&&&&&& }&&&&&&&&&&&&& blockStream.flush();&&&&&&&&&&& } catch (Throwable e) {&&&&&&&&&&&&& LOG.warn("DataStreamer Exception: " + &&&&&&&&&&&&&&&&&&&&&& StringUtils.stringifyException(e));&&&&&&&&&&&&& if (e instanceof IOException) {&&&&&&&&&&&&&&& setLastException((IOException)e);&&&&&&&&&&&&& }&&&&&&&&&&&&& hasError =&&&&&&&&&&& }&&&&&&&&& }&&&&&&&&& if (closed || hasError || !clientRunning) {&&&&&&&&&&&&&&&&&&&& }&&&&&&&&& // 如果一个Block的所有packet已发送完了,就等到所有来自数据节点的apcket的ack&&&&&&&&& if (one.lastPacketInBlock) {&&&&&&&&&&& synchronized (ackQueue) {&&&&&&&&&&&&& while (!hasError && ackQueue.size() != 0 && clientRunning) {&&&&&&&&&&&&&&& try {&&&&&&&&&&&&&&&&& ackQueue.wait();&& // wait for acks to arrive from datanodes&&&&&&&&&&&&&&& } catch (InterruptedException& e) {&&&&&&&&&&&&&&& }&&&&&&&&&&&&& }&&&&&&&&&&& }&&&&&&&&&&& this.setName("DataStreamer for file " + src);&&&&&&&&&&& response.close();&&&&&&& // ignore all errors in Response&&&&&&&&&&& try {&&&&&&&&&&&&& response.join();&&&&&&&&&&&&& response =&&&&&&&&&&& } catch (InterruptedException& e) {&&&&&&&&&&& }&&&&&&&&&&& if (closed || hasError || !clientRunning) {&&&&&&&&&&&&&&&&&&&&&&&& }&&&&&&&&&&& synchronized (dataQueue) {&&&&&&&&&&&&& try {&&&&&&&&&&&&&&& blockStream.close();&&&&&&&&&&&&&&& blockReplyStream.close();&&&&&&&&&&&&& } catch (IOException e) {&&&&&&&&&&&&& }&&&&&&&&&&&&& nodes =&&&&&&&&&&&&& response =&&&&&&&&&&&&& blockStream =&&&&&&&&&&&&& blockReplyStream =&&&&&&&&&&& }&&&&&&&&& }&&&&&&&&& if (progress != null) { progress.progress(); }&&&&&&&&& // This is used by unit test to trigger race conditions.&&&&&&&&& if (artificialSlowdown != 0 && clientRunning) {&&&&&&&&&&& try { &&&&&&&&&&&&& Thread.sleep(artificialSlowdown); &&&&&&&&&&& } catch (InterruptedException e) {}&&&&&&&&& }&&&&&&& }&&&&& }&&&&& // shutdown thread&&&&& void close() {&&&&&&& closed =&&&&&&& synchronized (dataQueue) {&&&&&&&&& dataQueue.notifyAll();&&&&&&& }&&&&&&& synchronized (ackQueue) {&&&&&&&&& ackQueue.notifyAll();&&&&&&& }&&&&&&& this.interrupt();&&&&& }&&& }
&&&& 上面的代码值得让我们注意的是,在Hadoop的官网上有关于介绍HDFS的一句话:A client request to create a file does not reach the NameNode immediately. In fact, initially the HDFS client caches the file data into a temporary local file. Application writes are transparently redirected to this temporary local file. When the local file accumulates data worth over one HDFS block size, the client contacts the NameNode. 翻译这句话,我就在这里不献丑了。很多分析过源代码的朋友都认为这句话说得有问题,但是我想说的,这就话在本质上是没有问题的,因为DataStreamer总是一个数据块接着一个数据块向目标数据节点发送,也就是对于已经向某一个数据节点发送了一个Block后,DataStreamer并不是马上发送下一个Block,而是要等到packet得到确认后才发送下一个Block,假设当一个用户调用HDFS的API写入了2个Block的数据。此时DataStreamer还在等待第一个Block的所有packet的ack,那么用户的第2个Block的数据还缓存在dataQueue中,同时DataStreamer也没有向NameNode申请第二个Block。那么现在大家再来体会一下刚才那句话。是不是还有点意思呢?另外,用户不能一味的发送数据,负责缓存扛不住,所有就有一个限制了,也就是总的缓存数据不能超过maxPackets个packet,这个值视运行环境而定,目前默认是80或者是1000。ok,再来看看nextBlockOutputStream函数到底为数据块向数据节点传送到底干了那些工作。
private DatanodeInfo[] nextBlockOutputStream(String client) throws IOException {&&&&& LocatedBlock lb =&&&&& boolean retry =&&&&& DatanodeInfo[]&&&&& int count = conf.getInt("dfs.client.block.write.retries", 3);&&&&&&&&&& do {&&&&&&& hasError =&&&&&&& lastException =&&&&&&& errorIndex = 0;&&&&&&& retry =&&&&&&& nodes =&&&&&&& success =&&&&&&&&&&&&&& &&&&&&&& long startTime = System.currentTimeMillis();
&&&&&&& //向NameNode申请一个新的block
&&&&&&& lb = locateFollowingBlock(startTime);&&&&&&& block = lb.getBlock();&&&&&&& nodes = lb.getLocations();&&&&&& &&&&&&&& LOG.debug("locate a block["+block.getBlockId()+"] for file["+src+"]: "+nodes);& &&&&&&& //创建一个和数据节点的网络connection&&&&&&& success = createBlockOutputStream(nodes, clientName, false);&&&&&&& if (!success) {
&&&&&&&& //向NameNode放弃文件src的一个block
&&&&&&&&& namenode.abandonBlock(block, src, clientName);&&&&&&&&& // Connection failed.& Let's wait a little bit and retry&&&&&&&&& retry =&&&&&&&&& try {&&&&&&&&&&& if (System.currentTimeMillis() - startTime & 5000) {&&&&&&&&&&&&& ("Waiting to find target node: " + nodes[0].getName());&&&&&&&&&&& }&&&&&&&&&&& Thread.sleep(6000);&&&&&&&&& } catch (InterruptedException iex) {&&&&&&&&& }&&&&&&& }&&&&& } while (retry && --count &= 0);&&&&& if (!success) {&&&&&&& throw new IOException("Unable to create new block.");&&&&& }&&&&&&&& }private LocatedBlock locateFollowingBlock(long start) throws IOException {&&& &&&&&& int retries = conf.getInt("dfs.client.block.write.locateFollowingBlock.retries", 5);&&&&& long sleeptime = 400;&&&&& while (true) {&&&&&&& long localstart = System.currentTimeMillis();&&&&&&& while (true) {&&&&&&&&& try {
&&&&&&&&&&& //调用NameNode节点的远程方法addBlock来为文件src申请一个Block
&&&&&&&&&&& return namenode.addBlock(src, clientName);&&&&&&&&& } catch (RemoteException e) {&&&&&&&&&&& IOException ue = &&&&&&&&&&&&& e.unwrapRemoteException(FileNotFoundException.class,&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&& AccessControlException.class,&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&& NSQuotaExceededException.class,&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&& DSQuotaExceededException.class);&&&&&&&&&&& if (ue != e) { &&&&&&&&&&&&& // no need to retry these exceptions&&&&&&&&&&& }&&&&&&&&&& &&&&&&&&&&&& if (NotReplicatedYetException.class.getName().&&&&&&&&&&&&&&& equals(e.getClassName())) {&&&&&&&&&&&&&&& if (retries == 0) { &&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&& } else {&&&&&&&&&&&&&&&&& --&&&&&&&&&&&&&&&&& try {&&&&&&&&&&&&&&&&&&& Thread.sleep(sleeptime);&&&&&&&&&&&&&&&&&&& sleeptime *= 2;&&&&&&&&&&&&&&&&& } catch (InterruptedException ie) {&&&&&&&&&&&&&&&&& }&&&&&&&&&&&&&&& }&&&&&&&&&&& } else {&&&&&&&&&&&&&&&&&&&&&&&& }&&&&&&&&& }&&&&&&& }&&&&& } &&& }private boolean createBlockOutputStream(DatanodeInfo[] nodes, String client, boolean recoveryFlag) {&&&&& String firstBadLink = "";&&&&& persistBlocks =&&&&& try {&&&&&&& InetSocketAddress target = NetUtils.createSocketAddr(nodes[0].getName());&&&&&&& s = socketFactory.createSocket();&&&&&&& int timeoutValue = 3000 * nodes.length + socketT&&&&&&& NetUtils.connect(s, target, timeoutValue);&&&&&&& s.setSoTimeout(timeoutValue);&&&&&&& s.setSendBufferSize(DEFAULT_DATA_SOCKET_SIZE);&&&&&&& LOG.debug("Send buf size " + s.getSendBufferSize());&&&&&& &&&&&&&& long writeTimeout = HdfsConstants.WRITE_TIMEOUT_EXTENSION * nodes.length + datanodeWriteT&&&&&&& DataOutputStream out = new DataOutputStream(new BufferedOutputStream(NetUtils.getOutputStream(s, writeTimeout), DataNode.SMALL_BUFFER_SIZE));
&&&&&&& //由确认线程ResponseProcessor使用,获取数据节点对发送的packet的确认包&&&&&&& blockReplyStream = new DataInputStream(NetUtils.getInputStream(s));&&&&&&& out.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION );//数据传输协议的版本号&&&&&&& out.write( DataTransferProtocol.OP_WRITE_BLOCK );//数据节点应该执行的操作&&&&&&& out.writeLong( block.getBlockId() );//Block的id号&&&&&&& out.writeLong( block.getGenerationStamp() );//Block创建的时间&&&&&&& out.writeInt( nodes.length );//Block所有副本数量&&&&&&& out.writeBoolean( recoveryFlag );&&&&&& // 是否是恢复一个Block&&&&&&& Text.writeString( out, client );//客户端名字&&&&&&& out.writeBoolean(false); // Not sending src node information&&&&&&& out.writeInt( nodes.length - 1 );//剩余副本数量&&&&&&& for (int i = 1; i & nodes. i++) {&&&&&&&&& nodes[i].write(out);//存放剩余副本的数据节点信息&&&&&&& }&&&&&&& checksum.writeHeader( out );//数据校验信息&&&&&&& out.flush();&&&&&&& // receive ack for connect&&&&&&& firstBadLink = Text.readString(blockReplyStream);&&&&&&& if (firstBadLink.length() != 0) {&&&&&&&&& throw new IOException("Bad connect ack with firstBadLink " + firstBadLink);&&&&&&& }&&&&&&& blockStream =&&&&&&&&&&& // success&&&&& } catch (IOException ie) {&&&&&&& if (firstBadLink.length() != 0) {&&&&&&&&& for (int i = 0; i & nodes. i++) {&&&&&&&&&&& if (nodes[i].getName().equals(firstBadLink)) {&&&&&&&&&&&&& errorIndex =&&&&&&&&&&&&&&&&&&&&&&&& }&&&&&&&&& }&&&&&&& }&&&&&&& hasError =&&&&&&& setLastException(ie);&&&&&&& blockReplyStream =&&&&&&&& // error&&&&& }&&& }&&& 对于客户端向数据节点传送过程中,难免会发生错误,这些错误包括,客户端向第一个数据节点写数据时发生网络错误,数据节点向数据节点写数据时发生错误,从数据节点获取packet的确认信息是发生错误等,它们都统一交给DFSOutputSream中的函数processDatanodeError来处理的。
private boolean processDatanodeError(boolean hasError, boolean isAppend) {&&&&& if (!hasError) {&&&&&&&&&&&& }&&&&& if (response != null) {&&&&&&&&&&&& }&&& &&&&& if (blockStream != null) {&&&&&&& try {&&&&&&&&& blockStream.close();&&&&&&&&& blockReplyStream.close();&&&&&&& } catch (IOException e) {&&&&&&& }&&&&& }&&&&& blockStream =&&&&& blockReplyStream =&&&&& // 将未被确认的数据包重新放到dataQueue中,并清空ackQueue
&&&&& synchronized (ackQueue) {&&&&&&& dataQueue.addAll(0, ackQueue);&&&&&&& ackQueue.clear();&&&&& }&&&&& boolean success =&&&&& while (!success && clientRunning) {&&&&&&& DatanodeInfo[] newnodes =&&&&&&& if (nodes == null) {&&&&&&&&& String msg = "Could not get block locations. " + "Source file "" + src + "" - Aborting...";&&&&&&&&& LOG.warn(msg);&&&&&&&&& setLastException(new IOException(msg));&&&&&&&&& closed =&&&&&&&&& if (streamer != null) streamer.close();&&&&&&&&&&&&&&&& }&&&&&&& StringBuilder pipelineMsg = new StringBuilder();&&&&&&& for (int j = 0; j & nodes. j++) {&&&&&&&&& pipelineMsg.append(nodes[j].getName());&&&&&&&&& if (j & nodes.length - 1) {&&&&&&&&&&& pipelineMsg.append(", ");&&&&&&&&& }&&&&&&& }&&&&&&& if (errorIndex & 0) {&&&&&&&&& newnodes =&&&&&&& } else {&&&&&&&&& if (nodes.length &= 1) {&&&&&&&&&&& lastException = new IOException("All datanodes " + pipelineMsg + & " are bad. Aborting...");&&&&&&&&&&& closed =&&&&&&&&&&& if (streamer != null) streamer.close();&&&&&&&&&&&&&&&&&&&& }&&&&&&&&& //删除出问题的数据节点
&&&&&&&&& newnodes =& new DatanodeInfo[nodes.length-1];&&&&&&&&& System.arraycopy(nodes, 0, newnodes, 0, errorIndex);&&&&&&&&& System.arraycopy(nodes, errorIndex+1, newnodes, errorIndex, newnodes.length-errorIndex);&&&&&&& }
&&&&&&& LocatedBlock newBlock =&&&&&&& ClientDatanodeProtocol primary =&&&&&&&& DatanodeInfo primaryNode =&&&&&&& try {&&&&&&&&& //从剩余可用的数据节点中先一个节点来恢复Block&&&&&&&&& primaryNode = Collections.min(Arrays.asList(newnodes));&&&&&&&&& primary = createClientDatanodeProtocolProxy(primaryNode, conf);&&&&&&&&& newBlock = primary.recoverBlock(block, isAppend, newnodes);&&&&&&& } catch (IOException e) {&&&&&&&&& recoveryErrorCount++;&&&&&&&&& if (recoveryErrorCount & maxRecoveryErrorCount) {&&&&&&&&&&& if (nodes.length & 1) {&&&&&&&&&&&&& for (int j = 0; j & nodes. j++) {&&&&&&&&&&&&&&& if (nodes[j].equals(primaryNode)) {&&&&&&&&&&&&&&&&& errorIndex = // forget original bad node.&&&&&&&&&&&&&&& }&&&&&&&&&&&&& }&&&&&&&&&&&&& //删除这个有问题的数据节点&&&&&&&&&&&&& newnodes =& new DatanodeInfo[nodes.length-1];&&&&&&&&&&&&& System.arraycopy(nodes, 0, newnodes, 0, errorIndex);&&&&&&&&&&&&& System.arraycopy(nodes, errorIndex+1, newnodes, errorIndex,& newnodes.length-errorIndex);&&&&&&&&&&&&& nodes =&&&&&&&&&&&&& recoveryErrorCount = 0; &&&&&&&&&&&&& errorIndex = -1;&&&&&&&&&&&&&&&&&&&&&& //因为还有可用的数据节点,暂时放回,稍后再试&&&&&&&&&&& }&&&&&&&&&&& String emsg = "Error Recovery for block " + block + " failed " + " because recovery from primary datanode " +& primaryNode + " failed " + recoveryErrorCount + & " times. "& + " Pipeline was " + pipelineMsg +& ". Aborting...";&&&&&&&&&&& LOG.warn(emsg);&&&&&&&&&&& lastException = new IOException(emsg);&&&&&&&&&&& closed =&&&&&&&&&&& if (streamer != null) streamer.close();&&&&&&&&&&&&&&&&& // 因为可用的数据节点,错误已经无法再处理了,将关闭HDFS的数据写入流&&&&&&&&& } &&&&&&&&&&&&&&&&&& // 因为还有可用的数据节点,暂时放回,稍后再试&&&&&&& } finally {&&&&&&&&& RPC.stopProxy(primary);&&&&&&& }&&&&&& &&&&&&&& recoveryErrorCount = 0; // block recovery successful&&&&&&& // If the block recovery generated a new generation stamp, use that from now on.& Also, setup new pipeline&&&&&&& if (newBlock != null) {&&&&&&&&& block = newBlock.getBlock();&&&&&&&&& nodes = newBlock.getLocations();&&&&&&& }&&&&&&& this.hasError =&&&&&&& lastException =&&&&&&& errorIndex = 0;&&&&&&& success = createBlockOutputStream(nodes, clientName, true);&&&&& }&&&&& response = new ResponseProcessor(nodes);&&&&& response.start();&&&& &&&&&& // do not sleep, continue processing&&& }&&& 哎,看到上面的代码,烦都烦死了,我还是简单的描述一下关于processDatanodeError函数处理I/O过程中的错误吧!当有错误发生时。肯定是某一个数据节点发生了问题,那么首先会把这个有问题的数据节点删除掉。然后从剩余的可用的数据节点中选取一个,让它来恢复当前的这个Block,如果成功了ok,而失败,则删除再删除这个出问题的节点,则继续选节点来恢复Block,直到成功,如果最后没有可用的数据节点来恢复Block,则宣告这个Block写入失败,将关闭DFSOutputSream流,当用户再次写入时抛出异常。上一篇本文我详细的分析了在HDFS的文件写操作中,客户端是如何工作的,其工作核心可总结为两点:一是向NameNode申请Block,而是向数据节点传输Block的packet。那么,数据节点是如何来接受这个数据块的呢?这个还得从数据节点的注册说起。
&&& 数据节点在启动之后,会向NameNode节点进行注册来告诉它自己的一些信息,这些信息包括自己的存储信息,服务信息等。其中,服务信息包括自己的数据接受地址、RPC地址、Info地址。数据接受地址就是用来接受客户端发送过来的有关数据块的操作。而这个工作统一交给DataXceiverServer来管理,这个DataXceiverServer实际上可以看做是一个线程池,具体的为某一个客户端服务的话,它还是交给工作线程DataXceiver来做的。好吧,那就来具体的看看DataXceiver。
class DataXceiver implements Runnable, FSConstants {&& &public void run() {&& & &&&& DataInputStream in= &&& try {&&&&& in = new DataInputStream( new BufferedInputStream(NetUtils.getInputStream(s), SMALL_BUFFER_SIZE));&&&&& short version = in.readShort();&&&&& if ( version != DataTransferProtocol.DATA_TRANSFER_VERSION ) {&&&&&&& throw new IOException( "Version Mismatch" );&&&&& }&&&& &&&&& boolean local = s.getInetAddress().equals(s.getLocalAddress());&&&&& byte op = in.readByte();&&&& &&&&&& // Make sure the xciver count is not exceeded&&&&& int curXceiverCount = datanode.getXceiverCount();&&&&& if (curXceiverCount & dataXceiverServer.maxXceiverCount) {&&&&&&& throw new IOException("xceiverCount " + curXceiverCount& + " exceeds the limit of concurrent xcievers " + dataXceiverServer.maxXceiverCount);&&&&& }&&&& &&&&&& long startTime = DataNode.now();&&&&& switch ( op ) {&&&&& case DataTransferProtocol.OP_READ_BLOCK:&&&&&&& readBlock( in );&&&&&&& datanode.myMetrics.readBlockOp.inc(DataNode.now() - startTime);&&&&&&& if (local)&&&&&&&&& datanode.myMetrics.readsFromLocalClient.inc();&&&&&&& else&&&&&&&&& datanode.myMetrics.readsFromRemoteClient.inc();&&&&&&&&&&&& case DataTransferProtocol.OP_WRITE_BLOCK:&&&&&&& writeBlock( in );&&&&&&& datanode.myMetrics.writeBlockOp.inc(DataNode.now() - startTime);&&&&&&& if (local)&&&&&&&&& datanode.myMetrics.writesFromLocalClient.inc();&&&&&&& else&&&&&&&&& datanode.myMetrics.writesFromRemoteClient.inc();&&&&&&&&&&&& case DataTransferProtocol.OP_READ_METADATA:&&&&&&& readMetadata( in );&&&&&&& datanode.myMetrics.readMetadataOp.inc(DataNode.now() - startTime);&&&&&&&&&&&& case DataTransferProtocol.OP_REPLACE_BLOCK: // f send to a destination&&&&&&& replaceBlock(in);&&&&&&& datanode.myMetrics.replaceBlockOp.inc(DataNode.now() - startTime);&&&&&&&&&&&& case DataTransferProtocol.OP_COPY_BLOCK:&&&&&&&&&&& // f send to a proxy source&&&&&&& copyBlock(in);&&&&&&& datanode.myMetrics.copyBlockOp.inc(DataNode.now() - startTime);&&&&&&&&&&&& case DataTransferProtocol.OP_BLOCK_CHECKSUM: //get the checksum of a block&&&&&&& getBlockChecksum(in);&&&&&&& datanode.myMetrics.blockChecksumOp.inc(DataNode.now() - startTime);&&&&&&&&&&&& default:&&&&&&& throw new IOException("Unknown opcode " + op + " in data stream");&&&&& }&&& } catch (Throwable t) {&&&&& LOG.error(datanode.dnRegistration + ":DataXceiver",t);&&& } finally {&&&&& LOG.debug(datanode.dnRegistration + ":Number of active connections is: " + datanode.getXceiverCount());&&&&& IOUtils.closeStream(in);&&&&& IOUtils.closeSocket(s);&&&&& dataXceiverServer.childSockets.remove(s);&&& }& }}&&& 在DataXceiver的run()方法中,他根据客户端的操作要求(如:读数据块、写数据块、复制数据块等)来调用响应的函数处理,本文既是谈论文件的写操作,那么自然就会具体分析writeBlock()方法。
private void writeBlock(DataInputStream in) throws IOException {&&& DatanodeInfo srcDataNode =&&& LOG.debug("writeBlock receive buf size " + s.getReceiveBufferSize() + " tcp no delay " + s.getTcpNoDelay());& & //先取出相关的头部信息&
&&& Block block = new Block(in.readLong(), dataXceiverServer.estimateBlockSize, in.readLong());//Block的id和创建时间&& &&& int pipelineSize = in.readInt(); //Block副本数量&&& boolean isRecovery = in.readBoolean(); // 是否是恢复Block&&& String client = Text.readString(in); //客户端名字&&& boolean hasSrcDataNode = in.readBoolean(); // 是否来自数据节点&&& if (hasSrcDataNode) {&&&&& srcDataNode = new DatanodeInfo();&&&&& srcDataNode.readFields(in);//源数据节点信息&&& }&&& int numTargets = in.readInt();//存放剩余Block副本数量&&& if (numTargets & 0) {&&&&& throw new IOException("Mislabelled incoming datastream.");&&& }&&& DatanodeInfo targets[] = new DatanodeInfo[numTargets];&&& for (int i = 0; i & targets. i++) {&&&&& DatanodeInfo tmp = new DatanodeInfo();&&&&& tmp.readFields(in);//存放剩余Block副本的数据节点信息&&&&& targets[i] =&&& }&&& DataOutputStream mirrorOut =& // 向下一个数据节点写入Block的网络I/O流&&& DataInputStream mirrorIn =&&& // reply from next target&& &&&& DataOutputStream replyOut =&& // stream to prev target&& &&&& Socket mirrorSock =&&&&&&&&&& // socket to next target&&& BlockReceiver blockReceiver = // responsible for data handling&&& String mirrorNode =&&&&&&&&&& // the name:port of next target&&& String firstBadLink = "";&&&&&&&&&& // first datanode that failed in connection setup&&& try {&&&&& &&&&& blockReceiver = new BlockReceiver(block, in, s.getRemoteSocketAddress().toString(), s.getLocalSocketAddress().toString(), isRecovery, client, srcDataNode, datanode);&&&&& if (targets.length & 0) {&&&&&&& InetSocketAddress mirrorTarget =&&&&&&& // Connect to backup machine&&&&&&& mirrorNode = targets[0].getName();&&&&&&& mirrorTarget = NetUtils.createSocketAddr(mirrorNode);&&&&&&& mirrorSock = datanode.newSocket();&&&&&&& try {&&&&&&&&& int timeoutValue = numTargets * datanode.socketT&&&&&&&&& int writeTimeout = datanode.socketWriteTimeout + (HdfsConstants.WRITE_TIMEOUT_EXTENSION * numTargets);&&&&&&&&& NetUtils.connect(mirrorSock, mirrorTarget, timeoutValue);&&&&&&&&& mirrorSock.setSoTimeout(timeoutValue);&&&&&&&&& mirrorSock.setSendBufferSize(DEFAULT_DATA_SOCKET_SIZE);&&&&&&&&& mirrorOut = new DataOutputStream(new BufferedOutputStream(NetUtils.getOutputStream(mirrorSock, writeTimeout),SMALL_BUFFER_SIZE));&&&&&&&&& mirrorIn = new DataInputStream(NetUtils.getInputStream(mirrorSock));&&&&&&&&& // 向下一个数据节点写入Block的头部信息&&&&&&&&& mirrorOut.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION );&&&&&&&&& mirrorOut.write( DataTransferProtocol.OP_WRITE_BLOCK );&&&&&&&&& mirrorOut.writeLong( block.getBlockId() );&&&&&&&&& mirrorOut.writeLong( block.getGenerationStamp() );&&&&&&&&& mirrorOut.writeInt( pipelineSize );&&&&&&&&& mirrorOut.writeBoolean( isRecovery );&&&&&&&&& Text.writeString( mirrorOut, client );&&&&&&&&& mirrorOut.writeBoolean(hasSrcDataNode);&&&&&&&&& if (hasSrcDataNode) { // pass src node information&&&&&&&&&&& srcDataNode.write(mirrorOut);&&&&&&&&& }&&&&&&&&& mirrorOut.writeInt( targets.length - 1 );&&&&&&&&& for ( int i = 1; i & targets. i++ ) {&&&&&&&&&&& targets[i].write( mirrorOut );&&&&&&&&& }
&&&&&&&&& //向下一个数据节点写入校验信息&&&&&&&&& blockReceiver.writeChecksumHeader(mirrorOut);&&&&&&&&& mirrorOut.flush();&&&&&&&&& //获取下一个数据节点的确认信息&&&&&&&&& if (client.length() != 0) {&&&&&&&&&&& firstBadLink = Text.readString(mirrorIn);&&&&&&&&&&& if (LOG.isDebugEnabled() || firstBadLink.length() & 0) {&&&&&&&&&&&&& ("Datanode " + targets.length +&&&&&&&&&&&&&&&&&&&&&& " got response for connect ack " +&&&&&&&&&&&&&&&&&&&&&& " from downstream datanode with firstbadlink as " +&&&&&&&&&&&&&&&&&&&&&& firstBadLink);&&&&&&&&&&& }&&&&&&&&& }&&&&&&& } catch (IOException e) {&&&&&&&&& if (client.length() != 0) {&&&&&&&&&&& Text.writeString(replyOut, mirrorNode);&&&&&&&&&&& replyOut.flush();&&&&&&&&& }&&&&&&&&& IOUtils.closeStream(mirrorOut);&&&&&&&&& mirrorOut =&&&&&&&&& IOUtils.closeStream(mirrorIn);&&&&&&&&& mirrorIn =&&&&&&&&& IOUtils.closeSocket(mirrorSock);&&&&&&&&& mirrorSock =&&&&&&&&& if (client.length() & 0) {&&&&&&&&&&&&&&&&&&&& } else {&&&&&&&&& }&&&&&&& }&&&&& }
&&&&& if (client.length() != 0) {&&&&&&& if (LOG.isDebugEnabled() || firstBadLink.length() & 0) {&&&&&&& }&&&&&&& Text.writeString(replyOut, firstBadLink);//向前一个一个数据节点或客户回复&&&&&&& replyOut.flush();&&&&& }&&&&& // 接受前一个数据节点或客户端发送来的Block,并把该Block发送到下一个数据节点&&&&& String mirrorAddr = (mirrorSock == null) ? null : mirrorN&&&&& blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut, mirrorAddr, null, targets.length);&&&&& if (client.length() == 0) {
&&&&&&& //将接收到的一个Block添加到当前数据节点的receivedBlockList
&&&&&&& datanode.notifyNamenodeReceivedBlock(block, DataNode.EMPTY_DEL_HINT);&&&& &&&&& if (datanode.blockScanner != null) {&&&&&&& datanode.blockScanner.addBlock(block);&&&&& }&&&& &&&& } catch (IOException ioe) {&&&&&&&& } finally {&&&&& // close all opened streams&&&&& IOUtils.closeStream(mirrorOut);&&&&& IOUtils.closeStream(mirrorIn);&&&&& IOUtils.closeStream(replyOut);&&&&& IOUtils.closeSocket(mirrorSock);&&&&& IOUtils.closeStream(blockReceiver);&&& }& }&&& 在writeBlock()方法中,数据节点主要是靠BlockReceiver来接受前一个客户端或者是数据接节点发送过来的Block,并把它发送到下一个数据节点。关于BlockReceiver的实现细节在这里我就不再赘述了,有兴趣的盆友可以自己去研究一下它的源代码。当一个DataXceiver成功地接受完一个Block并把它发送到下一个数据节点之后,它就会把刚收到的一个Block放到DataNode的receivedBlockList队列中,而DataNode也会将该Block报告给NameNode节点。
&&& 另外,我还用补充一个问题就是关于数据节点的数据传输服务地址的问题。我曾经遇到过这样一个有趣的问题:我只在一台pc上部署HDFS,一个NameNode节点,一个DataNode节点,其中我的pc的ip地址是*.*.*.*(教育网ip地址),然后我把NameNode的服务地址配置成localhost:8020,DataNode的数据服务地址配置成*.*.*.*:50010,结果客户端在向数据节点传输Block时始终都无法连接到DataNode的数据服务端口。盆友们知道这是为什么吗?这是因为数据节点在向NameNode节点注册时,NameNode执行了下面一段代码:
String dnAddress = Server.getRemoteAddress();&&& if (dnAddress == null) {&&&&& dnAddress = nodeReg.getHost();&&& }&&&& &&&& // check if the datanode is allowed to be connect to the namenode&&& if (!verifyNodeRegistration(nodeReg, dnAddress)) {&&&&& throw new DisallowedDatanodeException(nodeReg);&&& }&&& String hostName = nodeReg.getHost();&&&& &&&& // update the datanode's name with ip:port&&& DatanodeID dnReg = new DatanodeID(dnAddress + ":" + nodeReg.getPort(),& nodeReg.getStorageID(),& nodeReg.getInfoPort(), nodeReg.getIpcPort());&&& nodeReg.updateRegInfo(dnReg);&&&& 在上面的代码中,NameNode节点一开始就根据连接的数据节点的远程地址更新了注册信息nodeReg,即修改了数据节点的数据服务地址,问题也就出在这儿了。由于我设置的NameNode的地址是localhost:8020,所以当数据节点向NameNode节点连接时,并没有走路由器,那么NameNode收到的数据节点连接的远程地址就是127.0.0.1了,即Server.getRemoteAddress()会返回<span style="FONT-SIZE: 18 COLOR: #7.0.0.1,客户端收到的数据节点的数据服务地址就是127.0.0.1:50010,同时数据节点也干了一件轰动的事情,就是把数据服务地址绑定到了*.*.*.*:50010上,而不是只绑定到某一个端口上,着一系列的动作就导致了客户端找不到数据节点的数据服务地址。之后,我就索性直接把NameNode的地址配置成*.*.*.*:8020,然后半点事没有。在前面的几篇博文中,我分别花了很大的篇幅来介绍HDFS在处理用户写一个文件时客户端、数据节点干了些什么事情,那么在本文,我将重点介绍一下NameNode节点在这个过程中到底为用户刚了些神马?
&&&&&& 用户调用HDFS的API来写入一个文件,教科书式的标准三过程:1.create;2.write;3.close,对应于客户端的这三个工程,NameNode又相应的做了哪些响应呢?先来看一张图吧!
&&&& 从上面的图中,我们可以清楚的看出NameNode对应于用户的三个动作分别以create、addBlock、complete来进行相关的处理。现在,我就来详细的分析NameNode的这三个动作是如何实现的。
&&&& NameNode的create动作主要是为客户端传过来的文件名在HDFS的Namesystem中申请一个名字空间,并为之建立一个响应的iNode,当然,这个iNode的状态是underConstruction,然后为这个客户创建一个该文件的租约,就是文件的独占锁,以防止其它的客户端对这个文件同时写。它的流程如下:
&&&&& NameNode的addBlock动作主要是为文件创建一个新的Block,并为这个Block的副本分配存储DataNode节点,最后给客户端返回一个LocatedBlock对象,该对象包含Block的副本应该存放的位置。在这里我想说得是,NameNode节点此时并不保存该Block的副本位置,而是等到成功接收该Block的数据节点自动报告时它才正式记录该Block的一个副本的位置,这样做是由于HDFS不能保证Block一开始分配的数据节点都能成功结束Block。它的工作流程如下:
&&&&& NameNode的complete动作就是更改与当前文件节点相关的状态,同时释放文件的租约。另外,NameNode还要判断文件的所有Blocks的副本是否已满足,对于还不满足的Blocks,NameNode将其放入neededReplications队列中,让其它的后台线程来负责这些Block的副本情况。它的工作流程如下:
&&&&&& 关于NameNode处理客户端的写文件操作的处理流程细节都在上面的流程图中,但是,这个过程单中还有少量的与之没有直接关系的操作并没有在上面的流程图中反映出来,如果有兴趣的话,可以仔细研读相关的源代码。
[商业源码]&
[商业源码]&
[商业源码]&
[商业源码]&
[商业源码]&
[商业源码]&
[商业源码]&
[商业源码]&
[商业源码]&
[商业源码]&
Copyright &
All Rights Reserved

我要回帖

更多关于 不用手机号找回微信密码 的文章

 

随机推荐