首页 > BigData, HDFS > HDFS datanode阅读手记

HDFS datanode阅读手记

1. Hign Level Module View

datanode modules

  • HttpServer: 提供http service, 展示内部状态信息
  • IPCServer: Rpc service framework, 模块间的交互都通过rpc来完成(Block数据传输除外)
  • DataXceiverServer: 数据传输server, 提供Block数据的读写修改等功能
  • BlockPoolManager: 提供管理BlockPool的相关API。其中,BlockOfferService为每个Namespace下每个BlockPool一个实例,提供BlockPool对它所对应的Namespace的操作的相关API, BlockServiceActor为指定Namespace中每个namenode一个实例,自已持有线程,定时向它所对应的namenode发heartbeat, blockreport, 并执行namenode通过heartbeat/blockreport response传回来的command
  • FsDataset: 封装了datanode所管理的所有的block相关数据,提供相关的API,其中FsVolume封装了对单块盘的相关的操作
  • DataBlockScanner: 持有一个独立的线程,轮询调用每个block pool的BlockPoolSliceScanner,扫描每个block并检查block的数据是否正常
  • DirectoryScanner: 持有独立的调度线程,定时发起对block pool目录的scan, 从磁盘获取block相关的report, 对比meta与实际的磁盘上的block的差异,然后更新block meta,使之与磁盘上的实际情况一致

2. IPC Server

