Contents
  1. 1. 概要模式
    1. 1.1. 数值概要
    2. 1.2. 倒排索引概要
    3. 1.3. 计数器计数
  2. 2. 过滤模式
    1. 2.1. 过滤
    2. 2.2. 布隆过滤
    3. 2.3. Top 10
    4. 2.4. 去重
  3. 3. 数据组织模式
    1. 3.1. 分层结构
    2. 3.2. 分区
    3. 3.3. 分箱
    4. 3.4. 全排序
    5. 3.5. 混排

概要模式

数值概要

数值概要模式是计算数值聚合统计值的一般性模式。这种模式一般是按照键来对进行分组,并对每个分组计算一些统计值。

常见应用:

  • 单词计数
  • 记录计数
    例如,对特定时间间隔(每周或者每天或者每小时等)计算数据每个时间周期内的数据数量。
  • 最大值或最小值计数
  • 平均值、中位数、标准差

倒排索引概要

倒排索引模式一般是建立一个项到标识符列表的映射。顾名思义就是建立一个索引列表。例如,搜索引擎一般需要建立的关键字到网址的列表,从而用户可以通过关键字找到与关键字相关的网页。

计数器计数

这个模式使用 MapReduce框架自身的计数器在不产生任何输出的情况下,在map阶段计算出一个全局计数。因此,我们可以用计数器来重新实现一下“单词计数”的算法,而且用计数器来实现计数效率要高很多。需要注意的是,由于计数器是存在全局内存中的,map工作完成后所有的TaskTracker将统计结果汇总给JobTracker(一般就是namenode),所以计数器的数量过多会造成namenode内存瓶颈或者溢出,因而,计数器的数量最多不能超过100个,一般应该控制在50个以下为好。

过滤模式

过滤

顾名思义就是对输入的所有行进行过滤,输出留下满足条件的行,忽略不满足条件的行。这种模式只有map没有reduce,在map中对每一行进行是否满足条件的判断,根据判断结果决定是输出(留下)该行数据还是忽略(扔掉)该行数据。

布隆过滤

布隆过滤与上面的过滤很相似,不同点在于布隆过滤中预先设定一个列表,列表中有一些值,只要输入的当前行包含其中的任意一个值就认为该行满足条件可以输出。布隆过滤仍然只有map没有reduce。

Top 10

Top10顾名思义就是从所有的输入行中过滤出某字段的前10名输入行。这种模式map阶段维护一个长度为10的列表,列表的长度小于10的时候向列表中直接添加,直到列表的长度等于10时,则删除列表中最小的元素并添加新元素。需要注意的是这种模式可以有多个map但只能有一个reduce,reduce从N个map输出的10N个元素中找出前10作为最终的输出。可以看出,由于只能有一个reduce所以如果取Top100甚至更多时,效率是很低的。因此这种模式只适合较小的取值。

去重

去重既将所有输入中某个字段重复的输入过滤掉,例如,在所有用户评论中找出所有不同的用户名。这种模式中map的输出键是map的输入值,map的输出值是null。reduece只需输出reduece的输入key即可。

数据组织模式

分层结构

该模式可将基于行的数据转换为分层的格式。
例如,给定一个帖子列表文件和评论列表文件,创建一个结构化的XML层次结构,嵌套的表示帖子及其相关评论。输出的形式如下:

Posts
 Post
  第一个帖子
  Comment
   第一个帖子的第一条评论
  /Comment
  Comment
   第一个帖子的第二条评论
  /Comment
 /Post
 Post
  第二个帖子
  Comment
   第二个帖子的第一条评论
  /Comment
  Comment
   第二个帖子的第二条评论
  Comment
 /Post
Posts

输入为两个文件,一个文件中每一行是一个帖子内容,其中有一个字段保存帖子id;另一个文件中每一行是一个评论,其中有一个字段保存与该评论对应的帖子id。

