本篇文章为大家展示了HDFS中reportWrittenBlock函数的作用是什么,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。
公司主营业务:网站设计制作、成都网站制作、移动网站开发等业务。帮助企业客户真正实现互联网宣传,提高企业的竞争能力。创新互联公司是一支青春激扬、勤奋敬业、活力青春激扬、勤奋敬业、活力澎湃、和谐高效的团队。公司秉承以“开放、自由、严谨、自律”为核心的企业文化,感谢他们对我们的高要求,感谢他们从不同领域给我们带来的挑战,让我们激情的团队有机会用头脑与智慧不断的给客户带来惊喜。创新互联公司推出青浦免费做网站回馈大家。
/**
* The client can report in a set written blocks that it wrote.
* These blocks are reported via the client instead of the datanode
* to prevent weird heartbeat race conditions.
*/
public void reportWrittenBlock(LocatedBlock lb) throws IOException {
Block b = lb.getBlock();//获取完成的这个Block信息
DatanodeInfo targets[] = lb.getLocations();//获取节点信息
for (int i = 0; i < targets.length; i++) {
namesystem.blockReceived(b, targets[i].getName());//对于每个DataNode来说,都要调用一次此函数
}
}
C1:2014-12-19 18:26:00 C2:2014-12-19 18:59:00 C3:2014-12-19 19:03:00
=========================
那么,接下来就是理解 namesystem.blockReceived(b, targets[i].getName());了。
/**
* The given node is reporting that it received a certain block.
*/
public synchronized void blockReceived(Block block, UTF8 name) {
DatanodeInfo node = (DatanodeInfo) datanodeMap.get(name);//获取对应的datanode
if (node == null) {//为空可不行
throw new IllegalArgumentException("Unexpected exception. Got blockReceived message from node " + name + ", but there is no info for " + name);
}
//
// Modify the blocks->datanode map
//
addStoredBlock(block, node);//下面两行是来执行block和node的一个映射。
//
// Supplement node's blockreport
//
node.addBlock(block);//同上
}
C1:2014-12-19 19:11:00 C2:2014-12-19 19:11:00 C3:2014-12-19 19:12:00
===============那么接下来还有2个函数需要攻破,分别是addStoredBlock和node.addBlock(block);
后面一个函数非常简单,不细讲,所以就剩下最后一个函数了!
addStoredBlock(block, node);的执行过程如下:
synchronized void addStoredBlock(Block block, DatanodeInfo node) {
TreeSet containingNodes = (TreeSet) blocksMap.get(block);//获取当前block已经存在的datanode信息
if (containingNodes == null) {//这里保证肯定存在datanode集合,不保证一定有节点在内
containingNodes = new TreeSet();
blocksMap.put(block, containingNodes);
}
if (! containingNodes.contains(node)) {//根据需要决定是否加入此datanode信息
containingNodes.add(node);
} else {
LOG.info("Redundant addStoredBlock request received for block " + block + " on node " + node);
}
//接下来的逻辑是确定是否需要重新备份
synchronized (neededReplications) {//锁定neededReplications
if (dir.isValidBlock(block)) {//不懂这一句
if (containingNodes.size() >= this.desiredReplication) {//如果已经超过最大备份个数
neededReplications.remove(block);//删除此block
pendingReplications.remove(block);//删除此block
} else if (containingNodes.size() < this.desiredReplication) {
if (! neededReplications.contains(block)) {
neededReplications.add(block);//否则表示需要重新备份,这代码写的真够差的。。。
}
}
//
// Find how many of the containing nodes are "extra", if any.
// If there are any extras, call chooseExcessReplicates() to
// mark them in the excessReplicateMap.
//
//也有可能一个block存储的datanode节点数太多了,同样要删除这些block
Vector nonExcess = new Vector();//构造一个空的Vector
for (Iterator it = containingNodes.iterator(); it.hasNext(); ) {
DatanodeInfo cur = (DatanodeInfo) it.next();//对于当前节点来说
TreeSet excessBlocks = (TreeSet) excessReplicateMap.get(cur.getName());//取到当前节点的多余块信息
if (excessBlocks == null || ! excessBlocks.contains(block)) {//如果之前没有标志在这个节点的多余块信息里
nonExcess.add(cur);//则表明当前节点存储了这个block
}
}
if (nonExcess.size() > this.maxReplication) {//如果超过了最大备份数
chooseExcessReplicates(nonExcess, block, this.maxReplication);//选择若干来消除块
}
}
}
}
void chooseExcessReplicates(Vector nonExcess, Block b, int maxReps) {
while (nonExcess.size() - maxReps > 0) {//如果还有需要
int chosenNode = r.nextInt(nonExcess.size());//随机选择一个节点
DatanodeInfo cur = (DatanodeInfo) nonExcess.elementAt(chosenNode);
nonExcess.removeElementAt(chosenNode);//获取这个节点
TreeSet excessBlocks = (TreeSet) excessReplicateMap.get(cur.getName());
if (excessBlocks == null) {
excessBlocks = new TreeSet();
excessReplicateMap.put(cur.getName(), excessBlocks);
}
excessBlocks.add(b);//加入此block到excessReplicateMap
//
// The 'excessblocks' tracks blocks until we get confirmation
// that the datanode has deleted them; the only way we remove them
// is when we get a "removeBlock" message.
//
// The 'invalidate' list is used to inform the datanode the block
// should be deleted. Items are removed from the invalidate list
// upon giving instructions to the namenode.
//
Vector invalidateSet = (Vector) recentInvalidateSets.get(cur.getName());
if (invalidateSet == null) {
invalidateSet = new Vector();
recentInvalidateSets.put(cur.getName(), invalidateSet);
}
invalidateSet.add(b);//同样的,更新recentInvalidateSets,没啥好解释的
}
}
上述内容就是HDFS中reportWrittenBlock函数的作用是什么,你们学到知识或技能了吗?如果还想学到更多技能或者丰富自己的知识储备,欢迎关注创新互联行业资讯频道。
名称栏目:HDFS中reportWrittenBlock函数的作用是什么
网页网址:http://scpingwu.com/article/pejieh.html