注: IPC Server是hadoop common里的部分,看datanode的时候顺便了解了一下,就顺便在这里也介绍一下。
ipc server
2.1 IPC Server的网络服务器的基本模型,没有什么特别的,就是一个普通的socket server

  • 一个Listener: 不断调用select,查看是否有socket可读事件,如果有,轮询取一个Reader开始处理
  • 若干个Reader: 读取指定连接上的请求(安全认证相关的check都在这里),构造Call(Call是IPCServer中调用上下文的数据结构),加入到CallQueue
  • 若干个Handler: 不断从CallQueue中拿Call进行处理,这里是实际rpc接口在Server端调用的地方,这里同步调完rpc接口,把结果进行拼装成client的需要的格式,并加入到对应的Call的responseQueue中(这里如果Call对应的responseQueue Size为1,会尝试发送一次
  • 一个Responder: 不断调用select, 关注socket可写事件,处理可写的socket对应的Call的responseQueue,尝试发送response给client, 另外还要清理长时间在responseQueue中没发出去的response

2.2 IPC Server协议相关部分
rpc service
如上图所示,通常,hadoop中的IPC Server/Client的基本结构是这样的:

  • 有一个基础的XXProtocol的接口类,定义了XXServer要提供的service的interface
  • XXServer通过实现XXProtocol定义的interface, 来提供相应的service
  • XXClient通过XXProtocol的proxy,来使用XXService提供的Rpc service

在hadoop中的实现中,目前支持两种RPC机制,一种是基于老式的Writable, 一种是基于这几年比较流行的ProtocolBuffer. 目前hadoop内部的项目都是用的基于ProtocolBuffer的RPC机制,因此,上图中也多了几个与PB相关的类:

  • XXProtocolPB: 是PB协议的BlockingInterface跟应用之间的中间层,可以认为它就是PB生成的interface定义
  • XXProtocolTranslatorPB: 它负责把Client本地的rpc调用,转换成PB的方式,并发送到Server端: response service(request) => responsePB service(requestPB)
  • XXProtocolServerSideTranslatorPB: 它负责把Server端收到PB形式的rpc调用,转换成Server实现的本地形式: responsePB service(requestPB) => response service(request)

2.3 IPC Server对各种RpcEngine的支持
hadoop RPC 可以通过扩展RpcKind, 实现对应的RpcEngine,来扩展新的Rpc实现,比如基于Thrift的实现等。具体要选用哪个RpcEngine, 跟协议的RpcKind是紧密关联的。

3. DataXceiverServer

data xceiver
3.1 DataXceiverServer
DataXceiverServer是一个简单的tcp socket server, 监听特定的端口,持有一个单独的线程,不断调用accept,如果有请求过来,就创建一个DataXceiver线程并启动,进行处理
3.2 DataXceiver
每个DataXceiver都是一个单独的线程(缺省配置了4096个线程上限), DataXceiver处理指定socket上所有的任务:READ_BLOCK, WRITE_BLOCK, REPLACE_BLCOK, COPY_BLOCK, BLOCK_CHECKSUM, TRANSFER_BLOCK. DataXceiver的处理的主要逻辑有两个步骤:

  • readOp(): 通过收取一定量的数据,解包,解析请求类型
  • processOp(): 根据请求类型,发起相应的处理逻辑

4. BlockPoolManager

block pool manager
如上图所示为BlockPoolManager模块的类结构图,BlockPoolManager是应用逻辑提供BlockPool管理功能的类。这个模块里主要包括三个类:

  • BlockPoolManager: 对外提供管理BlockPool的API,其它模块对BlockPool的操作都通过它来完成,每个datanode中都有一个它的实例
  • BPOfferService: 封装了对单个Namespace下单个BlockPool相关的功能,由外部分线程驱动,每个Namespace下每个BlockPool都包含一个它的实例
  • BPServiceActor: 封装了单个BlockPool针对单台namenode的相关逻辑,包括connect()->handshake()->register()->heartbeat()->reportblock()一系列操作,自带线程驱动,每个BPServiceActor是一个单独的线程

5. DataBlockScanner

data block scanner
DataBlockScanner的主要功能是扫描并校验磁盘上的data block, 把发现的坏块报告给namenode. 它的主要处理数据结构及处理流程如上图所示
5.1 数据结构

  • BlockPoolSliceScanner是专门负责扫描一个BlockPool的scanner
  • DataBlockScanner是一个独立的线程,它持有BlockPool个数个BlockPoolSliceScanner(每个BlockPool对应一个BlockPoolSliceScanner)

5.2 处理流程

  • DataBlockScanner线程启动
  • DataBlockScanner依次轮询每个BlockPoolSliceScanner, 调用它的scanBlockPoolSlice()启动对指定BlockPool的scan
  • 指定的BlockPoolSliceScanner启动scan后,开始进行校验当前BlockPool的每个block, 最后把校验失败的block报告给namenode

6. DirectoryScanner

directory scanner
如上图所示为DirectoryScanner的类图,图中只列出了DirectoryScanner的主要的数据成员和成员函数。DirectoryScanner的主要任务是定期扫描磁盘上的block, 检查磁盘上的block信息是否与meta文件中的一致,把不一致之处进行调整,使之一致。
6.1 主要数据成员

  • reportCompileThreadPool: 异步收集磁盘上block信息的线程池
  • masterThread: 调度线程,定期调用DirectoryScanner的run(),启动整个scan过程
  • diffs: 描述磁盘block信息与metadata之间差异的数据结构,在扫描的过程中更新,扫描结束后把diffs更新到数据上
    6.2 主要处理流程
  • masterThread定时调用run()
  • run()调用reconcile(),发起整个整理的流程
  • reconcile()调用scan(),进行磁盘扫描
  • scan()启动reportCompileThreadPool, 收集磁盘上了block信息,并计算与meta的差异
  • reconcile()拿到scan()完成后的diffs, 调用FsDataset的checkAndUpdate(),作用到dataset上

7. FsDataset

fsdataset favolume
FsData是datanode上所有数据的抽象,FsVolume是对单块盘上的数据的抽象,上图是这些数据结构的类关系图。FsDatasetSpi和FsVolumeSpi是对外提供API的interface类,FsDatasetImpl和FsVolumeImpl是各自具体的实现。

分类: BigData, HDFS 标签: , ,
  1. 本文目前尚无任何评论.
  1. 本文目前尚无任何 trackbacks 和 pingbacks.
您必须在 登录 后才能发布评论.