具体实现思路为,map以两个文件为输入,实现两个mapper类分别用于处理帖子文件和评论文件。PostMapper用于处理帖子文件,输出的key为帖子id,输出value为字母“P”+输入的value(每一行的内容)。CommentMapper用于处理评论文件,输出key为该评论对应的帖子id,输出value为字母C+输入的value。其中,字母P和C是为了在reduce中key值相同的一组值中哪些是帖子哪些是评论。由于用到两个mapper,所以main函数中需要用MultipleInput.addInputPath()为每一个mapper指定相应的文件。reduce可以很容易的区分出帖子和与该帖子对应的评论,将他们利用函数构建成xml格式再输出即可。

分区

分区模式是将记录进行分类,但并不关心记录的顺序。需要注意的是必须要预先知道要分多少个分区。用一个实例来描述一下分区的用法。

  • 问题一:给定一组用户信息,按照最近访问日期中的年份信息对记录进行分区,一年对应一个分区。

main函数需要配置成使用自定义分区器,分区器也需要配置,另外需要配置reduce的数目,每一个分区对应一个reduce。

1
2
3
4
5
6
7
//配置自定义分区器
job.setPartitionerClass(LastAccessDatePartitioner.class);
//为分区设置最小的年份,例如文件中的用户是从2008到2011,那么就是分成4个分区,每个分区的标号就是当前年份减去最小年份
//2008对应的是标号为0的分区,2009年对应的是标号为1的分区
LastAccessDatePartitioner.setMinLastAccessDate(job,2008);
//4个分区应该设置4个reduce
job.setNumReduceTasks(4);

map函数的输出key是年份,输出value就是输入value。然后需要实现LastAccessDatePartitioner类,完成分区器的定义,reduce直接输出即可。

由于第一个例子不是特别清楚,我又在网上找个一个例子,现在对分区的理解应该是正确的了。直接上例子。

  • 问题二:按年龄段找出不同年龄段的男性和女性的最高收入。

问题的输入如下

  • Id Name Age Gender Salary
    1201 gopal 45 Male 50,000
    1202 manisha 40 Female 50,000
    1203 khalil 34 Male 30,000
    1204 prasanth 30 Male 30,000
    1205 kiran 20 Male 40,000
    1206 laxmi 25 Female 35,000
    1207 bhavya 20 Female 15,000
    1208 reshma 19 Female 15,000
    1209 kranthi 22 Male 22,000
    1210 Satish 24 Male 25,000
    1211 Krishna 25 Male 25,000
    1212 Arshad 28 Male 20,000
    1213 lavanya 18 Female 8,000

其中第一行的内容是对数据每一列含义的解释,在真实的输入文件中是没有这一行的。

问题期望的输出如下:

  • 输出文件一 Part-00000
    Female 15000
    Male 40000

  • 输出文件二 Part-00001
    Female 35000
    Male 31000

  • 输出文件三 Part-00002
    Female 51000
    Male 50000

输出的每一个文件对应一个年龄段。

下面我来阐述实现的思路。首先需要明确几点:

  • 分区器是在map结束以后,reduce开始之前工作的。
  • 分区器分几个分区,就应该有几个reduce,所以设置reduce数量的时候要注意。
  • 每个reduce产生一个输出文件,也就是分在一个区里的数据对应的输出在同一个输出文件中。

