Contents
  1. 1. 算法概述
    1. 1.1. 算法输入格式
    2. 1.2. 算法输出格式
  2. 2. 算法流程
    1. 2.1. Step1
    2. 2.2. Step2
    3. 2.3. Step3
    4. 2.4. Step4
    5. 2.5. Step5

算法概述

算法的目标是,根据用户已经看过的电影来为用户合理推荐没有看过的电影。详细问题描述在这里 用hadoop构建电影推荐系统

算法输入格式

1
2
3
4
5
6
7
1,101,5.0
1,102,3.0
1,103,2.5
2,101,2.0
2,102,2.5
2,103,5.0
2,104,2.0

第一行中,“1”代表用户id;“101”代表电影id;“5.0”表示该用户看过这部电影,并评分为5.0。

算法输出格式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
1 101,44.0
1 102,31.5
1 103,39.0
1 104,33.5
1 105,15.5
1 106,18.0
1 107,5.0
2 101,45.5
2 102,32.5
2 103,41.5
2 104,36.0
2 105,15.5
2 106,20.5
2 107,4.0

其中,以第一行为例,“1”代表用户id;“101”代表电影id;“44.0”表示该电影的综合推荐值,这个值越大表示这个电影越值得推荐。这里我们对数据集中的所有电影都进行综合评估,没有排除掉那些用户已经看过的电影,后续可能会继续完善一下。

算法流程

我们分五个步骤来完成这件事,每一个步骤都是一个mapreduce任务,在主函数中依次调用这五个任务。首先,给出主函数代码。

1
2
3
4
5
6
7
8
9
10
11
12
public class Main {
public static void main(String[] args) throws Exception
{
Step1.run();
Step2.run();
Step3.run();
Step4.run();
Step5.run();
System.exit(0);
}
}

Step1

步骤一将处理成如下结构

1
2
3
4
5
1 101:5.0,102:3.0,103:2.5
2 101:2.0,102:2.5,103:5.0,104:2.0
3 107:5.0,105:4.5,104:4.0,101:2.0
4 106:4.0,103:3.0,101:5.0,104:4.5
5 104:4.0,105:3.5,106:4.0,101:4.0,102:3.0,103:2.0

以第一行为例,意义为:用户1看了101,102,103这三部电影,分别评分为5.0,3.0,2.5。至于如何得到这个结果,我先贴出map和reduce两个函数的源码。

1
2
3
4
5
6
7
public void map(LongWritable key,Text value,Context context)
throws IOException,InterruptedException
{
String line=value.toString();
String[] tmpArr=line.split(",");
context.write(new IntWritable(Integer.parseInt(tmpArr[0])), new Text(tmpArr[1]+":"+tmpArr[2]));
}

map函数非常简单,将每一行用“,”切分成字符串数组后,以用户id为key,电影id和评分连接起来的新字符串作为值。

1
2
3
4
5
6
7
8
9
10
11
public void reduce(IntWritable key,Iterable<Text> values,Context context)
throws IOException,InterruptedException
{
String result="";
for(Text val:values)
{
result+=val+",";
}
result=result.substring(0, result.length()-1);
context.write(key, new Text(result));
}

reduce函数以map的输出为输入,同样key值的map输出为同一个reduce group,所以reduce就是将key值相同的所有value拼接起来,也就有了上面的结果。

Step2

步骤二以Step1的输出为输入,得到如下结果:

1
2
3
4
5
6
7
8
9
10
11
101:101 5
101:102 3
101:103 4
101:104 4
101:105 2
101:106 2
101:107 1
102:101 3
102:102 3
102:103 3
102:104 2

以第2行为例,意义为:有三个用户既看了101电影又看了102电影。具体实现如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public void map(LongWritable key,Text value,Context context)
throws IOException,InterruptedException
{
String line=value.toString();
String[] tmpArr1=line.split(" ");
String[] tmpArr=tmpArr1[1].split(",");
for(int i=0;i<tmpArr.length;i++)
{
String Item1ID=tmpArr[i].split(":")[0];
for(int j=0;j<tmpArr.length;j++)
{
String Item2ID=tmpArr[j].split(":")[0];
context.write(new Text(Item1ID+":"+Item2ID), new IntWritable(1));
}
}
}

map函数针对每一行输入,进行切分,切分后用双重for循环对电影ID进行两两组合,并将组合结果作为key输出,value为1。

1
2
3
4
5
6
7
8
9
10
public void reduce(Text key,Iterable<IntWritable> values,Context context)
throws IOException,InterruptedException
{
int result=0;
for(IntWritable val:values)
{
result+=val.get();
}
context.write(key, new IntWritable(result));
}

reduce函数很简单,其实就是一个按组计数的过程。

Step3

步骤三以Step1的输出为输入,生成如下文件

1
2
3
4
5
6
7
101 1:5.0,5:4.0,4:5.0,3:2.0,2:2.0
102 2:2.5,5:3.0,1:3.0
103 1:2.5,4:3.0,5:2.0,2:5.0
104 3:4.0,4:4.5,5:4.0,2:2.0
105 5:3.5,3:4.5
106 4:4.0,5:4.0
107 3:5.0

应该。。。不用。。。解释。。。能看出。。。是什么意思的吧。恩,就默认大家都能看懂了吧。直接上代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
public void map(LongWritable key,Text value,Context context)
throws IOException,InterruptedException
{
String line=value.toString();
String[] tmpArr=line.split(" ");
String UserID=tmpArr[0];
String[] Item2pres=tmpArr[1].split(",");
for(int i=0;i<Item2pres.length;i++)
{
String[] tmp=Item2pres[i].split(":");
context.write(new Text(tmp[0]), new Text(UserID+":"+tmp[1]));
}
}

