Apache Hadoop 是一款支持数据密集型分布式应用程序并以 Apache 2.0 许可协议发布的开源软件框架。它支持在商品硬件构建的大型集群上运行的应用程序。Hadoop 是根据谷歌公司发表的 MapReduce 和 Google 文件系统的论文自行实现而成。所有的 Hadoop 模块都有一个基本假设,即硬件故障是常见情况,应该由框架自动处理。
子项目
- Hadoop Common:在 0.20 及以前的版本中,包含 HDFS、MapReduce 和其他项目公共内容,从 0.21 开始 HDFS 和 MapReduce 被分离为独立的子项目,其余内容为 Hadoop Common
- HDFS:Hadoop Distributed File System
- MapReduce:并行计算框架,0.20 前使用
org.apache.hadoop.mapred
旧接口,0.20 版本开始引入org.apache.hadoop.mapreduce
的新 API
HDFS 结构
- HDFS 是一个高度容错性的系统,适合部署在廉价的机器上。
- 硬件错误是常态而不是异常,错误检测和快速、自动的恢复是HDFS最核心的架构目标。
- 运行在 HDFS 上的应用需要 流式访问 数据集,HDFS 的设计更多考虑了数据批处理,而不是用户交互处理。比之数据访问的低延迟问题,更关键的在于 数据访问的高吞吐量。
- 移动计算比移动数据更划算:一个应用请求的计算,离它操作的数据越近就越高效,HDFS 为应用提供了将它们自己移动到数据附近的接口。
- HDFS 采用 master/slave 结构,一个 HDFS 集群由一个 Namenode 和一定数目的 Datanodes 组成。Namenode 是一个 中心服务器,负责管理文件系统的命名空间以及客户端对文件的访问;集群中的 Datanode 一般是一个节点一个,负责管理它所在节点上的存储。HDFS 暴露了文件系统的命名空间,用户能够以文件的形式在上面存储数据。从内部看,一个文件其实被分成一个或多个数据块,这些块存储在一组 Datanode 上。Namenode 执行文件系统的命名空间操作,如打开、关闭、重命名文件或目录,它也负责确定数据块到具体 Datanode 节点的映射。Datanode 负责处理文件系统客户端的读写请求,在 Namenode 的统一调度下进行数据块的创建、删除和复制。Namenode 是所有 HDFS 元数据的仲裁者和管理者,用户数据永远不会流过 Namenode。
数据复制和副本
- HDFS 将每个文件存储成一系列的数据块,除了最后一个,所有的数据块都是同样大小的。一个典型的数据块大小是 64MB,HDFS 中的文件总是按照 64MB 被切分成不同的块,每个块尽可能地存储于不同的 Datanode 中。为了容错,文件的所有数据块都会有副本,每个文件的数据块大小和副本系数都是可配置的,应用程序可以指定某个文件的副本数目。副本系数可以在文件创建的时候指定,也可以在之后改变。HDFS中的文件都是一次性写入的,并且严格要求在任何时候只能有一个写入者。
- Namenode 全权管理数据块的复制,它周期性地从集群中的每个 Datanode 接收心跳信号和块状态报告。接收到心跳信号意味着该 Datanode 节点工作正常,块状态报告包含了一个该 Datanode 上所有数据块的列表。
- 大型 HDFS 实例一般运行在跨越多个机架的计算机组成的集群上,不同机架上的两台机器之间的通讯需要经过交换机。在大多数情况下,同一个机架内的两台机器间的带宽会比不同机架的两台机器间的带宽大。通过 机架感知(使用 API resolve 将 slave 的 DNS 名称(或IP地址)转换成机架id)Namenode可以确定每个 Datanode 所属的机架 id。一种简单但没有优化的策略是将副本存放在不同的机架上,这样可以有效防止当整个机架失效时数据的丢失,且允许读数据的时候充分利用多个机架的带宽。但这种策略的一个写操作需要传输数据块到多个机架,增加了写的代价。
大多数情况下,副本系数是3,HDFS 的存放策略是将 一个副本存放在本地机架的节点上,一个副本放在同一机架的另一个节点上,最后一个副本放在不同机架的节点上。 - HDFS 会尽量让读取程序读取离它最近的副本。如果在读取程序的同一个机架上有一个副本,那么就读取该副本。如果一个 HDFS 集群跨越多个数据中心,那么客户端也将首先读本地数据中心的副本。
- Namenode 启动后会进入一个称为 安全模式 的特殊状态。处于安全模式的 Namenode 不会进行数据块的复制,并从所有的 Datanode 接收心跳信号和块状态报告。块状态报告包括了某个 Datanode 所有的数据块列表,每个数据块都有一个指定的最小副本数。当 Namenode 确认某个数据块的副本数目达到这个最小值,那么该数据块就会被认为是副本安全的;在一定百分比(这个参数可配置)的数据块被 Namenode 检测确认是安全之后(加上一个额外的30秒等待时间), Namenode 将退出安全模式状态。接下来它会确定还有哪些数据块的副本没有达到指定数目,并将这些数据块复制到其他 Datanode 上。
元数据持久化
- Namenode 上保存着 HDFS 的命名空间,对于任何对文件系统元数据产生修改的操作,Namenode 都会使用
EditLog
事务日志记录下来。Namenode 在本地操作系统的文件系统中存储这个Editlog
。整个文件系统的命名空间,包括数据块到文件的映射、文件的属性等,都存储在一个称为FsImage
的文件中,这个文件也放在 Namenode 所在的本地文件系统上。 - Namenode 在内存中保存着整个文件系统的命名空间和文件数据块映射的映像。这个关键的元数据结构设计得很紧凑,因此 4G 内存的 Namenode 足够支撑大量的文件和目录。当 Namenode 启动时,它从硬盘中读取
Editlog
和FsImage
,将所有Editlog
中的事务作用在内存中的FsImage
上,并将这个新版本的FsImage
从内存中保存到本地磁盘上,然后删除旧的Editlog
,这个过程称为一个检查点。 - Datanode 将 HDFS 数据以文件的形式存储在本地的文件系统中,它并不知道有关 HDFS 文件的信息。它把每个HDFS数据块存储在本地文件系统的一个单独的文件中。Datanode 并不在同一个目录创建所有的文件,而是用试探的方法来确定每个目录的最佳文件数目,并且在适当的时候创建子目录。在同一个目录中创建所有的本地文件并不是最优的选择,这是因为本地文件系统可能无法高效地在单个目录中支持大量的文件。当 Datanode 启动时,它会扫描本地文件系统,产生一个这些本地文件对应的所有 HDFS 数据块的列表,然后作为报告发送到 Namenode ,这个报告就是块状态报告。
集群通讯和健壮性
- 客户端通过一个可配置的 TCP 端口连接到 Namenode,通过 ClientProtocol 协议与 Namenode 交互。而 Datanode 使用 DatanodeProtocol 协议与 Namenode 交互。一个远程过程调用模型被抽象出来封装 ClientProtocol 和 Datanodeprotocol 协议。Namenode 不会主动发起 RPC,而是响应来自客户端或 Datanode 的 RPC 请求。
- 三种出错情况是:Namenode 出错、Datanode 出错、网络割裂。Datanode 出错和网络割裂可能导致一部分 Datanode 跟 Namenode 失去联系。Namenode 通过心跳信号的缺失来将近期不再发送心跳信号 Datanode 标记为宕机,不会再将新的 IO 请求发给它们,且任何存储在宕机 Datanode 上的数据将不再有效。这可能会引起一些数据块的副本系数低于指定值,Namenode 不断地检测这些需要复制的数据块,一旦发现就启动复制操作。
- HDFS 支持数据均衡策略。如果某个 Datanode 节点上剩余空闲空间低于临界点,系统会自动地将数据从这个 Datanode 移动到其他空闲 Datanode。当对某个文件的请求突然增加,也可能启动一个计划创建该文件新的副本,并且同时重新平衡集群中的其他数据。
- 当 HDFS 客户端创建一个新的 HDFS 文件时,会计算这个文件每个数据块的校验和,并将校验和作为一个单独的隐藏文件保存在同一个 HDFS 名字空间下。当客户端获取文件内容后,它会检验从 Datanode 获取的数据跟相应的校验和文件中的校验和是否匹配,如果不匹配,客户端可以选择从其他 Datanode 获取该数据块的副本。
FsImage
和Editlog
是 HDFS 的核心数据结构。如果这些文件损坏了,整个 HDFS 实例都将失效,因此 Namenode 可以配置成支持维护多个FsImage
和Editlog
的副本。Namenode 是 HDFS 集群中的 单点故障 所在。如果 Namenode 机器故障,需要手工干预。也可以通过指定配置,在 Namenode 宕机时切换到 Secondary Namenode。
数据组织
- 客户端缓存:客户端创建文件的请求并没有立即发送给 Namenode。HDFS 客户端会先将文件数据缓存到本地的一个临时文件,应用程序的写操作被透明地重定向到这个临时文件。当这个临时文件累积的数据量超过一个数据块的大小,客户端才会联系 Namenode。Namenode 将文件名插入文件系统的层次结构中,并且分配一个数据块给它,然后返回 Datanode 的标识符和目标数据块给客户端。接着客户端将这块数据从本地临时文件上传到指定的 Datanode 上。当文件关闭时,在临时文件中剩余的没有上传的数据也会传输到指定的 Datanode 上,然后客户端告诉 Namenode 文件已关闭,此时 Namenode 才将文件创建操作提交到日志里进行存储。如果 Namenode 在文件关闭前宕机,该文件将丢失。
- 流水线复制:假设文件的副本系数设置为3,且客户端开始向第一个 Datanode 传输数据,第一个 Datanode 逐小块(4 KB)地接收数据,将每一部分写入本地仓库,并同时传输该部分到列表中第二个 Datanode 节点。第二个 Datanode 也以此类推同时传给第三个 Datanode。即,Datanode 流水线式地从前一个节点接收数据,并在同时转发给下一个节点。
HDFS 使用
存储空间回收
- 当用户或应用程序删除某个文件时,HDFS 会将这个文件重命名转移到
.Trash
目录。文件在.Trash
中保存的时间是可配置的(设置属性fs.trash.interval
),超时后 Namenode 会将该文件从命名空间中删除,删除文件会使得该文件相关的数据块被释放,因此从用户删除文件到 HDFS 空闲空间的增加之间会有一定时间的延迟。 - 当一个文件的副本系数被减小后,Namenode 会选择过剩的副本删除,并在下次心跳检测时将该信息传递给 Datanode。
Secondary NameNode
- 因为 NameNode 只有在启动阶段才合并
fsImage
和EditLog
,所以日志文件可能会变得非常庞大,且下一次 NameNode 启动会花很长时间。 - Secondary NameNode 定期合并
fsImage
和EditLog
,将日志文件大小控制在一个限度下。因为内存需求和 NameNode 在一个数量级上,所以通常 Secondary NameNode 和 NameNode 运行在不同的机器上。Secondary NameNode 通过bin/start-dfs.sh
在`conf/masters 中指定的节点上启动。 - Secondary NameNode 的检查点进程启动,由两个配置参数控制:
fs.checkpoint.period
:指定连续两次检查点的最大时间间隔,默认为 1 小时;fs.checkpoint.size
:定义日志文件的最大值,一旦超过这个值会强制执行检查点,默认值是64MB。
- Secondary NameNode 保存最新检查点的目录与 NameNode 的目录结构相同,NameNode 可以在需要的时候读取 Secondary NameNode 上的检查点镜像。如果 NameNode 上除了最新的检查点以外,所有的其他的历史镜像和日志文件都丢失了,则可以引入这个最新的检查点:
- 在配置参数
dfs.name.dir
指定的位置创建空目录; - 把检查点目录的位置赋值给配置参数
fs.checkpoint.dir
; - 启动 NameNode,并加上
-importCheckpoint
。
- 在配置参数
- 按上述步骤,NameNode 会从
fs.checkpoint.dir
目录读取检查点,并把它保存在dfs.name.dir
目录下。如果dfs.name.dir
目录下有合法的镜像文件,NameNode 会启动失败。NameNode 会检查fs.checkpoint.dir
目录下镜像文件的一致性,但是不会改动它。
Map/Reduce
- 一个 Map/Reduce 作业通常会把输入的数据集切分为若干独立的数据块,由 map 任务以完全并行的方式处理它们。框架会对 map 的输出先进行排序,然后把结果输入给 reduce 任务。通常作业的输入和输出都会被存储在文件系统中。整个框架负责任务的调度和监控,以及重新执行已经失败的任务。
- 通常,Map/Reduce 框架和分布式文件系统是运行在一组相同的节点上的,即计算节点和存储节点通常在一起。Map/Reduce 框架由一个单独的 master JobTracker 和每个集群节点一个 slave TaskTracker 组成。master 负责调度构成一个作业的所有任务,这些任务分布在不同的 slave 上,master 监控它们的执行,重新执行已经失败的任务,slave 仅负责执行由 master 指派的任务。
- 应用程序通过提供 map 和 reduce 来实现 Mapper 和 Reducer 接口,它们组成作业的核心。Mapper 将输入键值对映射到一组中间格式的键值对集合,这种转换的中间格式记录集不需要与输入记录集的类型一致,一个给定的输入键值对可以映射成 0 个或多个输出键值对;Reducer 将与一个 key 关联的一组中间数值集归约为一个更小的数值集。Map 的数目通常是由输入数据的大小决定的,一般就是所有输入文件的总块数。
- Reducer有3个主要阶段:shuffle、sort 和 reduce。
- Shuffle:Reducer 的输入就是 Mapper 已经排好序的输出。在这个阶段,框架通过 HTTP 为每个 Reducer 获得所有 Mapper 输出中与之相关的分块。
- Sort:框架按照 key 的值对 Reducer 的输入进行分组(不同 mapper 的输出中可能会有相同的key)。Shuffle 和 Sort 两个阶段是同时进行的,map 的输出也是一边被取回一边被合并的。
- Reduce:框架为已分组的输入数据中的每个
<key, (list of values)>
对调用一次reduce(WritableComparable, Iterator, OutputCollector, Reporter)
方法。Reduce任务的输出通常是通过调用OutputCollector.collect(WritableComparable, Writable)
写入文件系统的。Reducer的输出是没有排序的。Reduce的数目建议是 0.95 或 1.75 乘以(<no. of nodes> * mapred.tasktracker.reduce.tasks.maximum)
。增加 Reduce 的数目会增加整个框架的开销,但可以改善负载均衡,降低由于执行失败带来的负面影响。
参考资料:Hadoop 0.18 中文文档
原创作品,允许转载,转载时无需告知,但请务必以超链接形式标明文章原始出处(http://blog.forec.cn/2017/08/22/hadoop_knowledge/) 、作者信息(Forec)和本声明。