map阶段输出key为性别字段,输出value为输入value;接着在partition阶段将年龄小于20、年龄大于20小于30以及年龄大于30的人分别分给三个reduce;因此,第一个reduce下面的应该是年龄小于20的两组数据,一组的key是Female,一组的key是Male,以此类推其他的两个reduce下面的数据就清楚了。reduce的工作很简单就是找出每组的最大值并输出即可。附上源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
import java.io.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.*;
public class PartitionerExample extends Configured implements Tool
{
//Map class
public static class MapClass extends Mapper<LongWritable,Text,Text,Text>
{
public void map(LongWritable key, Text value, Context context)
{
try{
String[] str = value.toString().split("\t", -3);
String gender=str[3];
context.write(new Text(gender), new Text(value));
}
catch(Exception e)
{
System.out.println(e.getMessage());
}
}
}
//Reducer class
public static class ReduceClass extends Reducer<Text,Text,Text,IntWritable>
{
public int max = -1;
public void reduce(Text key, Iterable <Text> values, Context context) throws IOException, InterruptedException
{
max = -1;
for (Text val : values)
{
String [] str = val.toString().split("\t", -3);
if(Integer.parseInt(str[4])>max)
max=Integer.parseInt(str[4]);
}
context.write(new Text(key), new IntWritable(max));
}
}
//Partitioner class
public static class CaderPartitioner extends
Partitioner < Text, Text >
{
@Override
public int getPartition(Text key, Text value, int numReduceTasks)
{
String[] str = value.toString().split("\t");
int age = Integer.parseInt(str[2]);
if(numReduceTasks == 0)
{
return 0;
}
if(age<=20)
{
return 0;
}
else if(age>20 && age<=30)
{
return 1 % numReduceTasks;
}
else
{
return 2 % numReduceTasks;
}
}
}
@Override
public int run(String[] arg) throws Exception
{
Configuration conf = getConf();
Job job = new Job(conf, "topsal");
job.setJarByClass(PartitionerExample.class);
FileInputFormat.setInputPaths(job, new Path(arg[0]));
FileOutputFormat.setOutputPath(job,new Path(arg[1]));
job.setMapperClass(MapClass.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//set partitioner statement
job.setPartitionerClass(CaderPartitioner.class);
job.setReducerClass(ReduceClass.class);
job.setNumReduceTasks(3);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
System.exit(job.waitForCompletion(true)? 0 : 1);
return 0;
}
public static void main(String ar[]) throws Exception
{
int res = ToolRunner.run(new Configuration(), new PartitionerExample(),ar);
System.exit(0);
}

分箱

分箱模式是在map阶段对数据进行拆分,即将数据分为几类。这种模式只有map阶段。假设将数据分为n类,即有n个箱子,然后有m个mapper,由于每个mapper都有可能能会处理到这n类数据,所以每个mapper都会输出n个文件,那么一共就会输出m×n个文件。这也是这种模式的缺点所在。下面举一个实际例子说明一下这种模式的应用场景。

  • 问题:给定一组论坛的帖子,根据帖子的标签,将带有hadoop、pig、hive和hbase标签的帖子分别放到4个箱子中,另外,将帖子内容中提及hadoop的帖子放到一个额外的箱子中。

由于不同的箱子要输出到不同的文件中,所以需要用到MultipleOutputs类。main函数中需要的操作如下:

1
2
3
4
//配置输出文件放在名为“bin”的目录下
MultipleOutputs.addNameOutput(job,"bin",TextOutputFormat.class,Text.class,NullWritable.class);
//不需要reduce,所以将reduce数量设为0
job.setNumReduceTasks(0);

具体的实现思路其实很简单,map主要做的就是提取出每个输入帖子的标签,然后用多个if else语句来判断是否含有我们的目标标签(上面的4个标签),若含有则将这个帖子写到相应的箱子中。

全排序

(这个模式没有理解好)

混排

这个模式简单来说就是将有序的数据进行打乱。打乱的方式很简单map的输出键为一个随机的数,输出值为输入值。reduce仅输出value即可。

未完待续

Contents
  1. 1. 概要模式
    1. 1.1. 数值概要
    2. 1.2. 倒排索引概要
    3. 1.3. 计数器计数
  2. 2. 过滤模式
    1. 2.1. 过滤
    2. 2.2. 布隆过滤
    3. 2.3. Top 10
    4. 2.4. 去重
  3. 3. 数据组织模式
    1. 3.1. 分层结构
    2. 3.2. 分区
    3. 3.3. 分箱
    4. 3.4. 全排序
    5. 3.5. 混排