map以电影id为key,用户id:评分为值。

1
2
3
4
5
6
7
8
9
10
11
public void reduce(Text key,Iterable<Text> values,Context context)
throws IOException,InterruptedException
{
String result="";
for(Text val:values)
{
result+=val+",";
}
result=result.substring(0, result.length()-1);
context.write(key, new Text(result));
}

reduce就是将同一个key的value连接为一个字符串。

Step4

这个步骤的任务是把Step3和Step2的输出合并为一个文件,但是重点在于是将两个文件的相应行进行合并。合并结果应该是这样的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
101:103 4 101 1:5.0,5:4.0,4:5.0,3:2.0,2:2.0
101:105 2 101 1:5.0,5:4.0,4:5.0,3:2.0,2:2.0
101:101 5 101 1:5.0,5:4.0,4:5.0,3:2.0,2:2.0
101:106 2 101 1:5.0,5:4.0,4:5.0,3:2.0,2:2.0
101:107 1 101 1:5.0,5:4.0,4:5.0,3:2.0,2:2.0
101:104 4 101 1:5.0,5:4.0,4:5.0,3:2.0,2:2.0
101:102 3 101 1:5.0,5:4.0,4:5.0,3:2.0,2:2.0
102:101 3 102 2:2.5,5:3.0,1:3.0
102:102 3 102 2:2.5,5:3.0,1:3.0
102:103 3 102 2:2.5,5:3.0,1:3.0
102:104 2 102 2:2.5,5:3.0,1:3.0
102:105 1 102 2:2.5,5:3.0,1:3.0
102:106 1 102 2:2.5,5:3.0,1:3.0
103:101 4 103 1:2.5,4:3.0,5:2.0,2:5.0
103:102 3 103 1:2.5,4:3.0,5:2.0,2:5.0
103:103 4 103 1:2.5,4:3.0,5:2.0,2:5.0
103:104 3 103 1:2.5,4:3.0,5:2.0,2:5.0
103:106 2 103 1:2.5,4:3.0,5:2.0,2:5.0
103:105 1 103 1:2.5,4:3.0,5:2.0,2:5.0

以第一行为例,意义就是:所有用户中有4个用户的既看了101电影又看了103电影,所有看了101电影的用户对它的评分如下。之所以处理成这样,是为了最后一步的矩阵计算做准备。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void map(LongWritable key,Text value,Context context)
throws IOException,InterruptedException
{
String line=value.toString();
String[] tmpArr=line.split(" ");
String[] tmpArr2=tmpArr[0].split(":");
if(tmpArr2.length==2)
{
context.write(new Text(tmpArr2[0]), value);
}
else
{
dict.put(tmpArr[0], line);
}
}

这个步骤的特殊之处在于它的输入是两种格式不同的文件,对它们要进行不同的处理,并且要将结果统一成key和value的形式再传给reduce。map函数中用if和else来分别处理两个文件,if处理的是Step2的输出,输出的key是文件的第一个字段,value是输入的这一行文件。else处理的是Step3的输出,将这个文件每一行按键值对的方式存放再全局字典中(Java的HashMap)。

1
2
3
4
5
6
7
8
9
public void reduce(Text key,Iterable<Text> values,Context context)
throws IOException,InterruptedException
{
for(Text val:values)
{
String ItemID=new String(key.getBytes());
context.write(val, new Text(dict.get(ItemID)));
}
}

reduce将key值相同的map输出和dit中对应的值连接起来。

Step5

这个步骤应该是最复杂的了。先上个图
矩阵计算推荐结果

上面的图中,左边的矩阵中的每个数值表示所有用户中既看了该行对应的电影又看了该列对应的电影的用户数量,也就是Step2中我们的计算结果,右边的列向量表示用户3对每个电影的评分,等号右边就是对每个电影的推荐值。具体实现如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void map(LongWritable key,Text value,Context context)
throws IOException,InterruptedException
{
String line=value.toString();
String[] tmpArr1=line.split(" ");
String Item=tmpArr1[0].split(":")[1];
String[] userPre2CurItems=tmpArr1[3].split(",");
for(int i=0;i<userPre2CurItems.length;i++)
{
String[] tmp=userPre2CurItems[i].split(":");
double curValue=Integer.parseInt(tmpArr1[1])*Double.parseDouble(tmp[1]);
context.write(new Text(tmp[0]+" "+Item), new DoubleWritable(curValue));
}
}

map输出的key是用户id和当前评价的电影id,输出的值其实是当前评价的电影的推荐值的一部分。for循环是针对每一个用户计算,并不是针对每一个电影计算。

1
2
3
4
5
6
7
8
9
10
11
12
public void reduce(Text key,Iterable<DoubleWritable> values,Context context)
throws IOException,InterruptedException
{
String tmpStr=new String(key.getBytes());
String[] tmp=tmpStr.split(" ");
double result=0;
for(DoubleWritable val:values)
{
result+=val.get();
}
context.write(new Text(tmp[0]), new Text(tmp[1]+","+result));
}

reduce将用户id和电影id都相同的值累加,最终输出最终结果。

源码下载

Contents
  1. 1. 算法概述
    1. 1.1. 算法输入格式
    2. 1.2. 算法输出格式
  2. 2. 算法流程
    1. 2.1. Step1
    2. 2.2. Step2
    3. 2.3. Step3
    4. 2.4. Step4
    5. 2.5. Step5