摘要 hadoop的两大核心是hdfs和mapreduce,hdfs是适合部署在廉价机器上的高度容错性系统,提供了强大的流式读写文件的能力,mapreduce则提供了大规模数据集的并行运算。

Hadoop中HDFS和MapReduce的进程

本地单机模式安装好hadoop服务后,可以使用start-all.sh启动hadfs和mapredue服务,也可以使用start-hdfs.shstart-yarn.sh分别启动。正常启动后,通过jps查看有5个进程:

  • NameNode 管理文件系统的命名空间。它维护着文件系统树及整棵树内所有的文件和目录。HDFS集群有两类节点,并以管理者-工作者模式运行,即一个NameNode(管理者)和多个DataNode(工作者)[就独立模式而言,仅有一个NameNode(管理者)和一个DataNode(工作者)]。一个HDFS cluster包含一个NameNode和若干的DataNode,NameNode是master,主要负责管理hdfs文件系统,具体地包括namespace管理(其实就是目录结构),block管理(其中包括 filename->block,block->ddatanode list的对应关系)。NameNode 依赖来自每个 DataNode 的定期心跳(heartbeat)消息。每条消息都包含一个块报告,NameNode 可以根据这个报告验证块映射和其他文件系统元数据。如果 DataNode 不能发送心跳消息,NameNode 将采取修复措施,重新复制在该节点上丢失的块。
  • SecondaryNameNode NameNode节点的副本。NameNode进行数据snapshots进行备份,这样尽量降低NameNode崩溃之后,导致数据的丢失,其实所作的工作就是从nn获得fsimage和edits把二者重新合并然后发给NameNode,这样,既能减轻NameNode的负担又能保险地备份。

  • DataNode Datanode是文件系统的工作节点,他们根据客户端或者是namenode的调度存储和检索数据,并且定期向namenode发送他们所存储的块(block)的列表。

  • NodeManager NodeManager(NM)是YARN中每个节点上的代理,它管理Hadoop集群中单个计算节点,包括与ResourceManger保持通信,监督Container的生命周期管理,监控每个Container的资源使用(内存、CPU等)情况,追踪节点健康状况,管理日志和不同应用程序用到的附属服务(auxiliary service)。YARN/MRv2 Node Manager深入剖析—整体架构

  • ResourceManager ResourceManager负责集群中所有资源的统一管理和分配,它接收来自各个节点(NodeManager)的资源汇报信息,并把这些信息按照一定的策略分配给各个应用程序(实际上是ApplicationManager)。YARN/MRv2 Resource Manager深入剖析—RM总体架构

注意:没有JobTracker和TaskTracker进程。

小试牛刀

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

package com.diudiu.mapreduce;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.stream.Stream;

/**
* Description hadoop.
* <p/>
* Time 2017/4/17 下午11:34
*
* @author zhoujunwen
* @version 1.0.0.0
* @see com.diudiu.mapreduce
*/
public class WcMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] words = StringUtils.split(value.toString(), ' ');
Stream.of(words).forEach((String x) ->{
try {
context.write(new Text(x), new IntWritable(1));
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

package com.diudiu.mapreduce;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
* Description hadoop.
* <p/>
* Time 2017/4/17 下午11:39
*
* @author zhoujunwen
* @version 1.0.0.0
* @see com.diudiu.mapreduce
*/
public class WcReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable i : values) {
sum = sum + i.get();
}
context.write(key, new IntWritable(sum));
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package com.diudiu.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;


/**
* Description hadoop.
* <p/>
* Time 2017/4/17 下午11:45
*
* @author zhoujunwen
* @version 1.0.0.0
* @see com.diudiu.mapreduce
*/
public class RunJob {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration configuration = new Configuration();
configuration.set("fs.defaultFS", "hdfs://master:30010");
configuration.set("yarn.resourcemanager.hostname", "master");

FileSystem fs = FileSystem.get(configuration);

Job job = Job.getInstance(configuration);
job.setJarByClass(RunJob.class);

job.setJobName("Word Count");
job.setMapperClass(WcMapper.class);
job.setReducerClass(WcReducer.class);

job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);

FileInputFormat.addInputPath(job, new Path("/input/word"));

Path outpath = new Path("/output/wc");
if (fs.exists(outpath)) {
fs.delete(outpath, true);
}
FileOutputFormat.setOutputPath(job, outpath);

boolean f = job.waitForCompletion(true);
if (f) {
System.out.println("job任务执行成功");
}
}
}